Kontrol konkurensi

Dokumen ini memberikan informasi tentang penggunaan kontrol konkurensi dengan pesan yang dipublikasikan ke topik.

Kontrol konkurensi membantu Anda mengganti jumlah default thread latar belakang (I/O) yang digunakan oleh library klien untuk memublikasikan pesan. Hal ini memungkinkan klien penayang mengirim pesan secara paralel.

Kontrol konkurensi adalah fitur yang tersedia di library klien Pub/Sub tingkat tinggi. Anda juga dapat menerapkan kontrol konkurensi Anda sendiri saat menggunakan library tingkat rendah.

Dukungan untuk kontrol konkurensi bergantung pada bahasa pemrograman library klien. Untuk implementasi bahasa yang mendukung thread paralel, seperti C++, Go, dan Java, library klien membuat pilihan default untuk jumlah thread.

Halaman ini menjelaskan konsep kontrol konkurensi dan cara menyiapkan fitur untuk klien penayang Anda. Untuk mengonfigurasi klien pelanggan Anda untuk kontrol konkurensi, lihat Memproses lebih banyak pesan dengan kontrol konkurensi.

Sebelum memulai

Sebelum mengonfigurasi alur kerja publikasi, pastikan Anda telah menyelesaikan tugas berikut:

Peran yang diperlukan

Untuk mendapatkan izin yang diperlukan guna memublikasikan pesan ke topik, minta administrator untuk memberi Anda peran IAM Pub/Sub Publisher (roles/pubsub.publisher) pada topik Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Anda memerlukan izin tambahan untuk membuat atau memperbarui topik dan langganan.

Konfigurasi kontrol konkurensi

Nilai default untuk variabel kontrol konkurensi dan nama variabel mungkin berbeda di seluruh library klien. Misalnya, di library klien Java, metode untuk mengonfigurasi kontrol konkurensi adalah setExecutorProvider() dan setChannelProvider(). Untuk mengetahui informasi selengkapnya, lihat dokumen referensi API.

  • setExecutorProvider() memungkinkan Anda menyesuaikan penyedia eksekutor yang digunakan untuk memproses respons publikasi. Misalnya, Anda dapat mengubah penyedia eksekutor menjadi penyedia yang menampilkan satu eksekutor bersama dengan jumlah thread terbatas di beberapa klien penayang. Konfigurasi ini membantu membatasi jumlah thread yang dibuat.

  • setChannelProvider() memungkinkan Anda menyesuaikan penyedia saluran yang digunakan untuk membuka koneksi ke Pub/Sub. Biasanya, Anda tidak mengonfigurasi nilai ini kecuali jika ingin menggunakan saluran yang sama di beberapa klien penayang. Menggunakan kembali saluran di terlalu banyak klien dapat menyebabkan error GOAWAY atau ENHANCE_YOUR_CALM. Jika Anda melihat error ini di log aplikasi atau Log Cloud, buat lebih banyak saluran.

Contoh kode untuk kontrol konkurensi

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C++ Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

Contoh berikut menggunakan versi utama library klien Go Pub/Sub (v2). Jika Anda masih menggunakan library v1, lihat panduan migrasi ke v2. Untuk melihat daftar contoh kode v1, lihat contoh kode yang tidak digunakan lagi.

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Go Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.NumGoroutines = 1

	result := publisher.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Java Pub/Sub.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

Contoh berikut menggunakan library klien Ruby Pub/Sub v3. Jika Anda masih menggunakan library v2, lihat panduan migrasi ke v3. Untuk melihat daftar contoh kode Ruby v2, lihat contoh kode yang tidak digunakan lagi.

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Ruby Pub/Sub.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}

publisher.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!

Langkah berikutnya