Traiter plus de messages avec le contrôle de simultanéité

Le contrôle de la concurrence est une fonctionnalité disponible dans la bibliothèque cliente de haut niveau Pub/Sub. Vous pouvez également implémenter votre propre contrôle de la simultanéité lorsque vous utilisez une bibliothèque de bas niveau.

La compatibilité du contrôle de la simultanéité dépend du langage de programmation de la bibliothèque cliente. Pour les implémentations de langage prenant en charge les threads parallèles, tels que C++, Go et Java, les bibliothèques clientes font un choix par défaut pour le nombre de threads.

Ce choix peut ne pas être optimal pour votre application. Par exemple, si votre application d'abonné ne parvient pas à absorber le volume de messages entrants et n'est pas liée au processeur, vous devez augmenter le nombre de threads. Pour les opérations de traitement de messages nécessitant une utilisation intensive du processeur, il peut être approprié de réduire le nombre de threads.

Cette page explique le concept de contrôle de la simultanéité et comment configurer la fonctionnalité pour vos clients abonnés. Pour configurer vos clients éditeur pour le contrôle de la simultanéité, consultez Contrôle de la simultanéité.

Configurations de contrôle de simultanéité

Les valeurs par défaut des variables de contrôle de la simultanéité et les noms des variables peuvent varier d'une bibliothèque cliente à l'autre. Pour en savoir plus, consultez la documentation de référence sur les API. Par exemple, dans la bibliothèque cliente Java, les méthodes permettant de configurer le contrôle de la simultanéité sont setParallelPullCount(), setExecutorProvider(), setSystemExecutorProvider() et setChannelProvider().

  • setParallelPullCount() vous permet de décider du nombre de flux à ouvrir. Vous pouvez ouvrir d'autres flux si votre client abonné peut gérer plus de données que celles envoyées sur un seul flux, soit 10 Mbit/s.

  • setExecutorProvider() vous permet de personnaliser le fournisseur d'exécution utilisé pour le traitement des messages. Par exemple, vous pouvez remplacer le fournisseur d'exécuteur par un fournisseur qui renvoie un seul exécuteur partagé avec un nombre limité de threads sur plusieurs clients abonnés. Cette configuration permet de limiter le nombre de threads créés. Le nombre total de threads utilisés pour le contrôle de la simultanéité dépend du fournisseur d'exécution transmis dans la bibliothèque cliente et du nombre d'extractions parallèles.

  • setSystemExecutorProvider() vous permet de personnaliser le fournisseur d'exécution utilisé pour la gestion des baux. En règle générale, vous ne configurez pas cette valeur, sauf si vous souhaitez utiliser le même fournisseur d'exécuteur dans setExecutorProvider et setSystemExecutorProvider. Par exemple, vous pouvez utiliser le même fournisseur d'exécuteur si vous avez un certain nombre d'abonnements à faible débit. L'utilisation de la même valeur limite le nombre de threads dans le client.

  • setChannelProvider() vous permet de personnaliser le fournisseur de canal utilisé pour ouvrir des connexions à Pub/Sub. En règle générale, vous ne configurez pas cette valeur, sauf si vous souhaitez utiliser le même canal sur plusieurs clients abonnés. Si vous réutilisez un canal pour un trop grand nombre de clients, des erreurs GOAWAY ou ENHANCE_YOUR_CALM peuvent se produire. Si ces erreurs s'affichent dans les journaux de votre application ou dans Cloud Logging, créez d'autres canaux.

Exemples de code pour le contrôle de la concurrence

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Go

L'exemple suivant utilise la version majeure de la bibliothèque cliente Go Pub/Sub (v2). Si vous utilisez toujours la bibliothèque v1, consultez le guide de migration vers la v2. Pour consulter la liste des exemples de code de la version 1, consultez les exemples de code obsolètes.

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// NumGoroutines determines the number of streams sub.Receive will spawn to pull
	// messages. It is recommended to set this to 1, unless your throughput
	// is greater than 10 MB/s, as even having 1 stream can still result in
	// messages being handled asynchronously.
	sub.ReceiveSettings.NumGoroutines = 1
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

L'exemple suivant utilise la bibliothèque cliente Ruby Pub/Sub v3. Si vous utilisez toujours la bibliothèque v2, consultez le guide de migration vers la v3. Pour afficher la liste des exemples de code Ruby v2, consultez les exemples de code obsolètes.

Avant d'essayer cet exemple, suivez les instructions de configuration pour Ruby du guide de démarrage rapide : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
listener = subscriber.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

Étapes suivantes

Découvrez les autres options de diffusion que vous pouvez configurer pour un abonnement :