Spring アプリケーションでの Pub/Sub の使用

このページでは、Spring Framework で作成された Java アプリケーションで Pub/Sub を使用する方法について説明します。

Spring Cloud GCP には、Spring Framework を使用して Pub/Sub トピックにメッセージを送信し、Pub/Sub サブスクリプションからメッセージを受信するためのモジュールが複数あります。これらのモジュールは、異なるユースケースに対して個別に使用することも、組み合わせて使用することもできます。

注: Spring Cloud GCP ライブラリでは、Java クライアント ライブラリを使用して 1 回限りの機能を実装するための必須モジュールである AckReplyConsumerWithResponse にはアクセスできません。

始める前に

  1. Set up a Trusted Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Trusted Cloud console.

  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  3. 環境変数 GOOGLE_CLOUD_PROJECT を Trusted Cloud プロジェクト ID に設定します。
  4. Spring Cloud GCP Pub/Sub Starter の使用

    Spring Cloud GCP Pub/Sub Starter モジュールは、Spring Cloud GCP Pub/Sub モジュールを使用して Pub/Sub Java クライアント ライブラリをインストールします。Spring Cloud GCP Pub/Sub Starter が提供するクラスまたは Pub/Sub Java クライアント ライブラリを使用して、Spring アプリケーションから Pub/Sub API を呼び出すことができます。Spring Cloud GCP Pub/Sub Starter が提供するクラスを使用している場合は、デフォルトの Pub/Sub 構成をオーバーライドできます。

    モジュールのインストール

    Spring Cloud GCP Pub/Sub Starter モジュールをインストールするには、次の依存関係を pom.xml ファイルに追加します。

    1. Spring Cloud Bill of Materials(BOM):

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud GCP Pub/Sub Starter アーティファクト:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>

    サポートされているオペレーション

    Spring Cloud GCP Pub/Sub Starter モジュールには、次のクラスが含まれています。

    • 管理オペレーションのための PubSubAdmin:
      • トピックとサブスクリプションの作成
      • トピックとサブスクリプションの取得
      • トピックとサブスクリプションの一覧表示
      • トピックとサブスクリプションの削除
      • サブスクリプションの確認応答期限の取得と設定
    • メッセージを送受信するためのPubSubTemplate:
      • メッセージのトピックへのパブリッシュ
      • サブスクリプションからメッセージを同期的に pull する
      • サブスクリプションからメッセージを非同期で pull する
      • メッセージの確認応答
      • 確認応答期限の変更
      • Pub/Sub メッセージをプレーン オールド Java オブジェクト(POJO)に変換する

    Spring 統合チャネル アダプタの使用

    Spring アプリケーションで Spring 統合メッセージ チャネルを使用する場合、チャネル アダプタを使用して、メッセージ チャネルと Pub/Sub の間でメッセージをルーティングできます。

    モジュールのインストール

    Spring Integration チャネル アダプタ用のモジュールをインストールするには、次のものを pom.xml ファイルに追加します。

    1. Spring Cloud GCP BOM

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud GCP Pub/Sub Starter と Spring Integration Core のアーティファクト。

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
      </dependency>

    Pub/Sub からのメッセージの受信

    Spring アプリケーションで Pub/Sub サブスクリプションからメッセージを受信するには、受信チャネル アダプタを使用します。受信チャネル アダプタは、受信した Pub/Sub メッセージを POJO に変換し、POJO をメッセージ チャネルに転送します。

    // Create a message channel for messages arriving from the subscription `sub-one`.
    @Bean
    public MessageChannel inputMessageChannel() {
      return new PublishSubscribeChannel();
    }
    
    // Create an inbound channel adapter to listen to the subscription `sub-one` and send
    // messages to the input message channel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
      PubSubInboundChannelAdapter adapter =
          new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
      adapter.setOutputChannel(messageChannel);
      adapter.setAckMode(AckMode.MANUAL);
      adapter.setPayloadType(String.class);
      return adapter;
    }
    
    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
      LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
      message.ack();
    }

    上の例では、次の Spring Bean と Pub/Sub リソースを使用しています。

    • inputMessageChannel という名前のメッセージ チャネル Bean
    • 型が PubSubInboundChannelAdapterinboundChannelAdapter という名前の受信チャネル アダプタ Bean。
    • sub-one という名前の Pub/Sub サブスクリプション ID。

    inboundChannelAdapter は、PubSubTemplate を使用して sub-one から非同期でメッセージを pull し、inputMessageChannel にメッセージを送信します。

    inboundChannelAdapter で確認応答モードを MANUAL に設定すると、アプリケーションはメッセージの処理後にメッセージの確認応答ができます。PubSubInboundChannelAdapter 型のデフォルトの確認応答モードは、AUTO です。

    ServiceActivator Bean messageReceiverは、inputMessageChannel に到着する各メッセージを標準出力に記録してから、メッセージを確認応答します。

    Pub/Sub へのメッセージのパブリッシュ

    メッセージ チャネルから Pub/Sub トピックにメッセージをパブリッシュするには、送信チャネル アダプタを使用します。送信チャネル アダプタは、POJO を Pub/Sub メッセージに変換し、メッセージを Pub/Sub トピックに送信します。

    // Create an outbound channel adapter to send messages from the input message channel to the
    // topic `topic-two`.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
      PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
    
      adapter.setSuccessCallback(
          ((ackId, message) ->
              LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));
    
      adapter.setFailureCallback(
          (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));
    
      return adapter;
    }

    上の例では、次の Spring Bean と Pub/Sub リソースを使用しています。

    • inputMessageChannel という名前のメッセージ チャネル Bean
    • 型が PubSubMessageHandlermessageSender という名前の発信チャネル アダプタ Bean。
    • topic-two という名前の Pub/Sub トピック ID。

    ServiceActivator Bean は、messageSender のロジックを inputMessageChannel 内の各メッセージに適用します。

    messageSenderPubSubMessageHandler は、PubSubTemplate を使用して inputMessageChannel にメッセージをパブリッシュします。PubSubMessageHandler は、Pub/Sub トピック topic-two にメッセージをパブリッシュします。

    Spring Cloud Stream Binder の使用

    Spring Cloud Stream アプリケーションで Pub/Sub API を呼び出すには、Spring Cloud GCP Pub/Sub Stream Binder モジュールを使用します。

    モジュールのインストール

    Spring Cloud Stream Binder モジュールをインストールするには、pom.xml ファイルに次の内容を追加します。

    1. Spring Cloud GCP BOM

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud Stream Binder アーティファクト。

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
      </dependency>

    Pub/Sub からのメッセージの受信

    アプリケーションをイベントシンクとして使用するには、次のように入力バインダを構成します。

    • メッセージ処理ロジックを定義する Consumer Bean。たとえば、次の Consumer Bean は receiveMessageFromTopicTwo という名前になっています。

      // Create an input binder to receive messages from `topic-two` using a Consumer bean.
      @Bean
      public Consumer<Message<String>> receiveMessageFromTopicTwo() {
        return message -> {
          LOGGER.info(
              "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
        };
      }
    • 構成ファイル application.properties 内の Pub/Sub トピック ID。たとえば、次の構成ファイルでは、topic-two という名前の Pub/Sub トピック ID を使用しています。

      # Bind the Pub/Sub topic `topic-two` to the Consumer bean
      # `receiveMessageFromTopicTwo`. Your Spring application will
      # automatically create and attach a subscription to the topic.
      spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

    このサンプルコードは、Pub/Sub からメッセージを受信します。この例では、次のことを行います。

    1. application.properties の入力バインディングの宛先で Pub/Sub トピック ID topic-two を検索します。
    2. topic-two への Pub/Sub サブスクリプションを作成します。
    3. バインディング名 receiveMessageFromTopicTwo-in-0 を使用して、receiveMessageFromTopicTwo という名前の Consumer Bean を見つけます。
    4. 着信したメッセージを標準出力に出力し、自動的に確認応答します。

    Pub/Sub へのメッセージのパブリッシュ

    アプリケーションをイベントソースとして使用するには、次のように出力バインダを構成します。

    • Supplier Bean は、アプリケーション内にメッセージが作られる場所を定義します。たとえば、次の Supplier Bean は sendMessageToTopicOne という名前になっています。

      // Create an output binder to send messages to `topic-one` using a Supplier bean.
      @Bean
      public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
        return () ->
            Flux.<Message<String>>generate(
                    sink -> {
                      try {
                        Thread.sleep(10000);
                      } catch (InterruptedException e) {
                        // Stop sleep earlier.
                      }
      
                      Message<String> message =
                          MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                      LOGGER.info(
                          "Sending a message via the output binder to topic-one! Payload: "
                              + message.getPayload());
                      sink.next(message);
                    })
                .subscribeOn(Schedulers.boundedElastic());
      }
    • 構成ファイル application.properties 内の Pub/Sub トピック ID。たとえば、次の構成ファイルでは、topic-one という名前の Pub/Sub トピック ID を使用しています。

      # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
      # `topic-one`. If the topic does not exist, one will be created.
      spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

    このサンプルコードは、Pub/Sub にメッセージをパブリッシュします。この例では、次のことを行います。

    1. application.properties の出力バインディングの宛先で Pub/Sub トピック ID topic-one を検索します。
    2. バインディング名 sendMessageToTopicOne-out-0 を使用して、sendMessageToTopicOneという名前の Supplier Bean を見つけます。
    3. 10 秒ごとに topic-one に番号付きメッセージを送信します。