1 回限りの配信

このページでは、Pub/Sub の 1 回限りの機能を使用してメッセージを受信して確認応答する方法について説明します。これにより、メッセージの重複処理を追跡して防止できます。この機能を有効にすると、Pub/Sub では次のセマンティックが提供されます。

  • サブスクライバーは、メッセージの確認応答が正常に完了したかどうかを確認できます。

  • メッセージの確認応答が正常に完了すると、再配信は行われません。

  • メッセージが未処理の間、再配信は発生しません。メッセージは、確認応答期限が切れるか、確認応答が行われるまで、未処理とみなされます。

  • 複数の有効な配信が存在する場合は、確認応答期限の経過またはクライアントが開始した否定確認応答により、メッセージの確認応答に最新の確認応答 ID のみを使用できます。以前の確認応答 ID を持つリクエストは失敗します。

1 回限りの機能が有効になっている場合、サブスクライバーは次のガイドラインに従って、メッセージが 1 回だけ処理されるようにすることができます。

  • 確認応答期限内にメッセージを確認応答します。

  • メッセージの処理の進行状況に関する情報を、正常に確認応答されるまで保持します。

  • メッセージの処理の進行状況に関する情報を使用して、確認応答が失敗した場合の重複作業を防ぎます。

StreamingPull API を使用するサブスクライバーを含め、pull サブスクリプション タイプのみが 1 回限りの配信をサポートします。push とエクスポートのサブスクリプションでは 1 回限りの配信はサポートされていません。

Pub/Sub では、Pub/Sub で定義された一意のメッセージ ID に基づき、クラウド リージョン内で exactly-once(1 回限り)の配信がサポートされます。

再納品と重複

想定されている再配信と予期しない再配信の違いを理解することが重要です。

  • 再配信は、メッセージに対してクライアントによる否定確認応答が行われた場合、または確認応答期限が切れる前にクライアントが確認応答期限を延長しなかった場合のいずれかか原因で発生することがあります。再配信は有効であり、システムは意図したとおりに動作しているとみなされます。

    再配信のトラブルシューティングについては、重複の処理をご覧ください。

  • 重複とは、確認応答が成功した後、または確認応答期限が切れる前にメッセージが再送信された場合を指します。

  • 再配信されたメッセージには、再配信の試行までの同じメッセージ ID が保持されます。

1 回限りの配信が有効になっているサブスクリプションに対して、配信が重複しないことが保証されています。

クライアント ライブラリでの 1 回限りの配信のサポート

  • サポートされているクライアント ライブラリには、レスポンス付きの確認応答のインターフェースがあります(例: Go)。このインターフェースを使用して、確認応答リクエストが成功したかどうかを確認できます。確認応答リクエストが成功した場合、クライアントは再配信を受信しないことが保証されます。確認応答リクエストが失敗した場合、クライアントは再配信を想定できます。

  • クライアントは、確認応答インターフェースなしで、サポートされているクライアント ライブラリを使用することもできます。ただし、このような確認応答の失敗によってメッセージのサイレント再配信が生じる可能性があります。

  • サポートされているクライアント ライブラリには、最小リース延長時間を設定するためのインターフェースがあります(例: Go)。ネットワーク関連の確認応答の有効期限切れを回避するには、最小リース延長の値を大きな数値に設定する必要があります。最大値は 600 秒に設定されています。

  • Java クライアント ライブラリを使用しており、setChannelProvider() メソッドを使用してカスタム gRPC Channel でサブスクライバーを初期化する場合は、TransportChannelProvider をビルドするときに maxInboundMetadataSize を 1 MB 以上に設定することをおすすめします。この構成では、InstantiatingGrpcChannelProvider.Builder.setMaxInboundMetadataSize() メソッドまたは ManagedChannelBuilder.maxInboundMetadataSize() メソッドを使用できます。

1 回限りの配信に関連する変数のデフォルト値と範囲、および変数の名前は、クライアント ライブラリによって異なる場合があります。たとえば、Java クライアント ライブラリでは、次の変数が 1 回限りの配信を制御します。

変数 説明
setEnableExactlyOnceDelivery 1 回限りの配信を有効または無効にします。 true または false(デフォルト =false)
minDurationPerAckExtension 確認応答期限の変更に使用する最小時間(秒単位)。 範囲=0~600(デフォルト =none)
maxDurationPerAckExtension 確認応答期限の変更の延長に使用する最大時間(秒単位)。 範囲=0~600(デフォルト =none)

1 回限りの配信の場合、確認応答 ID がすでに期限切れの場合、PubASub への modifyAckDeadline または acknowledgment リクエストが失敗します。この場合、新しい配信がすでに処理中の可能性があるため、サービスは期限切れの確認応答 ID を無効と見なします。これは 1 回限りの配信の配信のための設計です。acknowledgment リクエストと ModifyAckDeadline リクエストが INVALID_ARGUMENT レスポンスを返すことがわかります。1 回限りの配信の配信が無効になっていると、確認応答 ID が期限切れになった場合にこれらのリクエストは OK を返します。

acknowledgment リクエストと ModifyAckDeadline リクエストに有効な確認応答 ID が含まれるようにするには、minDurationPerAckExtension の値を大きな数値に設定することを検討してください。

リージョンに関する考慮事項

1 回限りの配信の保証は、サブスクライバーが同じリージョンでサービスに接続する場合にのみ適用されます。サブスクライバー アプリケーションが複数のリージョンに分散されていると、1 回限りの配信が有効になっている場合でも、メッセージの重複配信が発生する可能性があります。パブリッシャーは任意のリージョンにメッセージを送信でき、1 回限りの保証は維持されます。

Trusted Cloud内でアプリケーションを実行すると、デフォルトで同じリージョンの Pub/Sub エンドポイントに接続されます。したがって、 Trusted Cloud内の単一リージョンでアプリケーションを実行すると、通常は単一リージョンとやり取りすることになります。

Trusted Cloudの外部または複数のリージョンでサブスクライバー アプリケーションを実行している場合は、Pub/Sub クライアントの構成時にロケーション エンドポイントを使用することで、単一リージョンに接続していることを保証できます。Pub/Sub のすべてのロケーション エンドポイントは、単一のリージョンを指します。ロケーション エンドポイントの詳細については、Pub/Sub エンドポイントをご覧ください。Pub/Sub のすべてのロケーション エンドポイントのリストについては、ロケーション エンドポイントのリストをご覧ください。

1 回限りの配信のサブスクリプションを作成する

Trusted Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用して、1 回限りの配信を行うサブスクリプションを作成できます。

pull サブスクリプション

Console

1 回限りの配信を行う pull サブスクリプションを作成するには、次の手順を行います。

  1. Trusted Cloud コンソールで、[サブスクリプション] ページに移動します。

    [サブスクリプション] に移動

  2. [サブスクリプションを作成] をクリックします。

  3. [サブスクリプション ID] を入力します。

  4. プルダウン メニューからトピックを選択するか、作成します。

    サブスクリプションがトピックからメッセージを受信します。

  5. [1 回限りの配信] セクションで、[1 回限りの配信を有効にする] を選択します。

  6. [作成] をクリックします。

gcloud

1 回限りの配信を行う pull サブスクリプションを作成するには、--enable-exactly-once-delivery フラグを指定して gcloud pubsub subscriptions create コマンドを使用します。

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

以下を置き換えます。

  • SUBSCRIPTION_ID: 作成するサブスクリプションの ID
  • TOPIC_ID: サブスクリプションに関連付けるトピックの ID

REST

1 回限りの配信を行うサブスクリプションを作成するには、projects.subscriptions.create メソッドを使用します。

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

以下を置き換えます。

  • PROJECT_ID: サブスクリプションを作成するプロジェクトのプロジェクト ID
  • SUBSCRIPTION_ID: 作成するサブスクリプションの ID

1 回限りの配信を行う pull サブスクリプションを作成するには、リクエスト本文に次のように指定します。

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

以下を置き換えます。

  • PROJECT_ID: トピックが含まれるプロジェクトのプロジェクト ID
  • TOPIC_ID: サブスクリプションに関連付けるトピックの ID

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API リファレンス ドキュメントをご覧ください。


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。まだ v1 ライブラリを使用している場合は、v2 への移行ガイドをご覧ください。v1 コードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbSub := &pubsubpb.Subscription{
		Name:                      subscription,
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	}
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, pbSub)
	if err != nil {
		return fmt.Errorf("failed to create exactly once sub: %w", err)
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`,
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().',
  );
}

Ruby

次のサンプルでは、Ruby Pub/Sub クライアント ライブラリ v3 を使用しています。v2 ライブラリをまだ使用している場合は、 v3 への移行ガイドをご覧ください。Ruby v2 のコードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new project_id: project_id
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  enable_exactly_once_delivery: true

puts "Created subscription with exactly once delivery enabled: " \
     "#{subscription_id}"

PHP

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API リファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

1 回限りの配信サブスクリプションをモニタリングする

subscription/exactly_once_warning_count 指標は、再配信につながる可能性のあるイベントの数(有効または重複)を記録します。この指標は、Pub/Sub が確認応答 ID(ModifyAckDeadline リクエストまたは acknowledgment リクエスト)に関連付けられたリクエストの処理に失敗した回数をカウントします。失敗の原因は、サーバーベースまたはクライアント ベースのいずれかです。たとえば、1 回限りの配信情報を維持するために使用される永続性レイヤが使用できない場合、サーバーベースのイベントになります。クライアントが無効な確認応答 ID でメッセージを確認応答しようとすると、クライアント ベースのイベントになります。

指標について

subscription/exactly_once_warning_count は、実際の再配信につながる可能性がある、またはないイベントであり、クライアントの動作によってはノイズの原因となるイベントをキャプチャします。たとえば、確認応答 ID が無効な acknowledgment リクエストまたは ModifyAckDeadline リクエストを繰り返すと、指標が繰り返し増加します。

次の指標は、クライアントの動作の把握する際にも有効です。

  • subscription/expired_ack_deadlines_count 指標は、確認応答 ID の有効期限切れの数を示します。確認応答 ID の有効期限が切れると、ModifyAckDeadline リクエストと acknowledgment リクエストの両方が失敗する可能性があります。

  • service.serviceruntime.googleapis.com/api/request_count 指標を使用すると、リクエストが Trusted Cloud by S3NS に到達したにもかかわらず Pub/Sub に到達しない場合の ModifyAckDeadline リクエストまたは acknowledgment リクエストの失敗をキャプチャできます。クライアントが Trusted Cloud by S3NSから切断された場合など、この指標ではキャプチャできない障害があります。

再試行可能な障害イベントでは、ほとんどの場合、サポートされているクライアント ライブラリによってリクエストが自動的に再試行されます。

割り当て

1 回限りの配信サブスクリプションには、追加の割り当て要件が適用されます。これらの割り当ては、次のものに適用されます。

  • 1 回限りの配信が有効になっているサブスクリプションから消費されたリージョンごとのメッセージ数。
  • 1 回限りの配信が有効になっているサブスクリプションを使用する場合に、確認応答済みのメッセージまたは期限が延長されたメッセージのリージョンごとの数。

これらの割り当ての詳細については、割り当てのトピックの表をご覧ください。

1 回限りの配信と順序指定サブスクリプション

Pub/Sub では、順序付けられた配信による 1 回限りの配信がサポートされています。

1 回限りの配信で順序付けを使用する場合、Pub/Sub は確認応答が順序どおりであることを想定しています。確認応答が順不同である場合、サービスへのリクエストは一時的なエラーによって失敗します。配信に対する正しい確認応答が行われる前に確認応答期限が切れると、クライアントはメッセージの再配信を受け取ります。そのため、1 回限りの配信で順序指定を使用する場合、クライアントのスループットは 1 秒あたり数千件のメッセージに制限されます。

1 回限りの配信と push サブスクリプション

Pub/Sub は、pull サブスクリプションでのみ 1 回限りの配信をサポートします。

push サブスクリプションからのメッセージを使用するクライアントは、push リクエストに正常なレスポンスで応答することによってメッセージの確認応答を行います。しかし、クライアントには、Pub/Sub サブスクリプションがレスポンスを受信して処理したかどうかはわかりません。これは、クライアントによって確認応答リクエストが開始され、リクエストが正常に処理されると Pub/Sub サブスクリプションが応答する pull サブスクリプションと異なります。このため、1 回限りの配信セマンティクスは、push サブスクリプションと一致しません。

知っておくべきこと

  • CreateSubscription で確認応答期限が指定されていない場合、1 回限りの配信が有効になっているサブスクリプションには、60 秒のデフォルトの確認応答期限が設定されます。

  • デフォルトの確認応答期限が長いほど、ネットワーク イベントによる再配信を回避できる可能性が高まります。サポートされているクライアント ライブラリでは、デフォルトのサブスクリプション確認応答期限は使用されません。

  • 1 回限りの配信サブスクリプションでは、通常のサブスクリプションよりもパブリッシュ/サブスクライブ間のレイテンシが大幅に高くなります。

  • 高スループットが必要な場合は、1 回限りの配信クライアントでもストリーミング pull を使用する必要があります。

  • 1 回限りの配信が有効になっていても、パブリッシュ側の重複によって、サブスクリプションが同じメッセージの複数のコピーを受信する場合があります。パブリッシュ側の重複は、パブリッシュを行うクライアントまたは Pub/Sub サービスによる複数の一意のパブリッシュの再試行が原因で発生する可能性があります。再試行を複数回実施する際にパブリッシュを行うクライアントが複数回の一意のパブリッシュを行うと、異なる メッセージ ID を持つ再配信につながります。クライアントのパブリッシュ リクエストに応答するための Pub/Sub サービスによる複数回の一意のパブリッシュが、同じ メッセージ ID を持つ再配信につながります。

  • subscription/exactly_once_warning_count のエラーは再試行できます。サポートされているクライアント ライブラリは、これらのエラーを自動的に再試行します。ただし、無効な確認応答 ID に関連するエラーは再試行できません。