仅传送一次

本页介绍了如何使用 Pub/Sub 的一次性传送功能来接收和确认消息,该功能可让您跟踪消息并防止重复处理消息。启用该功能后,Pub/Sub 将提供以下语义:

  • 订阅者可以确定消息确认是否成功。

  • 在成功确认消息后,不会再重新传送。

  • 在消息未处理期间,不会发生重新传送。在确认截止时间到期或消息被确认之前,消息都被视为未完成。

  • 如果由于确认截止时间到期或客户端发起的否定确认而导致多次有效传送,则只能使用最新的确认 ID 来确认消息。任何具有之前确认 ID 的请求都会失败。

启用“只处理一次”功能后,订阅者可以按照以下准则确保消息只处理一次:

  • 在确认时限内确认消息。

  • 维护有关处理消息进度的信息,直到成功确认消息。

  • 使用有关消息处理进度的信息,以防止在确认失败时重复工作。

只有拉取订阅类型支持“正好一次”传送,包括使用 StreamingPull API 的订阅者。推送订阅和导出订阅不支持“正好一次”传送。

Pub/Sub 支持在云区域内基于 Pub/Sub 定义的唯一消息 ID 实现“正好一次”传送。

重新提交与重复

请务必了解预期重新传送与意外重新传送之间的区别。

  • 重新传送可能因客户端主动对消息进行否定确认而发生,也可能因客户端在确认截止时间到期之前未延长消息的确认截止时间而发生。重新传送被视为有效,系统按预期运行。

    如需排查重新传送问题,请参阅处理重复项

  • 重复是指在成功确认消息后或在确认截止期限到期之前重新发送消息。

  • 重新传送的消息在多次重新传送尝试中会保留相同的消息 ID。

启用“仅传送一次”功能的订阅不会收到重复传送的消息。

客户端库中的“仅传送一次”支持

  • 受支持的客户端库具有用于确认响应的接口(例如:Go)。您可以使用此接口检查确认请求是否成功。 如果确认请求成功,则可以保证客户端不会收到重新传送的消息。如果确认请求失败,客户端可以预期会重新传送。

  • 客户端还可以使用受支持的客户端库,而无需确认接口。不过,在这种情况下,确认失败可能会导致消息静默重新传送。

  • 受支持的客户端库具有用于设置最短租期延长时间的接口(例如:Go)。您必须将最小租约延期时间的值设置为一个较大的数字,以避免任何与网络相关的确认过期。 最大值设置为 600 秒。

  • 如果您使用的是 Java 客户端库,并且使用 setChannelProvider() 方法通过自定义 gRPC Channel 初始化订阅者,建议您在构建 TransportChannelProvider 时将 maxInboundMetadataSize 设置为至少 1MB。对于此配置,您可以使用 InstantiatingGrpcChannelProvider.Builder.setMaxInboundMetadataSize()ManagedChannelBuilder.maxInboundMetadataSize() 方法。

与“仅一次”传送相关的变量的默认值和范围以及变量名称可能因客户端库而异。例如,在 Java 客户端库中,以下变量用于控制“仅一次”传送。

变量 说明
setEnableExactlyOnceDelivery 启用或停用“仅传送一次”功能。 true 或 false 默认值=false
minDurationPerAckExtension 用于延长修改确认时限的最短时间(以秒为单位)。 范围=0 到 600 默认值=无
maxDurationPerAckExtension 用于延长修改确认时限的最长时间(以秒为单位)。 范围=0 到 600 默认值=无

如果采用“正好一次”传送,当确认 ID 已过期时,向 Pub/Sub 发出的 modifyAckDeadlineacknowledgment 请求会失败。在这种情况下,由于可能已有新的递送在进行中,因此服务会将过期的确认 ID 视为无效。这是为实现“仅一次”传送而设计的。然后,您会看到 acknowledgmentModifyAckDeadline 请求返回 INVALID_ARGUMENT 响应。如果停用“仅传送一次”功能,当确认 ID 过期时,这些请求会返回 OK

为确保 acknowledgmentModifyAckDeadline 请求具有有效的确认 ID,请考虑将 minDurationPerAckExtension 的值设置为较高的数字。

地区注意事项

仅当订阅者连接到同一区域中的服务时,系统才会保证仅传送一次。如果订阅者应用分布在多个区域,即使启用了“仅传送一次”功能,也可能会导致消息重复传送。发布者可以向任何区域发送消息,并且仍可保证“只发送一次”。

在 Trusted Cloud中运行应用时,默认情况下,应用会连接到同一区域中的 Pub/Sub 端点。因此,在 Trusted Cloud内的单个区域中运行应用通常可确保您与单个区域进行交互。

当您在 Trusted Cloud之外或多个区域中运行订阅者应用时,您可以在配置 Pub/Sub 客户端时使用位置端点,以确保您连接到单个区域。Pub/Sub 的所有位置端点都指向单个区域。如需详细了解位置级端点,请参阅 Pub/Sub 端点。 如需查看 Pub/Sub 的所有位置级端点的列表,请参阅位置级端点列表

创建采用“仅传送一次”功能的订阅

您可以使用 Trusted Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API 创建具有“仅一次”传送保证的订阅。

拉取订阅

控制台

如需创建具有“仅一次”传送功能的拉取订阅,请按以下步骤操作:

  1. 在 Trusted Cloud 控制台中,前往订阅页面。

    前往订阅页面

  2. 点击创建订阅

  3. 输入订阅 ID

  4. 从下拉菜单中选择或创建一个主题。

    订阅将接收来自该主题的消息。

  5. 仅传送一次部分,选择启用“仅传送一次”

  6. 点击创建

gcloud

如需创建具有“仅一次”传送功能的拉取订阅,请将 gcloud pubsub subscriptions create 命令与 --enable-exactly-once-delivery 标志搭配使用:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

替换以下内容:

  • SUBSCRIPTION_ID:要创建的订阅的 ID
  • TOPIC_ID:要附加到订阅的主题的 ID

REST

如需创建具有“仅一次”传送功能的订阅,请使用 projects.subscriptions.create 方法。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

替换以下内容:

  • PROJECT_ID:要在其中创建订阅的项目的 ID
  • SUBSCRIPTION_ID:要创建的订阅的 ID

如需创建具有“仅一次”传送的拉取订阅,请在请求正文中指定此设置:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

替换以下内容:

  • PROJECT_ID:包含主题的项目的 ID
  • TOPIC_ID:要附加到订阅的主题的 ID

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbSub := &pubsubpb.Subscription{
		Name:                      subscription,
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	}
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, pbSub)
	if err != nil {
		return fmt.Errorf("failed to create exactly once sub: %w", err)
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Ruby

以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。如需查看 Ruby v2 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new project_id: project_id
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  enable_exactly_once_delivery: true

puts "Created subscription with exactly once delivery enabled: " \
     "#{subscription_id}"

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

监控“仅传送一次”订阅

subscription/exactly_once_warning_count 指标用于记录可能导致重新传送的事件(有效或重复)数量。此指标用于统计 Pub/Sub 未能处理与确认 ID(ModifyAckDeadlineacknowledgment 请求)关联的请求的次数。失败的原因可能是服务器端或客户端问题。例如,如果用于维护“正好一次”传送信息的持久层不可用,则属于服务器端事件。如果客户端尝试使用无效的确认 ID 来确认消息,则会发生基于客户端的事件。

了解指标

subscription/exactly_once_warning_count 捕获可能或可能不会导致实际重新传送的事件,并且可能会因客户端行为而产生噪声。例如:重复的 acknowledgmentModifyAckDeadline 请求(具有无效的确认 ID)会反复递增相应指标。

以下指标也有助于了解客户端行为:

在大多数可以重试的失败事件中,受支持的客户端库会自动重试请求。

配额

“仅传送一次”订阅需满足额外的配额要求。以下情况会强制执行这些配额:

  • 已从启用了“仅传送一次”功能的订阅中消费的消息数(按区域划分)。
  • 使用启用了“仅传送一次”功能的订阅时,每个区域中已确认或时限已延长的消息数量。

如需详细了解这些配额,请参阅配额主题中的表格。

“仅传送一次”和“按顺序传送”订阅

Pub/Sub 支持通过按序传送实现“正好一次”传送。

将排序与“正好一次”传送模式搭配使用时,Pub/Sub 会要求确认按顺序进行。如果确认消息的顺序不正确,服务会因临时错误而导致请求失败。如果在按顺序确认交付之前确认时限到期,客户端将收到重新交付的消息。因此,当您使用具有“仅一次”传送语义的排序时,客户端吞吐量会限制在每秒数千条消息的数量级。

“仅传送一次”和推送订阅

Pub/Sub 仅支持通过拉取订阅实现“正好一次”传送。

从推送订阅中消费消息的客户端通过以成功响应来回复推送请求,从而确认消息。不过,客户端不知道 Pub/Sub 订阅是否已收到响应并对其进行了处理。这与拉取订阅不同,在拉取订阅中,确认请求由客户端发起,如果请求成功处理,Pub/Sub 订阅会做出响应。因此,“正好一次”传送语义与推送订阅不太契合。

注意事项

  • 如果在 CreateSubscription 时未指定确认期限,则启用“仅传送一次”功能的订阅的默认确认期限为 60 秒。

  • 较长的默认确认期限有助于避免因网络事件而导致的重新传送。受支持的客户端库不使用默认的订阅确认时限。

  • 与常规订阅相比,仅传送一次订阅的发布到订阅延迟时间要长得多。

  • 如果您需要高吞吐量,则“正好一次”传送客户端还必须使用流式拉取

  • 即使启用了“仅传送一次”功能,订阅也可能会因发布方重复发布而收到同一消息的多个副本。发布端重复可能是由发布客户端或 Pub/Sub 服务多次重试发布造成的。发布客户端在重试期间多次发布唯一消息,会导致重新传送具有不同 消息 ID 的消息。Pub/Sub 服务多次发布唯一消息以响应客户端的发布请求,会导致重新传送具有相同 消息 ID 的消息。

  • 您可以在 subscription/exactly_once_warning_count 中重试失败的操作,并且受支持的客户端库会自动重试这些操作。不过,与无效确认 ID 相关的失败无法重试。