Controle de simultaneidade

Este documento fornece informações sobre como usar o controle de simultaneidade com mensagens publicadas em um tópico.

O controle de simultaneidade ajuda a substituir o número padrão de linhas de execução em segundo plano (E/S) usadas pela biblioteca de cliente para publicar mensagens. Isso permite que os clientes editores enviem mensagens em paralelo.

O controle de simultaneidade é um recurso disponível na biblioteca de cliente de alto nível do Pub/Sub. Você também pode implementar seu próprio controle de simultaneidade ao usar uma biblioteca de baixo nível.

O suporte ao controle de simultaneidade depende da linguagem de programação da biblioteca de cliente. Para implementações de linguagem que aceitam linhas de execução paralelas, como C++, Go e Java, as bibliotecas de cliente fazem uma escolha padrão do número de linhas de execução.

Nesta página, explicamos o conceito de controle de simultaneidade e como configurar o recurso para os clientes editores. Para configurar seus clientes assinantes para controle de simultaneidade, consulte Processar mais mensagens com controle de simultaneidade.

Antes de começar

Antes de configurar o fluxo de trabalho de publicação, verifique se você concluiu as seguintes tarefas:

Funções exigidas

Para receber as permissões necessárias para publicar mensagens em um tópico, peça ao administrador para conceder a você o papel do IAM Editor do Pub/Sub (roles/pubsub.publisher) no tópico. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Você precisa de permissões adicionais para criar ou atualizar tópicos e assinaturas.

Configurações de controle de simultaneidade

Os valores padrão das variáveis de controle de simultaneidade e os nomes das variáveis podem variar entre as bibliotecas de cliente. Por exemplo, na biblioteca de cliente Java, os métodos para configurar o controle de simultaneidade são setExecutorProvider() e setChannelProvider(). Para mais informações, consulte a documentação de referência da API.

  • Com setExecutorProvider(), é possível personalizar o provedor de executor usado para processar respostas de publicação. Por exemplo, você pode mudar o provedor de executor para um que retorne um único executor compartilhado com um número limitado de linhas de execução em vários clientes editores. Essa configuração ajuda a limitar o número de linhas de execução criadas.

  • setChannelProvider() permite personalizar o provedor de canais usado para abrir conexões com o Pub/Sub. Normalmente, você não configura esse valor, a menos que queira usar o mesmo canal em vários clientes editores. Reutilizar um canal em muitos clientes pode resultar em erros GOAWAY ou ENHANCE_YOUR_CALM. Se você encontrar esses erros nos registros do aplicativo ou no Cloud Logging, crie mais canais.

Exemplos de código para controle de simultaneidade

C++

Antes de tentar esse exemplo, siga as instruções de configuração do C++ em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

O exemplo a seguir usa a versão principal da biblioteca de cliente do Go Pub/Sub (v2). Se você ainda estiver usando a biblioteca v1, consulte o guia de migração para a v2. Para conferir uma lista de exemplos de código da v1, consulte os exemplos de código descontinuados.

Antes de tentar esse exemplo, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.NumGoroutines = 1

	result := publisher.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

O exemplo a seguir usa a biblioteca de cliente do Ruby Pub/Sub v3. Se você ainda estiver usando a biblioteca v2, consulte o guia de migração para a v3. Para conferir uma lista de exemplos de código do Ruby v2, consulte os exemplos de código descontinuados.

Antes de tentar esse exemplo, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}

publisher.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!

A seguir