在 Spring 应用中使用 Pub/Sub

本页面介绍了如何在通过 Spring Framework 构建的 Java 应用中使用 Pub/Sub。

Spring Cloud GCP 具有多个模块,用于向 Pub/Sub 主题发送消息,以及使用 Spring Framework 从 Pub/Sub 订阅中接收消息。您可以单独使用这些模块,也可以将它们组合起来用于不同的使用场景:

注意:Spring Cloud GCP 库不提供对 AckReplyConsumerWithResponse 的访问权限,而该模块是使用 Java 客户端库实现“只发送一次”功能所需的模块。

准备工作

  1. Set up a Trusted Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Trusted Cloud console.

  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  3. 将环境变量 GOOGLE_CLOUD_PROJECT 设置为您的 Trusted Cloud 项目 ID。
  4. 使用 Spring Cloud GCP Pub/Sub 入门版

    Spring Cloud GCP Pub/Sub 入门版模块使用 Spring Cloud GCP Pub/Sub 模块安装 Pub/Sub Java 客户端库。您可以使用 Spring Cloud GCP Pub/Sub 入门版提供的类或 Pub/Sub Java 客户端库从 Spring 应用调用 Pub/Sub API。如果您使用的是 Spring Cloud GCP Pub/Sub 入门版提供的类,则可以替换默认的 Pub/Sub 配置

    安装模块

    如需安装 Spring Cloud GCP Pub/Sub 入门版模块,请将以下依赖项添加到 pom.xml 文件中:

    1. Spring Cloud 物料清单 (BOM)

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud GCP Pub/Sub 入门版工件:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>

    支持的操作

    Spring Cloud GCP Pub/Sub 入门版模块包括以下类:

    • PubSubAdmin,用于管理操作:
      • 创建主题和订阅。
      • 获取主题和订阅。
      • 列出主题和订阅。
      • 删除主题和订阅。
      • 获取和设置订阅的确认时限。
    • PubSubTemplate,用于发送和接收消息:
      • 向主题发布消息。
      • 从订阅中同步拉取消息。
      • 从订阅中异步拉取消息。
      • 确认消息。
      • 修改确认时限。
      • 将 Pub/Sub 消息转换为普通的旧式 Java 对象 (POJO)。

    使用 Spring Integration 渠道适配器

    如果您的 Spring 应用使用 Spring Integration 消息通道,您可以使用渠道适配器在消息渠道和 Pub/Sub 之间路由消息。

    安装模块

    如需为 Spring Integration 渠道适配器安装模块,请将以下内容添加到 pom.xml 文件中:

    1. Spring Cloud GCP BOM

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud GCP Pub/Sub 入门版和 Spring Integration 核心工件:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
      </dependency>

    从 Pub/Sub 接收消息

    如需从 Spring 应用中的 Pub/Sub 订阅接收消息,请使用入站渠道适配器。入站渠道适配器将传入的 Pub/Sub 消息转换为 POJO,然后将 POJO 转发到消息渠道。

    // Create a message channel for messages arriving from the subscription `sub-one`.
    @Bean
    public MessageChannel inputMessageChannel() {
      return new PublishSubscribeChannel();
    }
    
    // Create an inbound channel adapter to listen to the subscription `sub-one` and send
    // messages to the input message channel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
      PubSubInboundChannelAdapter adapter =
          new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
      adapter.setOutputChannel(messageChannel);
      adapter.setAckMode(AckMode.MANUAL);
      adapter.setPayloadType(String.class);
      return adapter;
    }
    
    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
      LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
      message.ack();
    }

    上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:

    • 名为 inputMessageChannel 的消息渠道 Bean。
    • 名为 inboundChannelAdapter 的入站渠道适配器 Bean,类型为 PubSubInboundChannelAdapter
    • 名为 sub-one 的 Pub/Sub 订阅 ID。

    inboundChannelAdapter 使用 PubSubTemplatesub-one 中异步拉取消息,并将消息发送到 inputMessageChannel

    inboundChannelAdapter 会将确认模式设置为 MANUAL,以便应用可以在处理消息后进行确认。PubSubInboundChannelAdapter 类型的默认确认模式是 AUTO

    ServiceActivator Bean messageReceiver 会将到达 inputMessageChannel 的每条消息记录到标准输出,然后确认消息。

    将消息发布到 Pub/Sub

    如需将消息渠道的消息发布到 Pub/Sub 主题,请使用出站渠道适配器。出站渠道适配器将 POJO 转换为 Pub/Sub 消息,然后将这些消息发送到 Pub/Sub 主题。

    // Create an outbound channel adapter to send messages from the input message channel to the
    // topic `topic-two`.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
      PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
    
      adapter.setSuccessCallback(
          ((ackId, message) ->
              LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));
    
      adapter.setFailureCallback(
          (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));
    
      return adapter;
    }

    上面的示例使用以下 Spring Bean 和 Pub/Sub 资源:

    • 名为 inputMessageChannel 的消息渠道 Bean。
    • 名为 messageSender 的出站渠道适配器 Bean,类型为 PubSubMessageHandler
    • 名为 topic-two 的 Pub/Sub 主题 ID。

    ServiceActivator Bean 将 messageSender 中的逻辑应用于 inputMessageChannel 中的每条消息。

    messageSender 中的 PubSubMessageHandler 使用 PubSubTemplateinputMessageChannel 中发布消息。PubSubMessageHandler 会将消息发布到 Pub/Sub 主题 topic-two

    使用 Spring Cloud Stream Binder

    如需在 Spring Cloud Stream 应用中调用 Pub/Sub API,请使用 Spring Cloud GCP Pub/Sub Stream Binder 模块。

    安装模块

    如需安装 Spring Cloud Stream Binder 模块,请将以下内容添加到 pom.xml 文件中:

    1. Spring Cloud GCP BOM

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud Stream Binder 工件:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
      </dependency>

    从 Pub/Sub 接收消息

    要将应用用作事件接收器,请通过指定以下内容来配置输入 Binder:

    • 定义消息处理逻辑的 Consumer Bean。例如,以下 Consumer Bean 名为 receiveMessageFromTopicTwo

      // Create an input binder to receive messages from `topic-two` using a Consumer bean.
      @Bean
      public Consumer<Message<String>> receiveMessageFromTopicTwo() {
        return message -> {
          LOGGER.info(
              "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
        };
      }
    • 配置文件 application.properties 中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为 topic-two 的 Pub/Sub 主题 ID:

      # Bind the Pub/Sub topic `topic-two` to the Consumer bean
      # `receiveMessageFromTopicTwo`. Your Spring application will
      # automatically create and attach a subscription to the topic.
      spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

    示例代码会从 Pub/Sub 接收消息。该示例执行以下操作:

    1. application.properties 的输入绑定目的地中找到 Pub/Sub 主题 ID topic-two
    2. 创建针对 topic-two 的 Pub/Sub 订阅。
    3. 使用绑定名称 receiveMessageFromTopicTwo-in-0 查找名为 receiveMessageFromTopicTwoConsumer Bean。
    4. 将传入的消息输出到标准输出并自动确认。

    将消息发布到 Pub/Sub

    要将应用用作事件源,请指定以下内容以配置输出 Binder:

    • 一个 Supplier Bean,用于定义消息来自应用中的何处。例如,以下 Supplier Bean 名为 sendMessageToTopicOne

      // Create an output binder to send messages to `topic-one` using a Supplier bean.
      @Bean
      public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
        return () ->
            Flux.<Message<String>>generate(
                    sink -> {
                      try {
                        Thread.sleep(10000);
                      } catch (InterruptedException e) {
                        // Stop sleep earlier.
                      }
      
                      Message<String> message =
                          MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                      LOGGER.info(
                          "Sending a message via the output binder to topic-one! Payload: "
                              + message.getPayload());
                      sink.next(message);
                    })
                .subscribeOn(Schedulers.boundedElastic());
      }
    • 配置文件 application.properties 中的 Pub/Sub 主题 ID。例如,以下配置文件使用名为 topic-one 的 Pub/Sub 主题 ID:

      # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
      # `topic-one`. If the topic does not exist, one will be created.
      spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

    示例代码会将消息发布到 Pub/Sub。该示例执行以下操作:

    1. application.properties 的输出绑定目的地中找到 Pub/Sub 主题 ID topic-one
    2. 使用绑定名称 sendMessageToTopicOne-out-0 查找名为 sendMessageToTopicOneSupplier Bean。
    3. 每 10 秒向 topic-one 发送一条已编号的消息。