Gleichzeitigkeitserkennung

Dieses Dokument enthält Informationen zur Verwendung der Parallelitätssteuerung mit Nachrichten, die in einem Thema veröffentlicht werden.

Mit der Parallelitätssteuerung können Sie die Standardanzahl der Hintergrund-Threads (E/A) überschreiben, die von der Clientbibliothek zum Veröffentlichen von Nachrichten verwendet werden. So können Publisher-Clients Nachrichten parallel senden.

Die Funktion zur Steuerung der Parallelität ist in der Pub/Sub-Clientbibliothek auf hoher Ebene verfügbar. Sie können auch Ihre eigene Parallelitätssteuerung implementieren, wenn Sie eine Low-Level-Bibliothek verwenden.

Die Unterstützung für die Steuerung der Gleichzeitigkeit hängt von der Programmiersprache der Clientbibliothek ab. Bei Sprachimplementierungen, die parallele Threads wie C++, Go und Java unterstützen, legen die Clientbibliotheken die Anzahl der Threads jeweils anhand ihrer Standardeinstellung fest.

Auf dieser Seite wird das Konzept der Parallelitätssteuerung erläutert und beschrieben, wie Sie die Funktion für Ihre Publisher-Clients einrichten. Informationen zum Konfigurieren Ihrer Abonnentenclients für die Gleichzeitigkeitssteuerung finden Sie unter Mehr Nachrichten mit Gleichzeitigkeitssteuerung verarbeiten.

Hinweise

Bevor Sie den Veröffentlichungs-Workflow konfigurieren, müssen Sie die folgenden Aufgaben ausführen:

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Pub/Sub Publisher (roles/pubsub.publisher) für das Thema zuzuweisen, damit Sie die Berechtigungen erhalten, die Sie zum Veröffentlichen von Nachrichten in einem Thema benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Sie benötigen zusätzliche Berechtigungen, um Themen und Abos zu erstellen oder zu aktualisieren.

Konfigurationen für die Gleichzeitigkeitserkennung

Die Standardwerte für die Variablen zur Steuerung der Parallelität und die Namen der Variablen können sich je nach Clientbibliothek unterscheiden. In der Java-Clientbibliothek sind die Methoden zum Konfigurieren der Parallelitätssteuerung beispielsweise setExecutorProvider() und setChannelProvider(). Weitere Informationen finden Sie in der API-Referenzdokumentation.

  • Mit setExecutorProvider() können Sie den Executor-Anbieter anpassen, der für die Verarbeitung von Veröffentlichungsantworten verwendet wird. Sie können beispielsweise den Executor-Anbieter in einen Anbieter ändern, der einen einzelnen, gemeinsam genutzten Executor mit einer begrenzten Anzahl von Threads für mehrere Publisher-Clients zurückgibt. Diese Konfiguration trägt dazu bei, die Anzahl der erstellten Threads zu begrenzen.

  • Mit setChannelProvider() können Sie den Channel-Anbieter anpassen, der zum Öffnen von Verbindungen zu Pub/Sub verwendet wird. Normalerweise konfigurieren Sie diesen Wert nicht, es sei denn, Sie möchten denselben Channel für mehrere Publisher-Clients verwenden. Wenn ein Channel für zu viele Clients wiederverwendet wird, kann dies zu GOAWAY- oder ENHANCE_YOUR_CALM-Fehlern führen. Wenn diese Fehler in den Logs Ihrer Anwendung oder in Cloud Logs angezeigt werden, erstellen Sie weitere Channels.

Codebeispiele für die Parallelitätssteuerung

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

Im folgenden Beispiel wird die Hauptversion der Go Pub/Sub-Clientbibliothek (v2) verwendet. Wenn Sie noch die v1-Bibliothek verwenden, finden Sie hier den Migrationsleitfaden für v2. Eine Liste der Codebeispiele für Version 1 finden Sie unter Eingestellte Codebeispiele.

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.NumGoroutines = 1

	result := publisher.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

Im folgenden Beispiel wird die Ruby-Pub/Sub-Clientbibliothek v3 verwendet. Wenn Sie noch die v2-Bibliothek verwenden, finden Sie hier die Migrationsanleitung für v3. Eine Liste der Ruby v2-Codebeispiele finden Sie unter Eingestellte Codebeispiele.

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Ruby im Schnellstart: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}

publisher.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!

Nächste Schritte