Spring 애플리케이션에서 Pub/Sub 사용

이 페이지에서는 Spring Framework로 빌드된 Java 애플리케이션에서 Pub/Sub를 사용하는 방법을 설명합니다.

Spring Cloud GCP에는 Spring Framework를 사용하여 Pub/Sub 구독에서 메시지를 수신하고 Pub/Sub 주제로 메시지를 전송하기 위한 몇 가지 모듈이 포함되어 있습니다. 이러한 모듈은 여러 사용 사례에 맞게 개별적으로 또는 결합된 형태로 사용할 수 있습니다.

참고: Spring Cloud GCP 라이브러리에서는 Java 클라이언트 라이브러리를 사용하여 1회만 구현하는 기능을 구현하는 데 필요한 모듈인 AckReplyConsumerWithResponse에 대해 액세스 권한을 제공하지 않습니다.

시작하기 전에

  1. Set up a Trusted Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Trusted Cloud console.

  2. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  3. 환경 변수 GOOGLE_CLOUD_PROJECT를 Trusted Cloud 프로젝트 ID로 설정합니다.
  4. Spring Cloud GCP Pub/Sub Starter 사용

    Spring Cloud GCP Pub/Sub Starter 모듈은 Spring Cloud GCP Pub/Sub 모듈을 사용하여 Pub/Sub Java 클라이언트 라이브러리를 설치합니다. Spring Cloud GCP Pub/Sub Starter가 제공하는 클래스 또는 Pub/Sub Java 클라이언트 라이브러리를 사용하여 Spring 애플리케이션에서 Pub/Sub API를 호출할 수 있습니다. Spring Cloud GCP Pub/Sub Starter가 제공하는 클래스를 사용하는 경우 기본 Pub/Sub 구성을 무효화할 수 있습니다.

    모듈 설치

    Spring Cloud GCP Pub/Sub Starter 모듈을 설치하려면 해당 종속 항목을 pom.xml 파일에 추가합니다.

    1. Spring Cloud 명세서(BOM):

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud GCP Pub/Sub Starter 아티팩트:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>

    지원되는 작업

    Spring Cloud GCP Pub/Sub Starter 모듈에는 다음 클래스가 포함되어 있습니다.

    • 관리 작업을 위한 PubSubAdmin:
      • 주제 및 구독을 만듭니다.
      • 주제 및 구독을 가져옵니다.
      • 주제 및 구독을 나열합니다.
      • 주제 및 구독을 삭제합니다.
      • 구독에 대한 확인 기한을 가져오고 설정합니다.
    • 메시지 송신 및 수신을 위한 PubSubTemplate:
      • 주제에 메시지를 게시합니다.
      • 구독에서 동기식으로 메시지를 가져옵니다.
      • 구독에서 비동기식으로 메시지를 가져옵니다.
      • 메시지를 확인합니다.
      • 확인 기한을 수정합니다.
      • Pub/Sub 메시지를 일반 구형 자바 객체(POJO)로 변환합니다.

    Spring Integration 채널 어댑터 사용

    Spring 애플리케이션에 Spring 통합 메시지 채널이 사용되는 경우 채널 어댑터를 사용하여 메시지 채널과 Pub/Sub 사이에 메시지를 라우팅할 수 있습니다.

    모듈 설치

    Spring 통합 채널 어댑터에 대해 모듈을 설치하려면 pom.xml 파일에 다음을 추가합니다.

    1. Spring Cloud GCP BOM.

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud GCP Pub/Sub Starter 및 Spring 통합 코어 아티팩트:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
      </dependency>
      <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
      </dependency>

    Pub/Sub에서 메시지 수신

    Spring 애플리케이션에서 Pub/Sub 구독으로부터 메시지를 수신하려면 인바운드 채널 어댑터를 사용합니다. 인바운드 채널 어댑터는 수신되는 Pub/Sub 메시지를 POJO로 변환하고 POJO를 메시지 채널로 전달합니다.

    // Create a message channel for messages arriving from the subscription `sub-one`.
    @Bean
    public MessageChannel inputMessageChannel() {
      return new PublishSubscribeChannel();
    }
    
    // Create an inbound channel adapter to listen to the subscription `sub-one` and send
    // messages to the input message channel.
    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
        @Qualifier("inputMessageChannel") MessageChannel messageChannel,
        PubSubTemplate pubSubTemplate) {
      PubSubInboundChannelAdapter adapter =
          new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
      adapter.setOutputChannel(messageChannel);
      adapter.setAckMode(AckMode.MANUAL);
      adapter.setPayloadType(String.class);
      return adapter;
    }
    
    // Define what happens to the messages arriving in the message channel.
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public void messageReceiver(
        String payload,
        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
      LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
      message.ack();
    }

    위 예시에서는 다음 Spring bean 및 Pub/Sub 리소스가 사용됩니다.

    • 이름이 inputMessageChannel인 메시지 채널 bean
    • 이름이 inboundChannelAdapter이고 유형이 PubSubInboundChannelAdapter인 인바운드 채널 어댑터 bean
    • 이름이 sub-one인 Pub/Sub 구독 ID

    inboundChannelAdapterPubSubTemplate를 사용하여 sub-one에서 메시지를 비동기식으로 가져오고 메시지를 inputMessageChannel로 전송합니다.

    inboundChannelAdapter는 애플리케이션이 메시지 처리 후 이를 확인할 수 있도록 확인 모드를 MANUAL로 설정합니다. PubSubInboundChannelAdapter 유형의 기본 확인 모드는 AUTO입니다.

    ServiceActivator bean messageReceiverinputMessageChannel에서 도착하는 각 메시지를 표준 출력에 로깅한 후 메시지를 확인합니다.

    Pub/Sub에 메시지 게시

    메시지 채널에서 Pub/Sub 주제에 메시지를 게시하려면 아웃바운드 채널 어댑터를 사용합니다. 아웃바운드 채널 어댑터는 POJO를 Pub/Sub 메시지로 변환한 후 메시지를 Pub/Sub 주제로 전송합니다.

    // Create an outbound channel adapter to send messages from the input message channel to the
    // topic `topic-two`.
    @Bean
    @ServiceActivator(inputChannel = "inputMessageChannel")
    public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
      PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");
    
      adapter.setSuccessCallback(
          ((ackId, message) ->
              LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));
    
      adapter.setFailureCallback(
          (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));
    
      return adapter;
    }

    위 예시에서는 다음 Spring bean 및 Pub/Sub 리소스가 사용됩니다.

    • 이름이 inputMessageChannel인 메시지 채널 bean
    • 이름이 messageSender이고 유형이 PubSubMessageHandler인 아웃바운드 채널 어댑터 bean
    • 이름이 topic-two인 Pub/Sub 주제 ID

    ServiceActivator bean은 messageSender의 논리를 inputMessageChannel의 각 메시지에 적용합니다.

    messageSenderPubSubMessageHandlerPubSubTemplate을 사용하여 inputMessageChannel의 메시지를 게시합니다. PubSubMessageHandler는 메시지를 Pub/Sub 주제 topic-two에 게시합니다.

    Spring Cloud Stream Binder 사용

    Spring Cloud Stream 애플리케이션에서 Pub/Sub API를 호출하려면 Spring Cloud GCP Pub/Sub Stream Binder 모듈을 사용합니다.

    모듈 설치

    Spring Cloud Stream Binder 모듈을 설치하려면 pom.xml 파일에 다음을 추가합니다.

    1. Spring Cloud GCP BOM.

      <dependencyManagement>
        <dependencies>
          <dependency>
            <groupId>com.google.cloud</groupId>
            <artifactId>spring-cloud-gcp-dependencies</artifactId>
            <version>3.7.7</version>
            <type>pom</type>
            <scope>import</scope>
          </dependency>
        </dependencies>
      </dependencyManagement>
    2. Spring Cloud Stream Binder 아티팩트:

      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
      </dependency>

    Pub/Sub에서 메시지 수신

    애플리케이션을 이벤트 싱크로 사용하려면 다음을 지정하여 입력 바인더를 구성합니다.

    • 메시지 처리 논리를 정의하는 Consumer bean. 예를 들어 다음 Consumer bean은 이름이 receiveMessageFromTopicTwo입니다.

      // Create an input binder to receive messages from `topic-two` using a Consumer bean.
      @Bean
      public Consumer<Message<String>> receiveMessageFromTopicTwo() {
        return message -> {
          LOGGER.info(
              "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
        };
      }
    • 구성 파일 application.properties의 Pub/Sub 주제 ID. 예를 들어 다음 구성 파일은 topic-two라는 Pub/Sub 주제 ID를 사용합니다.

      # Bind the Pub/Sub topic `topic-two` to the Consumer bean
      # `receiveMessageFromTopicTwo`. Your Spring application will
      # automatically create and attach a subscription to the topic.
      spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

    이 예시 코드는 Pub/Sub에서 메시지를 수신합니다. 이 예시는 다음을 수행합니다.

    1. application.properties의 입력 바인딩 대상에서 Pub/Sub 주제 ID topic-two를 찾습니다.
    2. topic-two에 대해 Pub/Sub 구독을 만듭니다.
    3. 바인딩 이름 receiveMessageFromTopicTwo-in-0을 사용하여 receiveMessageFromTopicTwo라는 Consumer bean을 찾습니다.
    4. 수신 메시지를 표준 출력으로 출력하고 이를 자동으로 확인합니다.

    Pub/Sub에 메시지 게시

    애플리케이션을 이벤트 소스로 사용하려면 다음을 지정하여 출력 바인더를 구성합니다.

    • 애플리케이션 내에서 메시지가 수신되는 위치를 정의하는 Supplier bean. 예를 들어 다음 Supplier bean은 이름이 sendMessageToTopicOne입니다.

      // Create an output binder to send messages to `topic-one` using a Supplier bean.
      @Bean
      public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
        return () ->
            Flux.<Message<String>>generate(
                    sink -> {
                      try {
                        Thread.sleep(10000);
                      } catch (InterruptedException e) {
                        // Stop sleep earlier.
                      }
      
                      Message<String> message =
                          MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                      LOGGER.info(
                          "Sending a message via the output binder to topic-one! Payload: "
                              + message.getPayload());
                      sink.next(message);
                    })
                .subscribeOn(Schedulers.boundedElastic());
      }
    • 구성 파일 application.properties의 Pub/Sub 주제 ID. 예를 들어 다음 구성 파일은 topic-one라는 Pub/Sub 주제 ID를 사용합니다.

      # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
      # `topic-one`. If the topic does not exist, one will be created.
      spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

    이 예시 코드는 Pub/Sub에 메시지를 게시합니다. 이 예시는 다음을 수행합니다.

    1. application.properties의 출력 바인딩 대상에서 Pub/Sub 주제 ID topic-one를 찾습니다.
    2. 바인딩 이름 sendMessageToTopicOne-out-0을 사용하여 sendMessageToTopicOne라는 Supplier bean을 찾습니다.
    3. 10초마다 topic-one으로 번호가 지정된 메시지를 전송합니다.