Gleichzeitigkeitserkennung

In diesem Dokument erfahren Sie, wie Sie die Nebenläufigkeitserkennung für Nachrichten verwenden, die in einem Thema veröffentlicht werden.

Mit der Nebenläufigkeitserkennung können Sie die Standardanzahl der Hintergrund-Threads (E/A) überschreiben, die von der Clientbibliothek zum Veröffentlichen von Nachrichten verwendet werden. So können die Publisher-Clients Nachrichten parallel senden.

Die Nebenläufigkeitserkennung ist eine verfügbare Funktion in der Pub/Sub Clientbibliothek auf hoher Ebene. Sie können auch Ihre eigene Nebenläufigkeitserkennung implementieren, wenn Sie eine Bibliothek auf niedriger Ebene verwenden.

Die Unterstützung für die Nebenläufigkeitserkennung hängt von der Programmiersprache der Clientbibliothek ab. Bei Sprachimplementierungen, die parallele Threads wie C++, Go und Java unterstützen, legen die Clientbibliotheken die Anzahl der Threads jeweils anhand ihrer Standardeinstellung fest.

Auf dieser Seite wird das Konzept der Nebenläufigkeitserkennung erläutert und wie Sie die Funktion für Ihre Publisher-Clients einrichten. Informationen zum Konfigurieren Ihrer Abonnenten-Clients für die Nebenläufigkeitserkennung finden Sie unter Mehr Nachrichten mit Nebenläufigkeitserkennung verarbeiten.

Hinweis

Bevor Sie den Veröffentlichungs-Workflow konfigurieren, müssen Sie die folgenden Aufgaben ausgeführt haben:

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen für Ihr Thema die IAM-Rolle Pub/Sub-Publisher (roles/pubsub.publisher) zuzuweisen, damit Sie die Berechtigungen erhalten, die Sie zum Veröffentlichen von Nachrichten in einem Thema benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Sie benötigen zusätzliche Berechtigungen, um Themen und Abos zu erstellen oder zu aktualisieren.

Konfigurationen für die Nebenläufigkeitserkennung

Die Standardwerte für die Variablen der Nebenläufigkeitserkennung und die Namen der Variablen können sich zwischen den Clientbibliotheken unterscheiden. In der Java-Clientbibliothek sind die Methoden zum Konfigurieren der Nebenläufigkeitserkennung beispielsweise setExecutorProvider() und setChannelProvider(). Weitere Informationen finden Sie unter der API-Referenzdokumentation.

  • Mit setExecutorProvider() können Sie den Executor-Provider anpassen, der zum Verarbeiten von Veröffentlichungsantworten verwendet wird. Sie können den Executor-Bereitsteller beispielsweise in einen ändern, der einen einzelnen, freigegebenen Executor mit einer begrenzten Anzahl von Threads für mehrere Publisher-Clients zurückgibt. Diese Konfiguration trägt dazu bei, die Anzahl der erstellten Threads zu begrenzen.

  • Mit setChannelProvider() können Sie den Channel-Provider anpassen, der zum Öffnen von Verbindungen zu Pub/Sub verwendet wird. Normalerweise konfigurieren Sie diesen Wert nur, wenn Sie denselben Channel für mehrere Publisher-Clients verwenden möchten. Wenn Sie einen Channel für zu viele Clients wiederverwenden, kann dies zu GOAWAY- oder ENHANCE_YOUR_CALM-Fehlern führen. Wenn diese Fehler in den Logs Ihrer Anwendung oder in Cloud Loggingangezeigt werden, erstellen Sie weitere Channels.

Codebeispiele für die Nebenläufigkeitserkennung

C++

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

Im folgenden Beispiel wird die Hauptversion der Go Pub/Sub-Clientbibliothek (Version 2) verwendet. Wenn Sie noch die Version 1-Bibliothek verwenden, finden Sie weitere Informationen im Migrationsleitfaden zu Version 2. Eine Liste der Codebeispiele für Version 1 finden Sie unter Veraltete Codebeispiele.

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.NumGoroutines = 1

	result := publisher.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

Im folgenden Beispiel wird die Ruby Pub/Sub-Clientbibliothek Version 3 verwendet. Wenn Sie noch die Version 2-Bibliothek verwenden, finden Sie weitere Informationen im Migrationsleitfaden zu Version 3. Eine Liste der Codebeispiele für Ruby Version 2 finden Sie unter Veraltete Codebeispiele.

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Ruby in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Ruby API.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}

publisher.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!

Nächste Schritte