Pub/Sub in Spring-Anwendungen verwenden

Auf dieser Seite wird beschrieben, wie Sie Pub/Sub in Java-Anwendungen verwenden, die mit dem Spring Framework erstellt wurden.

Spring Cloud GCP bietet mehrere Module, um mithilfe des Spring Framework Nachrichten an Pub/Sub-Themen zu senden und Nachrichten von Pub/Sub-Abos zu empfangen. Sie können diese Module unabhängig voneinander verwenden oder für verschiedene Anwendungsfälle kombinieren:

HINWEIS: Die Spring Cloud GCP-Bibliothek bietet keinen Zugriff auf AckReplyConsumerWithResponse. Dieses Modul ist jedoch erforderlich, um die Exactly-Once-Funktion mit der Java-Clientbibliothek zu implementieren.

Hinweise

  1. Set up a Trusted Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Trusted Cloud console.

  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  3. Legen Sie die Umgebungsvariable GOOGLE_CLOUD_PROJECT auf Ihre Trusted Cloud Projekt-ID fest.
  4. Spring Cloud GCP Pub/Sub Starter verwenden

    Das Modul Spring Cloud GCP Pub/Sub Starter installiert die Pub/Sub-Java-Clientbibliothek mithilfe des Moduls Spring Cloud GCP Pub/Sub. Sie können die Pub/Sub API aus Ihrer Spring-Anwendung mit den Klassen aufrufen, die der Spring Cloud GCP Pub/Sub Starter oder die Pub/Sub Java-Clientbibliothek bereitstellt. Wenn Sie die Klassen verwenden, die vom Spring Cloud GCP Pub/Sub Starter bereitgestellt werden, können Sie die standardmäßigen Pub/Sub-Konfigurationen überschreiben.

    Modul installieren

    Fügen Sie der Datei pom.xml diese Abhängigkeiten hinzu, um das Modul „Spring Cloud GCP Pub/Sub-Starter“ zu installieren:

    1. Die Spring Cloud Bill of Materials (BOM):

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Das Artefakt „Spring Cloud GCP Pub/Sub Starter“:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>

    Unterstützte Vorgänge

    Das Modul „Spring Cloud GCP Pub/Sub-Starter“ enthält die folgenden Klassen:

    • PubSubAdmin für Verwaltungsvorgänge:
      • Themen und Abos erstellen.
      • Themen und Abos abrufen.
      • Themen und Abos auflisten.
      • Themen und Abos löschen.
      • Bestätigungsfristen für ein Abo abrufen und festlegen.
    • PubSubTemplate, um Nachrichten zu senden und zu empfangen:
      • Nachrichten in Themen veröffentlichen.
      • Nachrichten synchron von Abos abrufen.
      • Nachrichten asynchron von Abos abrufen.
      • Nachrichten bestätigen.
      • Bestätigungsfristen ändern.
      • Pub/Sub-Nachrichten in Plain Old Java Objects (POJOs) konvertieren.

    Spring Integration-Kanaladapter verwenden

    Wenn Ihre Spring-Anwendung Spring Integration-Nachrichtenkanäle verwendet, können Sie Nachrichten mithilfe von Kanaladaptern zwischen Ihren Nachrichtenkanälen und Pub/Sub weiterleiten.

    • Ein Eingangskanaladapter leitet Nachrichten von einem Pub/Sub-Abo an einen Nachrichtenkanal weiter.
    • Ein Ausgangskanaladapter veröffentlicht Nachrichten von einem Nachrichtenkanal in einem Pub/Sub-Thema.

    Module installieren

    Fügen Sie der Datei pom.xml Folgendes hinzu, um Module für Spring Integration-Kanaladapter zu installieren:

    1. Spring Cloud GCP BOM

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Die Artefakte „Spring Cloud GCP Pub/Sub Starter“ und „Spring Integration Core“:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
      </dependency>

    Nachrichten von Pub/Sub empfangen

    Verwenden Sie einen Eingangskanaladapter, um Nachrichten aus einem Pub/Sub-Abo in Ihrer Spring-Anwendung zu empfangen. Der Eingangskanaladapter konvertiert eingehende Pub/Sub-Nachrichten in POJOs und leitet dann die POJOs an einen Nachrichtenkanal weiter.

    // Create a message channel for messages arriving from the subscription `sub-one`.
    @Bean
    public MessageChannel inputMessageChannel() {
      return new PublishSubscribeChannel();
    }
    
    // Create an inbound channel adapter to listen to the subscription `sub-one` and send
    // messages to the input message channel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
      PubSubInboundChannelAdapter adapter =
          new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
      adapter.setOutputChannel(messageChannel);
      adapter.setAckMode(AckMode.MANUAL);
      adapter.setPayloadType(String.class);
      return adapter;
    }
    
    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
      LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
      message.ack();
    }

    Im obigen Beispiel werden die folgenden Spring Beans und Pub/Sub-Ressourcen verwendet:

    • Eine Nachrichtenkanal-Bean namens inputMessageChannel.
    • Eine Eingangskanaladapter-Bean namens inboundChannelAdapter vom Typ PubSubInboundChannelAdapter.
    • Pub/Sub-Abo-ID namens sub-one.

    Der inboundChannelAdapter ruft Nachrichten mithilfe eines PubSubTemplate asynchron aus sub-one ab und sendet die Nachrichten an inputMessageChannel.

    Das inboundChannelAdapter setzt den Bestätigungsmodus auf MANUAL, damit die Anwendung Nachrichten bestätigen kann, nachdem sie sie verarbeitet hat. Der Standardbestätigungsmodus von PubSubInboundChannelAdapter-Typen ist AUTO.

    Die ServiceActivator-Bean messageReceiver protokolliert jede in inputMessageChannel ankommende Nachricht in der Standardausgabe und bestätigt dann die Nachricht.

    Nachrichten in Pub/Sub veröffentlichen

    Verwenden Sie einen Ausgangskanaladapter, um Nachrichten von einem Nachrichtenkanal in einem Pub/Sub-Thema zu veröffentlichen. Der ausgehende Kanaladapter konvertiert POJOs in Pub/Sub-Nachrichten und sendet die Nachrichten dann an ein Pub/Sub-Thema.

    // Create an outbound channel adapter to send messages from the input message channel to the
    // topic `topic-two`.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
      PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
    
      adapter.setSuccessCallback(
          ((ackId, message) ->
              LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));
    
      adapter.setFailureCallback(
          (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));
    
      return adapter;
    }

    Im obigen Beispiel werden die folgenden Spring-Beans und Pub/Sub-Ressourcen verwendet:

    • Eine Nachrichtenkanal-Bean namens inputMessageChannel.
    • Eine Ausgangskanaladapter-Bean namens messageSender vom Typ PubSubMessageHandler.
    • Eine Pub/Sub-Themen-ID namens topic-two.

    Die ServiceActivator-Bean wendet die Logik in messageSender auf jede Nachricht in inputMessageChannel an.

    Der PubSubMessageHandler in messageSender veröffentlicht Nachrichten im inputMessageChannel mithilfe eines PubSubTemplate. Der PubSubMessageHandler veröffentlicht Nachrichten für das Pub/Sub-Thema topic-two.

    Spring Cloud Stream Binder verwenden

    Verwenden Sie das Modul Spring Cloud GCP Pub/Sub Stream Binder, um die Pub/Sub API in einer Spring Cloud Stream-Anwendung aufzurufen.

    Modul installieren

    Fügen Sie der Datei pom.xml Folgendes hinzu, um das Modul „Spring Cloud Stream Bender“ zu installieren:

    1. Spring Cloud GCP BOM

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Das Artefakt „Spring Cloud Stream Bender“:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
      </dependency>

    Nachrichten von Pub/Sub empfangen

    Um Ihre Anwendung als Ereignissenke zu verwenden, konfigurieren Sie den Eingabebinder. Geben Sie dazu Folgendes an:

    • Eine Bean vom Typ Consumer, die die Logik der Nachrichtenverarbeitung definiert. Die folgende Bean vom Typ Consumer hat beispielsweise den Namen receiveMessageFromTopicTwo:

      // Create an input binder to receive messages from `topic-two` using a Consumer bean.
      @Bean
      public Consumer<Message<String>> receiveMessageFromTopicTwo() {
        return message -> {
          LOGGER.info(
              "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
        };
      }
    • Eine Pub/Sub-Themen-ID in der Konfigurationsdatei application.properties. Die folgende Konfigurationsdatei verwendet beispielsweise eine Pub/Sub-Themen-ID namens topic-two:

      # Bind the Pub/Sub topic `topic-two` to the Consumer bean
      # `receiveMessageFromTopicTwo`. Your Spring application will
      # automatically create and attach a subscription to the topic.
      spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

    Der Beispielcode empfängt Nachrichten von Pub/Sub. Das Beispiel führt Folgendes aus:

    1. Ermittelt die Pub/Sub-Themen-ID topic-two im Eingabebindungsziel in application.properties.
    2. Erstellt ein Pub/Sub-Abo für topic-two.
    3. Verwendet den Bindungsnamen receiveMessageFromTopicTwo-in-0, um die Bean vom Typ Consumer namens receiveMessageFromTopicTwo zu finden.
    4. Gibt eingehende Nachrichten in der Standardausgabe aus und bestätigt sie automatisch.

    Nachrichten in Pub/Sub veröffentlichen

    Wenn Sie Ihre Anwendung als Ereignisquelle verwenden möchten, konfigurieren Sie den Ausgabebinder. Geben Sie dazu Folgendes an:

    • Eine Bean vom Typ Supplier, die definiert, woher Nachrichten aus Ihrer Anwendung stammen. Die folgende Bean vom Typ Supplier hat beispielsweise den Namen sendMessageToTopicOne:

      // Create an output binder to send messages to `topic-one` using a Supplier bean.
      @Bean
      public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
        return () ->
            Flux.<Message<String>>generate(
                    sink -> {
                      try {
                        Thread.sleep(10000);
                      } catch (InterruptedException e) {
                        // Stop sleep earlier.
                      }
      
                      Message<String> message =
                          MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                      LOGGER.info(
                          "Sending a message via the output binder to topic-one! Payload: "
                              + message.getPayload());
                      sink.next(message);
                    })
                .subscribeOn(Schedulers.boundedElastic());
      }
    • Eine Pub/Sub-Themen-ID in der Konfigurationsdatei application.properties. Die folgende Konfigurationsdatei verwendet beispielsweise eine Pub/Sub-Themen-ID namens topic-one:

      # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
      # `topic-one`. If the topic does not exist, one will be created.
      spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

    Der Beispielcode veröffentlicht Nachrichten an Pub/Sub. Das Beispiel führt Folgendes aus:

    1. Findet die Pub/Sub-Themen-ID topic-one im Ausgabebindungsziel in application.properties.
    2. Verwendet den Bindungsnamen sendMessageToTopicOne-out-0, um die Bean vom Typ Supplier namens sendMessageToTopicOne zu finden.
    3. Sendet alle 10 Sekunden eine nummerierte Nachricht an topic-one.