Como usar o Pub/Sub em aplicativos do Spring

Esta página descreve como usar o Pub/Sub em aplicativos Java criados com o Spring Framework.

O Spring Cloud GCP tem vários módulos para enviar mensagens para tópicos do Pub/Sub e receber mensagens de assinaturas do Pub/Sub usando o Spring Framework. Use esses módulos de maneira independente ou combine-os em diferentes casos de uso:

OBSERVAÇÃO: a biblioteca Spring Cloud GCP não oferece acesso a AckReplyConsumerWithResponse, que é um módulo necessário para implementar o recurso de execução única exata usando a biblioteca de cliente Java.

Antes de começar

  1. Set up a Trusted Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Trusted Cloud console.

  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  3. Defina a variável de ambiente GOOGLE_CLOUD_PROJECT como seu ID do projeto Trusted Cloud .
  4. Como usar o Spring Cloud GCP Pub/Sub Starter

    O módulo Spring Cloud GCP Pub/Sub Starter instala a biblioteca de cliente Java do Pub/Sub usando o módulo Spring Cloud GCP Pub/Sub. É possível chamar a API Pub/Sub no aplicativo Spring usando as classes que o Spring Cloud GCP Pub/Sub Starter fornece ou a biblioteca de cliente Java do Pub/Sub. Se estiver usando as classes que o Spring Cloud GCP Spring Starter fornece, você poderá substituir as configurações padrão do Pub/Sub.

    Como instalar o módulo

    Para instalar o módulo Spring Cloud GCP Pub/Sub Starter, adicione estas dependências ao seu arquivo pom.xml:

    1. A lista de materiais (BOM, na sigla em inglês) do Spring Cloud:

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. O artefato Spring Cloud GCP Pub/Sub Starter:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>

    Operações suportadas

    O módulo Spring Cloud GCP Pub/Sub Starter inclui as seguintes classes:

    • PubSubAdmin para operações administrativas:
      • Criar tópicos e assinaturas
      • Receber tópicos e assinaturas.
      • Listar tópicos e assinaturas.
      • Excluir tópicos e assinaturas.
      • Receber e definir prazos de confirmação em uma assinatura.
    • PubSubTemplate para enviar e receber mensagens:
      • Publicar mensagens em tópicos.
      • Receber mensagens de assinaturas de maneira síncrona.
      • Receber mensagens de assinaturas de maneira assíncrona.
      • Confirmar mensagens.
      • Modificar prazos de confirmação.
      • Converter mensagens do Pub/Sub em objetos Java Plain Old (POJOs).

    Usar adaptadores de canal da Spring Integration

    Se seu aplicativo Spring usa canais de mensagens da Spring Integration, é possível rotear mensagens entre seus canais de mensagem e o Pub/Sub usando adaptadores de canal.

    Como instalar os módulos

    Para instalar módulos para adaptadores do canal Spring Integration, adicione o seguinte ao seu arquivo pom.xml:

    1. O BOM do Spring Cloud GCP (em inglês).

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Os artefatos do Spring Cloud GCP Pub/Sub Starter e da Spring Integration Core:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
      </dependency>

    Como receber mensagens do Pub/Sub

    Para receber mensagens de uma assinatura do Pub/Sub no aplicativo Spring, use um adaptador de canal de entrada. O adaptador de canal de entrada converte as mensagens do Pub/Sub recebidas em POJOs e, em seguida, encaminha os POJOs para um canal de mensagens.

    // Create a message channel for messages arriving from the subscription `sub-one`.
    @Bean
    public MessageChannel inputMessageChannel() {
      return new PublishSubscribeChannel();
    }
    
    // Create an inbound channel adapter to listen to the subscription `sub-one` and send
    // messages to the input message channel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
      PubSubInboundChannelAdapter adapter =
          new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
      adapter.setOutputChannel(messageChannel);
      adapter.setAckMode(AckMode.MANUAL);
      adapter.setPayloadType(String.class);
      return adapter;
    }
    
    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
      LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
      message.ack();
    }

    O exemplo acima usa os seguintes beans do Spring e o recurso do Pub/Sub:

    • Um bean do canal de mensagens chamado inputMessageChannel.
    • Um adaptador de canal de entrada é chamado de inboundChannelAdapter do tipo PubSubInboundChannelAdapter.
    • Um código de assinatura do Pub/Sub chamado sub-one.

    O inboundChannelAdapter extrai mensagens de sub-one de forma assíncrona usando um PubSubTemplate e envia as mensagens para inputMessageChannel.

    O inboundChannelAdapter define o modo de confirmação como MANUAL para que o aplicativo reconheça as mensagens depois de processá-las. O modo de confirmação padrão dos tipos PubSubInboundChannelAdapter é AUTO.

    O bean ServiceActivator messageReceiver registra cada mensagem que chega em inputMessageChannel à saída padrão e, em seguida, confirma a mensagem.

    Como publicar mensagens no Pub/Sub

    Para publicar mensagens de um canal de mensagens em um tópico do Pub/Sub, use um adaptador de canal de saída. O adaptador de canal de saída converte os POJOs para mensagens do Pub/Sub e, em seguida, envia as mensagens para um tópico do Pub/Sub.

    // Create an outbound channel adapter to send messages from the input message channel to the
    // topic `topic-two`.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
      PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
    
      adapter.setSuccessCallback(
          ((ackId, message) ->
              LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));
    
      adapter.setFailureCallback(
          (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));
    
      return adapter;
    }

    O exemplo acima usa os seguintes beans e o recurso Pub/Sub:

    • Um bean do canal de mensagens chamado inputMessageChannel.
    • Um bean do adaptador de canal de saída chamado messageSender do tipo PubSubMessageHandler.
    • Um ID de tópico do Pub/Sub chamado topic-two.

    O bean ServiceActivator aplica a lógica em messageSender a cada mensagem em inputMessageChannel.

    O PubSubMessageHandler em messageSender publica mensagens na inputMessageChannel usando um PubSubTemplate. O PubSubMessageHandler publica mensagens no tópico topic-two do Pub/Sub.

    Como usar o Spring Cloud Stream Binder

    Para chamar a API Pub/Sub em um aplicativo Spring Cloud Stream, use o módulo Spring Cloud GCP Pub/Sub Stream Binder.

    Como instalar o módulo

    Para instalar o módulo Springer do Spring Cloud Stream, adicione o seguinte ao arquivo pom.xml:

    1. O BOM do Spring Cloud GCP (em inglês).

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. O artefato Spring Cloud Stream Binder:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
      </dependency>

    Como receber mensagens do Pub/Sub

    Para usar seu aplicativo como um coletor de eventos, configure o vinculador de entrada especificando o seguinte:

    • Um bean Consumer que define a lógica de gerenciamento de mensagens. Por exemplo, o bean Consumer a seguir é chamado de receiveMessageFromTopicTwo:

      // Create an input binder to receive messages from `topic-two` using a Consumer bean.
      @Bean
      public Consumer<Message<String>> receiveMessageFromTopicTwo() {
        return message -> {
          LOGGER.info(
              "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
        };
      }
    • Um ID do tópico Pub/Sub no arquivo de configuração application.properties. Por exemplo, o arquivo de configuração a seguir usa um ID de tópico Pub/Sub chamado topic-two:

      # Bind the Pub/Sub topic `topic-two` to the Consumer bean
      # `receiveMessageFromTopicTwo`. Your Spring application will
      # automatically create and attach a subscription to the topic.
      spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

    O código de exemplo recebe mensagens do Pub/Sub. O exemplo faz o seguinte:

    1. Encontra o ID do tópico do Pub/Sub topic-two no destino de vinculação de entrada em application.properties.
    2. Cria uma assinatura do Pub/Sub para topic-two.
    3. Usa o nome de vinculação receiveMessageFromTopicTwo-in-0 para encontrar o bean Consumer chamado receiveMessageFromTopicTwo.
    4. Imprime as mensagens recebidas na saída padrão e as confirma automaticamente.

    Como publicar mensagens no Pub/Sub

    Para usar seu aplicativo como uma origem de evento, configure o vinculador de saída especificando o seguinte:

    • Um bean Supplier que define a origem das mensagens dentro do seu aplicativo. Por exemplo, o bean Supplier a seguir é chamado de sendMessageToTopicOne:

      // Create an output binder to send messages to `topic-one` using a Supplier bean.
      @Bean
      public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
        return () ->
            Flux.<Message<String>>generate(
                    sink -> {
                      try {
                        Thread.sleep(10000);
                      } catch (InterruptedException e) {
                        // Stop sleep earlier.
                      }
      
                      Message<String> message =
                          MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                      LOGGER.info(
                          "Sending a message via the output binder to topic-one! Payload: "
                              + message.getPayload());
                      sink.next(message);
                    })
                .subscribeOn(Schedulers.boundedElastic());
      }
    • Um ID do tópico Pub/Sub no arquivo de configuração application.properties. Por exemplo, o arquivo de configuração a seguir usa um ID de tópico Pub/Sub chamado topic-one:

      # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
      # `topic-one`. If the topic does not exist, one will be created.
      spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

    O código de exemplo publica mensagens no Pub/Sub. O exemplo faz o seguinte:

    1. Encontra o ID do tópico do Pub/Sub topic-one no destino de vinculação de saída em application.properties.
    2. Usa o nome de vinculação sendMessageToTopicOne-out-0 para encontrar o bean Supplier chamado sendMessageToTopicOne.
    3. Envia uma mensagem numerada para topic-one a cada 10 segundos.