發布失敗通常是因用戶端瓶頸所致,例如服務 CPU 不足、執行緒健康狀態不佳或網路壅塞。發布者重試政策會定義 Pub/Sub 嘗試傳送訊息的次數,以及每次嘗試之間的時間長度。
本文提供如何對發布至主題的訊息使用重試要求。
事前準備
設定發布工作流程前,請務必完成下列工作:
必要的角色
如要取得權限,以便對主題重試訊息要求,請要求管理員授予主題的 Pub/Sub 發布者 (roles/pubsub.publisher
) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
您需要其他權限,才能建立或更新主題和訂閱項目。
關於重試要求
重試設定可控管 Pub/Sub 用戶端程式庫重試發布要求的次數。用戶端程式庫具有下列任一重試設定:
- 初始要求逾時:用戶端程式庫停止等待初始發布要求完成的時間。
- 重試延遲:要求逾時後,用戶端程式庫等待重試要求所花費的時間。
- 總逾時:用戶端程式庫停止重試發布要求後經過的時間。
如要重試發布要求,初始要求逾時時間必須短於總逾時時間。舉例來說,如果您使用指數輪詢,用戶端程式庫會計算要求逾時和重試延遲時間,如下所示:
- 每次發布要求後,要求逾時時間會增加要求逾時時間乘數,直到達到要求逾時時間上限為止。
- 每次重試後,重試延遲時間會增加重試延遲時間乘數,直到達到重試延遲時間上限為止。
重試訊息要求
在發布過程中,您可能會遇到暫時性或永久性發布失敗。如果是暫時性錯誤,通常不需要採取任何特殊行動,因為 Pub/Sub 會自動重試傳送訊息。
如果發布作業成功,但發布端用戶端未及時收到發布回應,也可能發生錯誤。在這種情況下,系統也會重試發布作業。因此,您可能會收到兩封內容相同的郵件,但郵件 ID 不同。
如果發生持續性錯誤,請考慮在發布程序以外實作適當的動作,以免 Pub/Sub 負載過重。
系統會在發布失敗時自動重試,但若發生錯誤,則無法保證會重試。以下程式碼範例會示範如何使用自訂重試設定來建立發布者 (請注意,並非所有用戶端程式庫都支援自訂重試設定;詳情請參閱所選語言的 API 參考說明文件):
C++
在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// By default a publisher will retry for 60 seconds, with an initial backoff
// of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
// 30% after each attempt. This changes those defaults.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic),
Options{}
.set<pubsub::RetryPolicyOption>(
pubsub::LimitedTimeRetryPolicy(
/*maximum_duration=*/std::chrono::minutes(10))
.clone())
.set<pubsub::BackoffPolicyOption>(
pubsub::ExponentialBackoffPolicy(
/*initial_delay=*/std::chrono::milliseconds(200),
/*maximum_delay=*/std::chrono::seconds(45),
/*scaling=*/2.0)
.clone())));
std::vector<future<bool>> done;
for (char const* data : {"1", "2", "3", "go!"}) {
done.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([](future<StatusOr<std::string>> f) {
return f.get().ok();
}));
}
publisher.Flush();
int count = 0;
for (auto& f : done) {
if (f.get()) ++count;
}
std::cout << count << " messages sent successfully\n";
}
C#
在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件。
using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;
public class PublishMessageWithRetrySettingsAsyncSample
{
public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
// Retry settings control how the publisher handles retry-able failures
var maxAttempts = 3;
var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
var maxBackoff = TimeSpan.FromSeconds(70); // default : 60 seconds
var backoffMultiplier = 1.3; // default: 1.0
var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds
var publisher = await new PublisherClientBuilder
{
TopicName = topicName,
ApiSettings = new PublisherServiceApiSettings
{
PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
maxAttempts: maxAttempts,
initialBackoff: initialBackoff,
maxBackoff: maxBackoff,
backoffMultiplier: backoffMultiplier,
retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
.WithTimeout(totalTimeout)
}
}.BuildAsync();
string message = await publisher.PublishAsync(messageText);
Console.WriteLine($"Published message {message}");
}
}
Go
以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例。
在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件。
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/pubsub/v2"
vkit "cloud.google.com/go/pubsub/v2/apiv1"
gax "github.com/googleapis/gax-go/v2"
"google.golang.org/grpc/codes"
)
func publishWithRetrySettings(w io.Writer, projectID, topicID, msg string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// msg := "Hello World"
ctx := context.Background()
config := &pubsub.ClientConfig{
TopicAdminCallOptions: &vkit.TopicAdminCallOptions{
Publish: []gax.CallOption{
gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{
codes.Aborted,
codes.Canceled,
codes.Internal,
codes.ResourceExhausted,
codes.Unknown,
codes.Unavailable,
codes.DeadlineExceeded,
}, gax.Backoff{
Initial: 250 * time.Millisecond, // default 100 milliseconds
Max: 60 * time.Second, // default 60 seconds
Multiplier: 1.45, // default 1.3
})
}),
},
},
}
client, err := pubsub.NewClientWithConfig(ctx, projectID, config)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %w", err)
}
defer client.Close()
// client.Publisher can be passed a topic ID (e.g. "my-topic") or
// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
// If a topic ID is provided, the project ID from the client is used.
// Reuse this publisher for all publish calls to send messages in batches.
publisher := client.Publisher(topicID)
result := publisher.Publish(ctx, &pubsub.Message{
Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %w", err)
}
fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
return nil
}
Java
在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件。
import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;
public class PublishWithRetrySettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithRetrySettingsExample(projectId, topicId);
}
public static void publishWithRetrySettingsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Retry settings control how the publisher handles retry-able failures
Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
double rpcTimeoutMultiplier = 1.0; // default: 1.0
Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(initialRetryDelay)
.setRetryDelayMultiplier(retryDelayMultiplier)
.setMaxRetryDelay(maxRetryDelay)
.setInitialRpcTimeout(initialRpcTimeout)
.setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
.setMaxRpcTimeout(maxRpcTimeout)
.setTotalTimeout(totalTimeout)
.build();
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();
String message = "first message";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published a message with retry settings: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {PubSub} = require('@google-cloud/pubsub');
async function publishWithRetrySettings(topicNameOrId, data) {
const pubsubClient = new PubSub();
// Retry settings control how the publisher handles retryable failures. Default values are shown.
// The `retryCodes` array determines which grpc errors will trigger an automatic retry.
// The `backoffSettings` object lets you specify the behaviour of retries over time.
//
// Reference this document to see the current defaults for publishing:
// https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
//
// Please note that _all_ items must be included when passing these settings to topic().
// Otherwise, unpredictable (incorrect) defaults may be assumed.
const retrySettings = {
retryCodes: [
10, // 'ABORTED'
1, // 'CANCELLED',
4, // 'DEADLINE_EXCEEDED'
13, // 'INTERNAL'
8, // 'RESOURCE_EXHAUSTED'
14, // 'UNAVAILABLE'
2, // 'UNKNOWN'
],
backoffSettings: {
// The initial delay time, in milliseconds, between the completion
// of the first failed request and the initiation of the first retrying request.
initialRetryDelayMillis: 100,
// The multiplier by which to increase the delay time between the completion
// of failed requests, and the initiation of the subsequent retrying request.
retryDelayMultiplier: 4,
// The maximum delay time, in milliseconds, between requests.
// When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
maxRetryDelayMillis: 60000,
// The initial timeout parameter to the request.
initialRpcTimeoutMillis: 60000,
// The multiplier by which to increase the timeout parameter between failed requests.
rpcTimeoutMultiplier: 1.0,
// The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
// rpcTimeoutMultiplier will no longer be used to increase the timeout.
maxRpcTimeoutMillis: 60000,
// The total time, in milliseconds, starting from when the initial request is sent,
// after which an error will be returned, regardless of the retrying attempts made meanwhile.
totalTimeoutMillis: 600000,
},
};
// Cache topic objects (publishers) and reuse them.
const topic = pubsubClient.topic(topicNameOrId, {
gaxOpts: {
retry: retrySettings,
},
});
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
}
Node.js
在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
import {PubSub} from '@google-cloud/pubsub';
async function publishWithRetrySettings(topicNameOrId: string, data: string) {
const pubsubClient = new PubSub();
// Retry settings control how the publisher handles retryable failures. Default values are shown.
// The `retryCodes` array determines which grpc errors will trigger an automatic retry.
// The `backoffSettings` object lets you specify the behaviour of retries over time.
//
// Reference this document to see the current defaults for publishing:
// https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
//
// Please note that _all_ items must be included when passing these settings to topic().
// Otherwise, unpredictable (incorrect) defaults may be assumed.
const retrySettings = {
retryCodes: [
10, // 'ABORTED'
1, // 'CANCELLED',
4, // 'DEADLINE_EXCEEDED'
13, // 'INTERNAL'
8, // 'RESOURCE_EXHAUSTED'
14, // 'UNAVAILABLE'
2, // 'UNKNOWN'
],
backoffSettings: {
// The initial delay time, in milliseconds, between the completion
// of the first failed request and the initiation of the first retrying request.
initialRetryDelayMillis: 100,
// The multiplier by which to increase the delay time between the completion
// of failed requests, and the initiation of the subsequent retrying request.
retryDelayMultiplier: 4,
// The maximum delay time, in milliseconds, between requests.
// When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
maxRetryDelayMillis: 60000,
// The initial timeout parameter to the request.
initialRpcTimeoutMillis: 60000,
// The multiplier by which to increase the timeout parameter between failed requests.
rpcTimeoutMultiplier: 1.0,
// The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
// rpcTimeoutMultiplier will no longer be used to increase the timeout.
maxRpcTimeoutMillis: 60000,
// The total time, in milliseconds, starting from when the initial request is sent,
// after which an error will be returned, regardless of the retrying attempts made meanwhile.
totalTimeoutMillis: 600000,
},
};
// Cache topic objects (publishers) and reuse them.
const topic = pubsubClient.topic(topicNameOrId, {
gaxOpts: {
retry: retrySettings,
},
});
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
}
Python
在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件。
from google import api_core
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Configure the retry settings. Defaults shown in comments are values applied
# by the library by default, instead of default values in the Retry object.
custom_retry = api_core.retry.Retry(
initial=0.250, # seconds (default: 0.1)
maximum=90.0, # seconds (default: 60.0)
multiplier=1.45, # default: 1.3
deadline=300.0, # seconds (default: 60.0)
predicate=api_core.retry.if_exception_type(
api_core.exceptions.Aborted,
api_core.exceptions.DeadlineExceeded,
api_core.exceptions.InternalServerError,
api_core.exceptions.ResourceExhausted,
api_core.exceptions.ServiceUnavailable,
api_core.exceptions.Unknown,
api_core.exceptions.Cancelled,
),
)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
print(future.result())
print(f"Published messages with retry settings to {topic_path}.")
透過排序鍵重試要求
假設您有一個發布商用戶端,您使用 Pub/Sub 用戶端程式庫,為同一個排序鍵 A 發布訊息 1、2 和 3。現在,假設發布商用戶端在 RPC 期限到期前,未收到訊息 1 的已發布回應。訊息 1 必須重新發布。如果假設訊息 2 是在訊息 1 順利完成後才發布,訂閱端用戶端收到的訊息順序就會變成 1、1、2 和 3。每則發布的訊息都有專屬的訊息 ID。從訂閱端用戶端的角度來看,系統發布了四則訊息,前兩則訊息的內容相同。
如果批次設定複雜,透過排序鍵重試發布要求也會變得複雜。用戶端程式庫會將訊息批次處理,以提高發布效率。繼續使用先前的範例,並假設訊息 1 和 2 會一起批次處理。系統會將這個批次做為單一要求傳送至伺服器。如果伺服器未及時傳回回應,發布商用戶端會重試這批兩則訊息。因此,訂閱者用戶端可能會收到訊息 1、2、1、2 和 3。如果您使用 Pub/Sub 用戶端程式庫依序發布訊息,且發布作業失敗,服務會針對相同排序鍵的所有其餘訊息,一併發布失敗。發布者用戶端接著可以決定要執行下列任一操作:
依序重新發布所有失敗的訊息
依序重新發布部分失敗的訊息
發布一組新訊息
如果發生無法重試的錯誤,用戶端程式庫不會發布訊息,且會停止發布含有相同排序鍵的其他訊息。舉例來說,如果發布者將訊息傳送至不存在的主題,就會發生無法重試的錯誤。如要繼續發布含有相同排序鍵的訊息,請呼叫方法來繼續發布,然後再次開始發布。
以下範例說明如何使用相同的排序鍵繼續發布訊息。
C++
在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
struct SampleData {
std::string ordering_key;
std::string data;
} data[] = {
{"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
{"key1", "message4"}, {"key1", "message5"},
};
std::vector<future<void>> done;
for (auto& datum : data) {
auto const& da = datum; // workaround MSVC lambda capture confusion
auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
auto const msg = da.ordering_key + "#" + da.data;
auto id = f.get();
if (!id) {
std::cout << "An error has occurred publishing " << msg << "\n";
publisher.ResumePublish(da.ordering_key);
return;
}
std::cout << "Message " << msg << " published as id=" << *id << "\n";
};
done.push_back(
publisher
.Publish(pubsub::MessageBuilder{}
.SetData("Hello World! [" + datum.data + "]")
.SetOrderingKey(datum.ordering_key)
.Build())
.then(handler));
}
publisher.Flush();
// Block until all the messages are published (optional)
for (auto& f : done) f.get();
}
C#
在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C# 設定操作說明進行操作。詳情請參閱 Pub/Sub C# API 參考說明文件。
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class ResumePublishSample
{
public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
var customSettings = new PublisherClient.Settings
{
EnableMessageOrdering = true
};
PublisherClient publisher = await new PublisherClientBuilder
{
TopicName = topicName,
Settings = customSettings
}.BuildAsync();
int publishedMessageCount = 0;
var publishTasks = keysAndMessages.Select(async keyAndMessage =>
{
try
{
string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
publisher.ResumePublish(keyAndMessage.Item1);
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例。
在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub/v2"
"google.golang.org/api/option"
)
func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering key are in the same region
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
client, err := pubsub.NewClient(ctx, projectID,
option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
if err != nil {
fmt.Fprintf(w, "pubsub.NewClient: %v", err)
return
}
defer client.Close()
// client.Publisher can be passed a topic ID (e.g. "my-topic") or
// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
// If a topic ID is provided, the project ID from the client is used.
// Reuse this publisher for all publish calls to send messages in batches.
publisher := client.Publisher(topicID)
publisher.EnableMessageOrdering = true
key := "some-ordering-key"
result := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("some-message"),
OrderingKey: key,
})
_, err = result.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Printf("Failed to publish: %s\n", err)
// Resume publish on an ordering key that has had unrecoverable errors.
// After such an error publishes with this ordering key will fail
// until this method is called.
publisher.ResumePublish(key)
}
fmt.Fprint(w, "Published a message with ordering key successfully\n")
}
Java
在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ResumePublishWithOrderingKeys {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Choose an existing topic.
String topicId = "your-topic-id";
resumePublishWithOrderingKeysExample(projectId, topicId);
}
public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
throws IOException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a publisher and set message ordering to true.
Publisher publisher =
Publisher.newBuilder(topicName)
.setEnableMessageOrdering(true)
.setEndpoint("us-east1-pubsub.googleapis.com:443")
.build();
try {
Map<String, String> messages = new LinkedHashMap<String, String>();
messages.put("message1", "key1");
messages.put("message2", "key2");
messages.put("message3", "key1");
messages.put("message4", "key2");
for (Map.Entry<String, String> entry : messages.entrySet()) {
ByteString data = ByteString.copyFromUtf8(entry.getKey());
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Add an asynchronous callback to handle publish success / failure.
ApiFutures.addCallback(
future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// Details on the API exception.
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + pubsubMessage.getData());
// (Beta) Must call resumePublish to reset key and continue publishing with order.
publisher.resumePublish(pubsubMessage.getOrderingKey());
}
@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic).
System.out.println(pubsubMessage.getData() + " : " + messageId);
}
},
MoreExecutors.directExecutor());
}
} finally {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
Node.js
在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function resumePublish(topicNameOrId, data, orderingKey) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publishOptions = {
messageOrdering: true,
};
// Cache topic objects (publishers) and reuse them.
//
// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
// key are in the same region. For list of locational endpoints for Pub/Sub, see:
// https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const publisher = pubSubClient.topic(topicNameOrId, publishOptions);
// Publishes the message
try {
const message = {
data: dataBuffer,
orderingKey: orderingKey,
};
const messageId = await publisher.publishMessage(message);
console.log(`Message ${messageId} published.`);
return messageId;
} catch (e) {
console.log(`Could not publish: ${e}`);
publisher.resumePublishing(orderingKey);
return null;
}
}
Python
在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件。
from google.cloud import pubsub_v1
# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)
for message in [
("message1", "key1"),
("message2", "key2"),
("message3", "key1"),
("message4", "key2"),
]:
# Data must be a bytestring
data = message[0].encode("utf-8")
ordering_key = message[1]
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
try:
print(future.result())
except RuntimeError:
# Resume publish on an ordering key that has had unrecoverable errors.
publisher.resume_publish(topic_path, ordering_key)
print(f"Resumed publishing messages with ordering keys to {topic_path}.")
Ruby
以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例。
在試用這個範例之前,請先按照快速入門:使用用戶端程式庫的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件。
# topic_id = "your-topic-id"
pubsub = Google::Cloud::PubSub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
publisher = pubsub.publisher topic_id, async: {
max_bytes: 1_000_000,
max_messages: 20
}
publisher.enable_message_ordering!
10.times do |i|
publisher.publish_async "This is message ##{i}.",
ordering_key: "ordering-key" do |result|
if result.succeeded?
puts "Message ##{i} successfully published."
else
puts "Message ##{i} failed to publish"
# Allow publishing to continue on "ordering-key" after processing the
# failure.
publisher.resume_publish "ordering-key"
end
end
end
# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop!
後續步驟
如要瞭解如何設定進階發布選項,請參閱下列文章: