Menggunakan Pub/Sub di aplikasi Spring

Halaman ini menjelaskan cara menggunakan Pub/Sub di aplikasi Java yang dibangun dengan Spring Framework.

Spring Cloud GCP memiliki beberapa modul untuk mengirim pesan ke topik Pub/Sub dan menerima pesan dari langganan Pub/Sub menggunakan Spring Framework. Anda dapat menggunakan modul ini secara terpisah atau menggabungkannya untuk berbagai kasus penggunaan:

CATATAN: Library Spring Cloud GCP tidak menyediakan akses ke AckReplyConsumerWithResponse, yang merupakan modul yang diperlukan untuk menerapkan fitur exactly-once menggunakan library klien Java.

Sebelum memulai

  1. Set up a Trusted Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Trusted Cloud console.

  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  3. Tetapkan variabel lingkungan GOOGLE_CLOUD_PROJECT ke ID project Trusted Cloud Anda.
  4. Menggunakan Spring Cloud GCP Pub/Sub Starter

    Modul Spring Cloud GCP Pub/Sub Starter menginstal library klien Pub/Sub Java menggunakan modul Spring Cloud GCP Pub/Sub. Anda dapat memanggil Pub/Sub API dari aplikasi Spring menggunakan class yang disediakan oleh Spring Cloud GCP Pub/Sub Starter atau library klien Pub/Sub Java. Jika Anda menggunakan class yang disediakan oleh Spring Cloud GCP Pub/Sub Starter, Anda dapat mengganti konfigurasi Pub/Sub default.

    Menginstal modul

    Untuk menginstal modul Spring Cloud GCP Pub/Sub Starter, tambahkan dependensi ini ke file pom.xml Anda:

    1. Spring Cloud Bill of Materials (BOM):

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Artefak Spring Cloud GCP Pub/Sub Starter:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>

    Operasi yang didukung

    Modul Spring Cloud GCP Pub/Sub Starter mencakup class berikut:

    • PubSubAdmin untuk operasi administratif:
      • Buat topik dan langganan.
      • Mendapatkan topik dan langganan.
      • Mencantumkan topik dan langganan.
      • Menghapus topik dan langganan.
      • Mendapatkan dan menetapkan batas waktu konfirmasi pada langganan.
    • PubSubTemplate untuk mengirim dan menerima pesan:
      • Memublikasikan pesan ke topik.
      • Menarik pesan secara sinkron dari langganan.
      • Mengambil pesan dari langganan secara asinkron.
      • Mengonfirmasi pesan.
      • Mengubah batas waktu konfirmasi.
      • Mengonversi pesan Pub/Sub menjadi Plain Old Java Objects (POJO).

    Menggunakan adaptor saluran Spring Integration

    Jika aplikasi Spring Anda menggunakan saluran pesan Spring Integration, Anda dapat merutekan pesan antara saluran pesan dan Pub/Sub menggunakan adaptor saluran.

    Menginstal modul

    Untuk menginstal modul untuk adaptor saluran Spring Integration, tambahkan kode berikut ke file pom.xml Anda:

    1. Spring Cloud GCP BOM.

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Artefak Spring Cloud GCP Pub/Sub Starter dan Spring Integration Core:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
      </dependency>

    Menerima pesan dari Pub/Sub

    Untuk menerima pesan dari langganan Pub/Sub di aplikasi Spring, gunakan adaptor saluran masuk. Adaptor saluran masuk mengonversi pesan Pub/Sub masuk menjadi POJO, lalu meneruskan POJO ke saluran pesan.

    // Create a message channel for messages arriving from the subscription `sub-one`.
    @Bean
    public MessageChannel inputMessageChannel() {
      return new PublishSubscribeChannel();
    }
    
    // Create an inbound channel adapter to listen to the subscription `sub-one` and send
    // messages to the input message channel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
      PubSubInboundChannelAdapter adapter =
          new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
      adapter.setOutputChannel(messageChannel);
      adapter.setAckMode(AckMode.MANUAL);
      adapter.setPayloadType(String.class);
      return adapter;
    }
    
    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
      LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
      message.ack();
    }

    Contoh di atas menggunakan bean Spring dan resource Pub/Sub berikut:

    • Bean saluran pesan bernama inputMessageChannel.
    • Bean adaptor saluran masuk bernama inboundChannelAdapter dari jenis PubSubInboundChannelAdapter.
    • ID langganan Pub/Sub bernama sub-one.

    inboundChannelAdapter secara asinkron menarik pesan dari sub-one menggunakan PubSubTemplate dan mengirim pesan ke inputMessageChannel.

    inboundChannelAdapter menetapkan mode konfirmasi ke MANUAL sehingga aplikasi dapat mengonfirmasi pesan setelah memprosesnya. Mode konfirmasi default jenis PubSubInboundChannelAdapter adalah AUTO.

    Bean ServiceActivator messageReceiver mencatat setiap pesan yang tiba di inputMessageChannel ke output standar, lalu mengonfirmasi pesan.

    Memublikasikan pesan ke Pub/Sub

    Untuk memublikasikan pesan dari saluran pesan ke topik Pub/Sub, gunakan adaptor saluran keluar. Adaptor saluran keluar mengonversi POJO menjadi pesan Pub/Sub, lalu mengirimkan pesan ke topik Pub/Sub.

    // Create an outbound channel adapter to send messages from the input message channel to the
    // topic `topic-two`.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
      PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
    
      adapter.setSuccessCallback(
          ((ackId, message) ->
              LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));
    
      adapter.setFailureCallback(
          (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));
    
      return adapter;
    }

    Contoh di atas menggunakan bean Spring dan resource Pub/Sub berikut:

    • Bean saluran pesan bernama inputMessageChannel.
    • Bean adaptor saluran keluar bernama messageSender berjenis PubSubMessageHandler.
    • ID topik Pub/Sub bernama topic-two.

    Bean ServiceActivator menerapkan logika di messageSender ke setiap pesan di inputMessageChannel.

    PubSubMessageHandler di messageSender memublikasikan pesan di inputMessageChannel menggunakan PubSubTemplate. PubSubMessageHandler memublikasikan pesan ke topik Pub/Sub topic-two.

    Menggunakan Binder Spring Cloud Stream

    Untuk memanggil Pub/Sub API di aplikasi Spring Cloud Stream, gunakan modul Spring Cloud GCP Pub/Sub Stream Binder.

    Menginstal modul

    Untuk menginstal modul Spring Cloud Stream Binder, tambahkan kode berikut ke file pom.xml Anda:

    1. Spring Cloud GCP BOM.

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Artefak Spring Cloud Stream Binder:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
      </dependency>

    Menerima pesan dari Pub/Sub

    Untuk menggunakan aplikasi sebagai sink peristiwa, konfigurasi pengikat input dengan menentukan hal berikut:

    • Bean Consumer yang menentukan logika penanganan pesan. Misalnya, bean Consumer berikut diberi nama receiveMessageFromTopicTwo:

      // Create an input binder to receive messages from `topic-two` using a Consumer bean.
      @Bean
      public Consumer<Message<String>> receiveMessageFromTopicTwo() {
        return message -> {
          LOGGER.info(
              "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
        };
      }
    • ID topik Pub/Sub dalam file konfigurasi application.properties. Misalnya, file konfigurasi berikut menggunakan ID topik Pub/Sub bernama topic-two:

      # Bind the Pub/Sub topic `topic-two` to the Consumer bean
      # `receiveMessageFromTopicTwo`. Your Spring application will
      # automatically create and attach a subscription to the topic.
      spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

    Contoh kode menerima pesan dari Pub/Sub. Contoh ini melakukan hal berikut:

    1. Menemukan ID topik Pub/Sub topic-two di tujuan binding input dalam application.properties.
    2. Membuat langganan Pub/Sub ke topic-two.
    3. Menggunakan nama binding receiveMessageFromTopicTwo-in-0 untuk menemukan bean Consumer bernama receiveMessageFromTopicTwo.
    4. Mencetak pesan masuk ke output standar dan secara otomatis mengonfirmasi pesan tersebut.

    Memublikasikan pesan ke Pub/Sub

    Untuk menggunakan aplikasi Anda sebagai sumber peristiwa, konfigurasi pengikat output dengan menentukan hal berikut:

    • Bean Supplier yang menentukan asal pesan dalam aplikasi Anda. Misalnya, bean Supplier berikut diberi nama sendMessageToTopicOne:

      // Create an output binder to send messages to `topic-one` using a Supplier bean.
      @Bean
      public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
        return () ->
            Flux.<Message<String>>generate(
                    sink -> {
                      try {
                        Thread.sleep(10000);
                      } catch (InterruptedException e) {
                        // Stop sleep earlier.
                      }
      
                      Message<String> message =
                          MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                      LOGGER.info(
                          "Sending a message via the output binder to topic-one! Payload: "
                              + message.getPayload());
                      sink.next(message);
                    })
                .subscribeOn(Schedulers.boundedElastic());
      }
    • ID topik Pub/Sub dalam file konfigurasi application.properties. Misalnya, file konfigurasi berikut menggunakan ID topik Pub/Sub bernama topic-one:

      # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
      # `topic-one`. If the topic does not exist, one will be created.
      spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

    Contoh kode memublikasikan pesan ke Pub/Sub. Contoh ini melakukan hal berikut:

    1. Menemukan ID topik Pub/Sub topic-one di tujuan binding output di application.properties.
    2. Menggunakan nama binding sendMessageToTopicOne-out-0 untuk menemukan bean Supplier bernama sendMessageToTopicOne.
    3. Mengirim pesan bernomor ke topic-one setiap 10 detik.