Utilizzo di Pub/Sub nelle applicazioni Spring

Questa pagina descrive come utilizzare Pub/Sub nelle applicazioni Java create con Spring Framework.

Spring Cloud GCP dispone di diversi moduli per l'invio di messaggi agli argomenti Pub/Sub e la ricezione di messaggi dalle sottoscrizioni Pub/Sub utilizzando Spring Framework. Puoi utilizzare questi moduli in modo indipendente o combinarli per diversi casi d'uso:

NOTA: la libreria Spring Cloud GCP non fornisce l'accesso a AckReplyConsumerWithResponse, che è un modulo obbligatorio per implementare la funzionalità di invio esatto una sola volta utilizzando la libreria client Java.

Prima di iniziare

  1. Set up a Trusted Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Trusted Cloud console.

  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  3. Imposta la variabile di ambiente GOOGLE_CLOUD_PROJECT sull' Trusted Cloud ID progetto.
  4. Utilizzo di Spring Cloud GCP Pub/Sub Starter

    Il modulo Spring Cloud GCP Pub/Sub Starter installa la libreria client Java Pub/Sub utilizzando il modulo Spring Cloud GCP Pub/Sub. Puoi chiamare l'API Pub/Sub dalla tua applicazione Spring utilizzando le classi fornite da Spring Cloud GCP Pub/Sub Starter o la libreria client Java Pub/Sub. Se utilizzi le classi fornite da Spring Cloud GCP Pub/Sub Starter, puoi sostituire le configurazioni Pub/Sub predefinite.

    Installazione del modulo

    Per installare il modulo Spring Cloud GCP Pub/Sub Starter, aggiungi queste dipendenze al file pom.xml:

    1. La Bill of Materials (BOM) di Spring Cloud:

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. L'artefatto Spring Cloud GCP Pub/Sub Starter:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>

    Operazioni supportate

    Il modulo Spring Cloud GCP Pub/Sub Starter include le seguenti classi:

    • PubSubAdmin per le operazioni amministrative:
      • Crea argomenti e sottoscrizioni.
      • Recupera argomenti e sottoscrizioni.
      • Elenca argomenti e sottoscrizioni.
      • Eliminare argomenti e sottoscrizioni.
      • Recuperare e impostare le scadenze di conferma per una sottoscrizione.
    • PubSubTemplate per l'invio e la ricezione di messaggi:
      • Pubblicare messaggi negli argomenti.
      • Esegui il pull sincrono dei messaggi dalle sottoscrizioni.
      • Esegui il pull asincrono dei messaggi dalle sottoscrizioni.
      • Confermare i messaggi.
      • Modificare le scadenze di conferma.
      • Converti i messaggi Pub/Sub in POJO (Plain Old Java Object).

    Utilizzo degli adattatori del canale Spring Integration

    Se la tua applicazione Spring utilizza i canali di messaggi Spring Integration, puoi instradare i messaggi tra i canali di messaggi e Pub/Sub utilizzando gli adattatori di canale.

    Installare i moduli

    Per installare i moduli per gli adattatori di canale Spring Integration, aggiungi quanto segue al file pom.xml:

    1. La BOM di Spring Cloud GCP.

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Gli artefatti Spring Cloud GCP Pub/Sub Starter e Spring Integration Core:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
      </dependency>

    Ricezione di messaggi da Pub/Sub

    Per ricevere messaggi da un abbonamento Pub/Sub nella tua applicazione Spring, utilizza un adattatore del canale in entrata. L'adattatore del canale in entrata converte i messaggi Pub/Sub in POJO e poi li inoltra a un canale di messaggi.

    // Create a message channel for messages arriving from the subscription `sub-one`.
    @Bean
    public MessageChannel inputMessageChannel() {
      return new PublishSubscribeChannel();
    }
    
    // Create an inbound channel adapter to listen to the subscription `sub-one` and send
    // messages to the input message channel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
      PubSubInboundChannelAdapter adapter =
          new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
      adapter.setOutputChannel(messageChannel);
      adapter.setAckMode(AckMode.MANUAL);
      adapter.setPayloadType(String.class);
      return adapter;
    }
    
    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
      LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
      message.ack();
    }

    L'esempio precedente utilizza i seguenti bean Spring e la risorsa Pub/Sub:

    • Un bean del canale di messaggi denominato inputMessageChannel.
    • Un bean adattatore del canale in entrata denominato inboundChannelAdapter di tipo PubSubInboundChannelAdapter.
    • Un ID sottoscrizione Pub/Sub denominato sub-one.

    inboundChannelAdapter recupera in modo asincrono i messaggi da sub-one utilizzando un PubSubTemplate e li invia a inputMessageChannel.

    inboundChannelAdapter imposta la modalità di riconoscimento su MANUAL, in modo che l'applicazione possa riconoscere i messaggi dopo averli elaborati. La modalità di riconoscimento predefinita dei tipi PubSubInboundChannelAdapter è AUTO.

    Il bean ServiceActivator messageReceiver registra ogni messaggio in arrivo in inputMessageChannel nell'output standard e quindi conferma il messaggio.

    Pubblicazione di messaggi in Pub/Sub

    Per pubblicare messaggi da un canale di messaggi a un argomento Pub/Sub, utilizza un adattatore del canale in uscita. L'adattatore del canale in uscita converte i POJO in messaggi Pub/Sub e poi li invia a un argomento Pub/Sub.

    // Create an outbound channel adapter to send messages from the input message channel to the
    // topic `topic-two`.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
      PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
    
      adapter.setSuccessCallback(
          ((ackId, message) ->
              LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));
    
      adapter.setFailureCallback(
          (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));
    
      return adapter;
    }

    L'esempio precedente utilizza i seguenti bean Spring e la seguente risorsa Pub/Sub:

    • Un bean del canale di messaggi denominato inputMessageChannel.
    • Un bean adattatore del canale in uscita denominato messageSender di tipo PubSubMessageHandler.
    • Un ID argomento Pub/Sub denominato topic-two.

    Il bean ServiceActivator applica la logica in messageSender a ogni messaggio in inputMessageChannel.

    PubSubMessageHandler in messageSender pubblica messaggi in inputMessageChannel utilizzando un PubSubTemplate. PubSubMessageHandler pubblica i messaggi nell'argomento Pub/Sub topic-two.

    Utilizzo di Spring Cloud Stream Binder

    Per chiamare l'API Pub/Sub in un'applicazione Spring Cloud Stream, utilizza il modulo Spring Cloud GCP Pub/Sub Stream Binder.

    Installazione del modulo

    Per installare il modulo Spring Cloud Stream Binder, aggiungi quanto segue al file pom.xml:

    1. La BOM di Spring Cloud GCP.

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. L'artefatto Spring Cloud Stream Binder:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
      </dependency>

    Ricezione di messaggi da Pub/Sub

    Per utilizzare l'applicazione come sink di eventi, configura il binder di input specificando quanto segue:

    • Un bean Consumer che definisce la logica di gestione dei messaggi. Ad esempio, il seguente bean Consumer è denominato receiveMessageFromTopicTwo:

      // Create an input binder to receive messages from `topic-two` using a Consumer bean.
      @Bean
      public Consumer<Message<String>> receiveMessageFromTopicTwo() {
        return message -> {
          LOGGER.info(
              "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
        };
      }
    • Un ID argomento Pub/Sub nel file di configurazione application.properties. Ad esempio, il seguente file di configurazione utilizza un ID argomento Pub/Sub denominato topic-two:

      # Bind the Pub/Sub topic `topic-two` to the Consumer bean
      # `receiveMessageFromTopicTwo`. Your Spring application will
      # automatically create and attach a subscription to the topic.
      spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

    Il codice di esempio riceve messaggi da Pub/Sub. L'esempio esegue queste operazioni:

    1. Trova l'ID argomento Pub/Sub topic-two nella destinazione del binding di input in application.properties.
    2. Crea una sottoscrizione Pub/Sub a topic-two.
    3. Utilizza il nome del binding receiveMessageFromTopicTwo-in-0 per trovare il bean Consumer denominato receiveMessageFromTopicTwo.
    4. Stampa i messaggi in arrivo nell'output standard e li conferma automaticamente.

    Pubblicazione di messaggi in Pub/Sub

    Per utilizzare l'applicazione come origine eventi, configura il binder di output specificando quanto segue:

    • Un bean Supplier che definisce la provenienza dei messaggi all'interno dell'applicazione. Ad esempio, il seguente bean Supplier è denominato sendMessageToTopicOne:

      // Create an output binder to send messages to `topic-one` using a Supplier bean.
      @Bean
      public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
        return () ->
            Flux.<Message<String>>generate(
                    sink -> {
                      try {
                        Thread.sleep(10000);
                      } catch (InterruptedException e) {
                        // Stop sleep earlier.
                      }
      
                      Message<String> message =
                          MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                      LOGGER.info(
                          "Sending a message via the output binder to topic-one! Payload: "
                              + message.getPayload());
                      sink.next(message);
                    })
                .subscribeOn(Schedulers.boundedElastic());
      }
    • Un ID argomento Pub/Sub nel file di configurazione application.properties. Ad esempio, il seguente file di configurazione utilizza un ID argomento Pub/Sub denominato topic-one:

      # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
      # `topic-one`. If the topic does not exist, one will be created.
      spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

    Il codice di esempio pubblica i messaggi su Pub/Sub. L'esempio esegue queste operazioni:

    1. Trova l'ID argomento Pub/Sub topic-one nella destinazione del binding di output in application.properties.
    2. Utilizza il nome del binding sendMessageToTopicOne-out-0 per trovare il bean Supplier denominato sendMessageToTopicOne.
    3. Invia un messaggio numerato a topic-one ogni 10 secondi.