本文档简要介绍了拉取订阅、其工作流和相关属性。
在拉取订阅中,订阅方客户端向 Pub/Sub 服务器请求消息。
拉取模式可以使用 Pull 或 StreamingPull 这两个服务 API 中的一个。如需运行所选 API,您可以选择 Google 提供的高级客户端库,也可以选择自动生成的低级客户端库。您还可以选择异步或同步消息处理。
准备工作
在阅读本文档之前,请确保您熟悉以下内容:
Pub/Sub 的工作原理以及不同的 Pub/Sub 术语。
Pub/Sub 支持的不同订阅类型,以及您可能想要使用拉取订阅的原因。
拉取订阅工作流
对于拉取订阅,订阅者客户端会向 Pub/Sub 服务器发起请求以检索消息。订阅者客户端使用以下 API 之一:
大多数订阅者客户端不会直接发出这些请求。相反,客户端依赖于 Trusted Cloud by S3NS提供的高级客户端库,该库会在内部执行流式拉取请求并异步传送消息。对于需要更好地控制消息拉取方式的订阅者客户端,Pub/Sub 使用低级且自动生成的 gRPC 库。此库直接发出拉取或流式拉取请求。这些请求可以是同步的,也可以是异步的。
以下两张图片展示了订阅方客户端与拉取订阅之间的工作流程。


拉取工作流
拉取工作流如下(请参阅图 1):
- 订阅者客户端明确调用
pull
方法,该方法将请求待传送的消息。此请求为PullRequest
,如图所示。 Pub/Sub 服务器会返回零条或多条消息以及确认 ID。包含零条消息或包含错误的响应并不一定表示没有可接收的消息。此响应即为图片中所示的
PullResponse
。订阅者客户端明确调用
acknowledge
方法。客户端使用返回的确认 ID 来确认消息已处理,无需再次传送。
对于单个流式拉取请求,订阅者客户端可能会因连接处于打开状态而收到多个响应。相比之下,每个拉取请求仅返回一个响应。
拉取订阅的属性
您为拉取订阅配置的属性决定了如何将消息写入订阅。如需了解详情,请参阅订阅属性。
Pub/Sub 服务 API
Pub/Sub 拉取订阅可以使用以下两个 API 之一来检索消息:
- 拉取
- StreamingPull
当您使用这些 API 接收消息时,请使用一元 Acknowledge 和 ModifyAckDeadline RPC。以下部分介绍了这两种 Pub/Sub API。
StreamingPull API
在可能的情况下,Pub/Sub 客户端库使用 StreamingPull 来最大限度提高吞吐量并缩短延迟时间。尽管您可能永远不会直接使用 StreamingPull API,但了解它与 Pull API 的不同之处却很重要。
StreamingPull API 依赖持久双向连接来接收多条消息(当有消息时)。工作流程如下:
客户端向服务器发送建立连接的请求。 如果超出连接配额,服务器会返回资源耗尽错误。客户端库会自动重试配额不足错误。
如果没有错误,或者连接配额再次可用,服务器会持续向连接的客户端发送消息。
如果或当吞吐量配额超出时,服务器会停止发送消息。不过,连接并未中断。当有足够的吞吐量配额再次可用时,数据流就会恢复。
客户端或服务器最终会关闭连接。
StreamingPull API 会保持连接处于打开状态。Pub/Sub 服务器会在一段时间后定期关闭连接,以避免长时间运行的粘性连接。客户端库会自动重新打开 StreamingPull 连接。
消息会在可用时发送到连接。因此,StreamingPull API 可最大限度地缩短消息延迟时间并最大限度地提高消息吞吐量。
详细了解 StreamingPull RPC 方法:StreamingPullRequest 和 StreamingPullResponse。
Pull API
此 API 是一种基于请求和响应模型的传统一元 RPC。单个拉取响应对应于单个拉取请求。 工作流程如下:
客户端向服务器发送消息请求。 如果超出吞吐量配额,服务器会返回资源耗尽错误。
如果没有错误或吞吐量配额再次可用,服务器会回复零条或多条消息和确认 ID。
使用一元 Pull API 时,如果响应中没有消息或包含错误,并不一定表示没有可接收的消息。
使用 Pull API 无法保证低延迟和高消息吞吐量。若要通过 Pull API 实现高吞吐量和低延迟时间,您必须同时拥有多个待处理的请求。当旧请求收到响应时,系统会创建新请求。设计此类解决方案容易出错且难以维护。我们建议您针对此类使用情形使用 StreamingPull API。
仅当您需要严格控制以下方面时,才使用 Pull API 而不是 StreamingPull API:
- 订阅方客户端可以处理的消息数量
- 客户端内存和资源
如果订阅者是 Pub/Sub 与另一个以拉取方式运行的服务之间的代理,您也可以使用此 API。
详细了解 Pull REST 方法:方法:projects.subscriptions.pull。
详细了解 Pull RPC 方法:PullRequest 和 PullResponse。
消息处理模式的类型
为订阅者客户端选择以下拉取模式之一。
异步拉取模式
异步拉取模式可将订阅方客户端中消息的接收与消息的处理分离开。对于大多数订阅者客户端,此模式是默认模式。异步拉取模式可以使用 StreamingPull API 或一元 Pull API。异步拉取还可以使用高级客户端库或低级自动生成的客户端库。
您可以在本文档的后面部分详细了解客户端库。
同步拉取模式
在同步拉取模式下,消息的接收和处理按顺序进行,彼此之间没有分离。因此,与 StreamingPull API 和一元 Pull API 类似,异步处理比同步处理具有更低的延迟和更高的吞吐量。
仅当低延迟和高吞吐量与其他要求相比不是最重要的因素时,才使用同步拉取模式。例如,应用可能仅限于使用同步编程模型。或者,受资源限制的应用可能需要更精确地控制内存、网络或 CPU。在这种情况下,请使用单向 Pull API 的同步模式。
Pub/Sub 客户端库
Pub/Sub 提供高级和低级自动生成的客户端库。
高级 Pub/Sub 客户端库
高级客户端库提供了一些选项,可用于通过租期管理来控制确认时限。与在订阅级别使用控制台或 CLI 配置确认期限相比,这些选项的精细程度更高。高级客户端库还实现了对有序传送、精确一次传送和流量控制等功能的支持。
我们建议使用高级别客户端库中的异步拉取和 StreamingPull API。并非所有支持Trusted Cloud by S3NS 的语言都支持高级客户端库中的 Pull API。
如需使用高级客户端库,请参阅 Pub/Sub 客户端库。
低级自动生成的 Pub/Sub 客户端库
如果必须直接使用 Pull API,可以使用低级客户端库。您可以使用低级自动生成的客户端库进行同步或异步处理。使用低级自动生成的客户端库时,您必须手动对有序传送、精确一次传送、流量控制和租约管理等功能进行编码。
当您使用低级自动生成的客户端库时,可以采用同步处理模型,该模型适用于所有受支持的语言。在直接使用拉取 API 有意义的情况下,您可以使用低级自动生成的客户端库和同步拉取。例如,您可能已有依赖于此模型的应用逻辑。
如需直接使用低级自动生成的客户端库,请参阅 Pub/Sub API 概览。
客户端库代码示例
StreamingPull 和高级客户端库代码示例
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Go
以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例。
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。如需查看 Ruby v2 代码示例的列表,请参阅 已弃用的代码示例。
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
使用高级客户端库检索自定义属性
以下示例展示了如何异步拉取消息以及从元数据中检索自定义属性。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Go
以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例。
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。如需查看 Ruby v2 代码示例的列表,请参阅 已弃用的代码示例。
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
使用高级别客户端库处理错误
以下示例展示了如何处理订阅消息时出现的错误。
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
Go
以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例。
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Ruby
以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例。
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
一元拉取代码示例
C++
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
C#
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档。
Java
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档。
Node.js
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
PHP
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档。
Ruby
以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。如需查看 Ruby v2 代码示例的列表,请参阅 已弃用的代码示例。
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
协议
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
"returnImmediately": "false",
"maxMessages": "1"
}
响应:
200 OK
{
"receivedMessages": [{
"ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
"message": {
"data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
"messageId": "19917247034"
}
}]
}
请求:
POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
"ackIds": [
"dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
]
}
Python
在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档。
Pub/Sub 提供消息列表。 如果该列表包含多条消息,则 Pub/Sub 会对排序键相同的消息进行排序。以下是一些重要注意事项:
即使积压的消息数量达到
max_messages
,在请求中为max_messages
设置值也并不保证系统会返回max_messages
条消息。Pub/Sub Pull API 可能会返回少于max_messages
条消息,以缩短可随时传送的消息的传送延迟时间。如果拉取响应中包含 0 条消息,则不得将其用作积压中没有消息的指示。您可能会收到包含 0 条消息的响应,并在后续请求中收到消息。
要通过一元拉取模式来实现较短的消息传送延迟时间,同时拥有许多待处理的拉取请求至关重要。随着主题吞吐量的增加,需要更多的拉取请求。 通常,对于某些不宜延迟的应用,StreamingPull 模式更合适。
配额和限制
Pull 和 StreamingPull 连接均受配额和限制的约束。如需了解详情,请参阅 Pub/Sub 配额和限制。
后续步骤
为您的主题创建拉取订阅。
排查拉取订阅问题。
使用 gcloud CLI 创建或修改订阅。
使用 REST API 创建或修改订阅。
使用 RPC API 创建或修改订阅。