並行控制

本文說明如何搭配發布至主題的訊息使用並行控制。

並行控制可協助您覆寫用戶端程式庫用來發布訊息的預設背景 (I/O) 執行緒數量。這可讓發布端用戶端平行傳送訊息。

並行控制是 Pub/Sub 高階用戶端程式庫的可用功能。使用低階程式庫時,您也可以實作自己的並行控制。

並行控制支援取決於用戶端程式庫的程式設計語言。對於支援平行執行緒的語言實作 (例如 C++、Go 和 Java),用戶端程式庫會預設選擇執行緒數量。

本頁說明並行控制的概念,以及如何為發布商用戶端設定這項功能。如要設定訂閱端用戶端進行並行控制,請參閱「透過並行控制處理更多訊息」。

事前準備

設定發布工作流程前,請務必完成下列工作:

必要的角色

如要取得將訊息發布至主題所需的權限,請要求管理員為您授予主題的 Pub/Sub 發布者 (roles/pubsub.publisher) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

您需要其他權限,才能建立或更新主題和訂閱項目。

並行控制設定

並行控制變數的預設值和變數名稱可能因用戶端程式庫而異。舉例來說,在 Java 用戶端程式庫中,設定並行控制的方法為 setExecutorProvider()setChannelProvider()。詳情請參閱 API 參考說明文件

  • setExecutorProvider() 可讓你自訂用於處理發布回應的執行器供應商。舉例來說,您可以將執行器供應器變更為傳回單一共用執行器的供應器,並在多個發布者用戶端中,限制執行緒數量。這項設定有助於限制建立的執行緒數量。

  • setChannelProvider() 可讓您自訂用於開啟 Pub/Sub 連線的管道供應商。除非您想在多個發布商用戶端中使用相同管道,否則通常不需要設定這個值。在過多用戶端重複使用管道,可能會導致 GOAWAYENHANCE_YOUR_CALM 錯誤。如果在應用程式記錄或 Cloud Logs 中看到這些錯誤,請建立更多管道。

並行控制的程式碼範例

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.NumGoroutines = 1

	result := publisher.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

以下範例使用 Ruby Pub/Sub 用戶端程式庫 v3。如果您仍在使用第 2 版程式庫,請參閱 第 3 版遷移指南。如要查看 Ruby 第 2 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫的操作說明設定 Ruby 環境。詳情請參閱 Pub/Sub Ruby API 參考說明文件

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}

publisher.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!

後續步驟