Processar mais mensagens com controle de simultaneidade

O controle de simultaneidade é um recurso disponível na biblioteca de cliente de alto nível do Pub/Sub. Também é possível implementar seu próprio controle de simultaneidade ao usar uma biblioteca de baixo nível.

O suporte ao controle de simultaneidade depende da linguagem de programação da biblioteca de cliente. Para implementações de linguagem que aceitam linhas de execução paralelas, como C++, Go e Java, as bibliotecas de cliente fazem uma escolha padrão do número de linhas de execução.

Essa escolha pode não ser ideal para o aplicativo. Por exemplo, se o aplicativo assinante não estiver acompanhando o volume de mensagens recebidas e não estiver ligado à CPU, aumente a contagem de linhas de execução. Para operações de processamento de mensagens intensivas da CPU, diminua o número de linhas de execução.

Nesta página, explicamos o conceito de controle de simultaneidade e como configurar o recurso para os clientes assinantes. Para configurar seus clientes editores para controle de simultaneidade, consulte Controle de simultaneidade.

Configurações de controle de simultaneidade

Os valores padrão das variáveis de controle de simultaneidade e os nomes das variáveis podem variar entre as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API. Por exemplo, na biblioteca de cliente Java, os métodos para configurar o controle de simultaneidade são setParallelPullCount(), setExecutorProvider(), setSystemExecutorProvider() e setChannelProvider().

  • setParallelPullCount() permite decidir quantos streams abrir. É possível abrir mais streams se o cliente assinante puder processar mais dados do que são enviados em um único stream, que é de 10 MBps.

  • Com setExecutorProvider(), é possível personalizar o provedor de executor usado para processar mensagens. Por exemplo, você pode mudar o provedor de executor para um que retorne um único executor compartilhado com um número limitado de linhas de execução em vários clientes assinantes. Essa configuração ajuda a limitar o número de threads criadas. O número total de linhas de execução usadas para controle de simultaneidade depende do provedor de executor transmitido na biblioteca de cliente e da contagem de extração paralela.

  • setSystemExecutorProvider() permite personalizar o provedor de executor usado para gerenciamento de concessão. Normalmente, você não configura esse valor, a menos que queira usar o mesmo provedor de executor em setExecutorProvider e setSystemExecutorProvider. Por exemplo, você pode usar o mesmo provedor de executor se tiver várias assinaturas de baixa taxa de transferência. Usar o mesmo valor limita o número de linhas de execução no cliente.

  • setChannelProvider() permite personalizar o provedor de canais usado para abrir conexões com o Pub/Sub. Normalmente, você não configura esse valor, a menos que queira usar o mesmo canal em vários clientes assinantes. Reutilizar um canal em muitos clientes pode resultar em erros GOAWAY ou ENHANCE_YOUR_CALM. Se você encontrar esses erros nos registros do aplicativo ou no Cloud Logging, crie mais canais.

Exemplos de código para controle de simultaneidade

C++

Antes de tentar esse exemplo, siga as instruções de configuração do C++ em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxConcurrencyOption>(8)
          .set<GrpcBackgroundThreadPoolSizeOption>(16)));

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `std::thread::hardware_concurrency()` as the
  // maximum number of concurrent callbacks.
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        // This handler executes in the I/O threads, applications could use,
        // std::async(), a thread-pool, or any other mechanism to transfer the
        // execution to other threads.
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

Go

O exemplo a seguir usa a versão principal da biblioteca de cliente do Go Pub/Sub (v2). Se você ainda estiver usando a biblioteca v1, consulte o guia de migração para a v2. Para conferir uma lista de exemplos de código da v1, consulte os exemplos de código descontinuados.

Antes de tentar esse exemplo, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsConcurrencyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// NumGoroutines determines the number of streams sub.Receive will spawn to pull
	// messages. It is recommended to set this to 1, unless your throughput
	// is greater than 10 MB/s, as even having 1 stream can still result in
	// messages being handled asynchronously.
	sub.ReceiveSettings.NumGoroutines = 1
	// MaxOutstandingMessages limits the number of concurrent handlers of messages.
	// In this case, up to 8 unacked messages can be handled concurrently.
	sub.ReceiveSettings.MaxOutstandingMessages = 8

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive returned error: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

O exemplo a seguir usa a biblioteca de cliente do Ruby Pub/Sub v3. Se você ainda estiver usando a biblioteca v2, consulte o guia de migração para a v3. Para conferir uma lista de exemplos de código do Ruby v2, consulte os exemplos de código descontinuados.

Antes de tentar esse exemplo, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
listener = subscriber.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

A seguir

Leia sobre as outras opções de entrega que você pode configurar para uma assinatura: