在 Spring 應用程式中使用 Pub/Sub

本頁說明如何在以 Spring Framework 建構的 Java 應用程式中使用 Pub/Sub。

Spring Cloud GCP 包含多個模組,可使用 Spring Framework 將訊息傳送至 Pub/Sub 主題,以及從 Pub/Sub 訂閱項目接收訊息。您可以單獨使用這些模組,也可以視不同用途組合使用:

注意:Spring Cloud GCP 程式庫不會提供 AckReplyConsumerWithResponse 的存取權,這是使用 Java 用戶端程式庫實作「只傳送一次」功能時的必要模組。

事前準備

  1. Set up a Trusted Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Trusted Cloud console.

  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  3. GOOGLE_CLOUD_PROJECT 環境變數設為您的 Trusted Cloud 專案 ID。
  4. 使用 Spring Cloud GCP Pub/Sub Starter

    Spring Cloud GCP Pub/Sub Starter 模組會使用 Spring Cloud GCP Pub/Sub 模組安裝 Pub/Sub Java 用戶端程式庫。您可以使用 Spring Cloud GCP Pub/Sub Starter 提供的類別或 Pub/Sub Java 用戶端程式庫,從 Spring 應用程式呼叫 Pub/Sub API。如果您使用 Spring Cloud GCP Pub/Sub Starter 提供的類別,可以覆寫預設的 Pub/Sub 設定

    安裝模組

    如要安裝 Spring Cloud GCP Pub/Sub Starter 模組,請在 pom.xml 檔案中新增下列依附元件:

    1. Spring Cloud 物料清單 (BOM)

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud GCP Pub/Sub Starter 構件:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>

    支援的作業

    Spring Cloud GCP Pub/Sub Starter 模組包含下列類別:

    • PubSubAdmin 適用於管理作業:
      • 建立主題和訂閱項目。
      • 取得主題和訂閱項目。
      • 列出主題和訂閱項目。
      • 刪除主題和訂閱項目。
      • 取得及設定訂閱項目的確認期限。
    • PubSubTemplate 收發訊息:
      • 將訊息發布至主題。
      • 從訂閱項目同步提取訊息。
      • 從訂閱項目非同步提取訊息。
      • 確認訊息。
      • 修改確認期限。
      • 將 Pub/Sub 訊息轉換為簡單傳統的 Java 物件 (POJO)。

    使用 Spring Integration 管道轉接程式

    如果 Spring 應用程式使用 Spring Integration 訊息管道,您可以透過管道轉接程式,在訊息管道和 Pub/Sub 之間傳送訊息。

    安裝模組

    如要安裝 Spring Integration 管道介面的模組,請在 pom.xml 檔案中新增以下程式碼:

    1. Spring Cloud GCP BOM

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud GCP Pub/Sub Starter 和 Spring Integration Core 構件:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
      </dependency>

    接收 Pub/Sub 訊息

    如要在 Spring 應用程式中接收 Pub/Sub 訂閱項目的訊息,請使用輸入管道轉接程式。內送管道轉接程式會將傳入的 Pub/Sub 訊息轉換為 POJO,然後將 POJO 轉送至訊息管道。

    // Create a message channel for messages arriving from the subscription `sub-one`.
    @Bean
    public MessageChannel inputMessageChannel() {
      return new PublishSubscribeChannel();
    }
    
    // Create an inbound channel adapter to listen to the subscription `sub-one` and send
    // messages to the input message channel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
      PubSubInboundChannelAdapter adapter =
          new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
      adapter.setOutputChannel(messageChannel);
      adapter.setAckMode(AckMode.MANUAL);
      adapter.setPayloadType(String.class);
      return adapter;
    }
    
    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
      LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
      message.ack();
    }

    上述範例使用下列 Spring Bean 和 Pub/Sub 資源:

    • 名為 inputMessageChannel 的訊息管道 Bean。
    • 名為 inboundChannelAdapter 的輸入通道轉接器 Bean,類型為 PubSubInboundChannelAdapter
    • 名為 sub-one 的 Pub/Sub 訂閱項目 ID。

    inboundChannelAdapter 會使用 PubSubTemplatesub-one 非同步提取訊息,然後將訊息傳送至 inputMessageChannel

    inboundChannelAdapter 會將確認模式設為 MANUAL,因此應用程式可以在處理訊息後確認訊息。PubSubInboundChannelAdapter 類型的預設確認模式為 AUTO

    ServiceActivator Bean messageReceiver 會將抵達 inputMessageChannel 的每封郵件記錄到標準輸出內容,然後確認郵件。

    將訊息發布至 Pub/Sub

    如要將訊息從訊息管道發布至 Pub/Sub 主題,請使用輸出管道轉接程式。輸出管道轉接程式會將 POJO 轉換為 Pub/Sub 訊息,然後將訊息傳送至 Pub/Sub 主題。

    // Create an outbound channel adapter to send messages from the input message channel to the
    // topic `topic-two`.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
      PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
    
      adapter.setSuccessCallback(
          ((ackId, message) ->
              LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));
    
      adapter.setFailureCallback(
          (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));
    
      return adapter;
    }

    上述範例使用下列 Spring Bean 和 Pub/Sub 資源:

    • 名為 inputMessageChannel 的訊息管道 Bean。
    • 名為 messageSender 的外寄管道介面卡 Bean,類型為 PubSubMessageHandler
    • 名為 topic-two 的 Pub/Sub 主題 ID。

    ServiceActivator 豆會將 messageSender 中的邏輯套用至 inputMessageChannel 中的每則訊息。

    messageSender 中的 PubSubMessageHandler 會使用 PubSubTemplateinputMessageChannel 中發布訊息。PubSubMessageHandler 將訊息發布至 Pub/Sub 主題 topic-two

    使用 Spring Cloud Stream Binder

    如要在 Spring Cloud Stream 應用程式中呼叫 Pub/Sub API,請使用 Spring Cloud GCP Pub/Sub Stream Binder 模組。

    安裝模組

    如要安裝 Spring Cloud Stream Binder 模組,請在 pom.xml 檔案中新增以下內容:

    1. Spring Cloud GCP BOM

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud Stream Binder 構件:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
      </dependency>

    接收 Pub/Sub 訊息

    如要將應用程式做為事件接收器,請指定下列項目來設定輸入繫結器:

    • 定義訊息處理邏輯的 Consumer Bean。舉例來說,下列 Consumer Bean 名為 receiveMessageFromTopicTwo

      // Create an input binder to receive messages from `topic-two` using a Consumer bean.
      @Bean
      public Consumer<Message<String>> receiveMessageFromTopicTwo() {
        return message -> {
          LOGGER.info(
              "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
        };
      }
    • 設定檔 application.properties 中的 Pub/Sub 主題 ID。舉例來說,下列設定檔使用名為 topic-two 的 Pub/Sub 主題 ID:

      # Bind the Pub/Sub topic `topic-two` to the Consumer bean
      # `receiveMessageFromTopicTwo`. Your Spring application will
      # automatically create and attach a subscription to the topic.
      spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

    範例程式碼會接收來自 Pub/Sub 的訊息。這個範例會執行下列操作:

    1. application.properties 的輸入繫結目的地中,找出 Pub/Sub 主題 ID topic-two
    2. 建立 topic-two 的 Pub/Sub 訂閱項目。
    3. 使用繫結名稱 receiveMessageFromTopicTwo-in-0 尋找名為 receiveMessageFromTopicTwoConsumer bean。
    4. 將收到的訊息列印至標準輸出內容,並自動確認訊息。

    將訊息發布至 Pub/Sub

    如要將應用程式做為事件來源,請指定下列項目來設定輸出繫結器:

    • Supplier Bean,用於定義應用程式內訊息的來源。舉例來說,下列 Supplier Bean 名為 sendMessageToTopicOne

      // Create an output binder to send messages to `topic-one` using a Supplier bean.
      @Bean
      public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
        return () ->
            Flux.<Message<String>>generate(
                    sink -> {
                      try {
                        Thread.sleep(10000);
                      } catch (InterruptedException e) {
                        // Stop sleep earlier.
                      }
      
                      Message<String> message =
                          MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                      LOGGER.info(
                          "Sending a message via the output binder to topic-one! Payload: "
                              + message.getPayload());
                      sink.next(message);
                    })
                .subscribeOn(Schedulers.boundedElastic());
      }
    • 設定檔 application.properties 中的 Pub/Sub 主題 ID。舉例來說,下列設定檔使用名為 topic-one 的 Pub/Sub 主題 ID:

      # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
      # `topic-one`. If the topic does not exist, one will be created.
      spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

    範例程式碼會將訊息發布至 Pub/Sub。這個範例會執行下列操作:

    1. application.properties 的輸出繫結目的地中,找出 Pub/Sub 主題 ID topic-one
    2. 使用繫結名稱 sendMessageToTopicOne-out-0 尋找名為 sendMessageToTopicOneSupplier bean。
    3. 每 10 秒傳送一則編號訊息給 topic-one