Pub/Sub を Apache Kafka に接続する

このドキュメントでは、Pub/Sub グループの Kafka コネクタを使用して Apache Kafka と Pub/Sub を統合する方法について説明します。

Pub/Sub Kafka コネクタの概要

Apache Kafka は、イベント ストリーミングのオープンソース プラットフォームです。分散型アーキテクチャでよく使用され、疎結合のコンポーネント間の通信を可能にします。Pub/Sub は、メッセージを非同期で送受信するためのマネージド サービスです。Kafka と同様に、Pub/Sub を使用してクラウド アーキテクチャ内のコンポーネント間で通信できます。

Pub/Sub グループの Kafka コネクタを使用すると、これら 2 つのシステムを統合できます。コネクタ JAR には次のコネクタがパッケージ化されています。

  • シンクコネクタは、1 つ以上の Kafka トピックからレコードを読み取り、Pub/Sub に公開します。
  • ソースコネクタは、Pub/Sub トピックからメッセージを読み取り、Kafka にパブリッシュします。

Pub/Sub グループの Kafka コネクタを使用する可能性のあるいくつかのシナリオを次に示します。

  • Kafka ベースのアーキテクチャを Cloud de Confiance by S3NSに移行する。
  • フロントエンド システムがCloud de Confiance by S3NS外の Kafka にイベントを保存するが、Kafka イベントを受信する必要がある一部のバックエンド サービスを実行するために Cloud de Confiance by S3NS も使用する。
  • オンプレミスの Kafka ソリューションからログを収集し、Cloud de Confiance by S3NS に送信してデータ分析を行う。
  • フロントエンド システムは Cloud de Confiance by S3NSを使用しているが、Kafka を使用してオンプレミスでもデータを保存する。

このコネクタには、Kafka と他のシステム間でデータをストリーミングするためのフレームワークである Kafka コネクト が必要です。このコネクタを使用するには、Kafka クラスタとともに Kafka Connect を実行する必要があります。

このドキュメントでは、Kafka と Pub/Sub の両方に精通していることを前提としています。この文書を読む前に、Pub/Sub クイックスタートのいずれかを完了することをお勧めします。

Pub/Sub コネクタは、 Cloud de Confiance IAM と Kafka Connect ACL 間の統合をサポートしていません。

コネクタを使ってみる

このセクションでは、次のタスクについて説明します。

  1. Pub/Sub グループの Kafka コネクタを構成する
  2. Kafka から Pub/Sub にイベントを送信する。
  3. Pub/Sub から Kafka にメッセージを送信する。

前提条件

Kafka をインストールする

Apache Kafka クイックスタートに沿って、ローカルマシンにシングルノード Kafka をインストールします。クイックスタートで、次の手順を行います。

  1. 最新の Kafka リリースをダウンロードして展開します。
  2. Kafka 環境を起動します。
  3. Kafka トピックを作成します。

認証

Pub/Sub メッセージを送受信するために、Pub/Sub グループの Kafka コネクタが Pub/Sub による認証を行う必要があります。認証を設定するには、次の手順を実行します。

  1. Google Cloud CLI をインストールします。

  2. フェデレーション ID(連携 ID)を使用するように gcloud CLI を構成します。

    詳細については、連携 ID を使用して gcloud CLI にログインするをご覧ください。

  3. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  4. Cloud de Confiance プロジェクトを作成または選択します

    プロジェクトの選択または作成に必要なロール

    • プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
    • プロジェクトを作成する: プロジェクトを作成するには、resourcemanager.projects.create 権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する
    • Cloud de Confiance プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Cloud de Confiance プロジェクトの名前に置き換えます。

    • 作成した Cloud de Confiance プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、 Cloud de Confiance プロジェクトの名前に置き換えます。

  5. ユーザー アカウントのローカル認証情報を作成します。

    gcloud auth application-default login

    認証エラーが返され、外部 ID プロバイダ(IdP)を使用している場合は、 連携 ID を使用して gcloud CLI にログインしていることを確認します。

  6. ユーザー アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    次のように置き換えます。

コネクタ JAR をダウンロードする

ローカルマシンにコネクタの JAR ファイルをダウンロードします。詳細については、GitHub ReadMe のコネクタの取得をご覧ください。

コネクタ構成ファイルをコピーする

  1. コネクタの GitHub リポジトリのクローンを作成するか、ダウンロードします。

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. config ディレクトリの内容を Kafka インストールの config サブディレクトリにコピーします。

    cp config/* [path to Kafka installation]/config/
    

これらのファイルには、コネクタの構成設定が含まれています。

Kafka Connect 構成を更新する

  1. ダウンロードした Kafka コネクト バイナリが含まれているディレクトリに移動します。
  2. Kafka コネクト バイナリ ディレクトリにある config/connect-standalone.properties という名前のファイルをテキスト エディタで開きます。
  3. plugin.path property がコメントアウトされている場合は、コメント化解除します。
  4. plugin.path property を更新して、コネクタ JAR へのパスを追加します。

    例:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. offset.storage.file.filename プロパティをローカル ファイル名に設定します。スタンドアロン モードでは、Kafka はこのファイルを使用してオフセット データを保存します。

    例:

    offset.storage.file.filename=/tmp/connect.offsets
    

Kafka から Pub/Sub にイベントを転送する

このセクションでは、シンクコネクタを起動し、Kafka にイベントをパブリッシュして、Pub/Sub から転送されたメッセージを読み取る方法について説明します。

  1. Google Cloud CLI を使用して、サブスクリプションを含む Pub/Sub トピックを作成します。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    以下を置き換えます。

    • PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピックの名前。
    • PUBSUB_SUBSCRIPTION: トピックの Pub/Sub サブスクリプションの名前。
  2. テキスト エディタで /config/cps-sink-connector.properties というファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC

    以下を置き換えます。

    • KAFKA_TOPICS: 読み取る Kafka トピックのカンマ区切りのリスト。
    • PROJECT_ID: Pub/Sub トピックを含む Cloud de Confiance by S3NS プロジェクト。
    • PUBSUB_TOPIC: Kafka からメッセージを受信する Pub/Sub トピック。
  3. Kafka ディレクトリから次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Apache Kafka クイックスタートの手順に沿って、Kafka トピックにイベントを書き込みます。

  5. gcloud CLI を使用して、Pub/Sub からイベントを読み取ります。

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

Pub/Sub から Kafka にメッセージを転送する。

このセクションでは、ソースコネクタの起動、Pub/Sub へのメッセージのパブリッシュ、Kafka から転送されたメッセージの読み取りを行う方法について説明します。

  1. gcloud CLI を使用して、サブスクリプションで Pub/Sub トピックを作成します。

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

    以下を置き換えます。

    • PUBSUB_TOPIC: Pub/Sub トピックの名前。
    • PUBSUB_SUBSCRIPTION: Pub/Sub サブスクリプションの名前。
  2. テキスト エディタで /config/cps-source-connector.properties という名前のファイルを開きます。コメントで "TODO" とマークされている次のプロパティの値を追加します。

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION

    以下を置き換えます。

    • KAFKA_TOPIC: Pub/Sub メッセージを受信する Kafka トピック。
    • PROJECT_ID: Pub/Sub トピックを含む Cloud de Confiance by S3NS プロジェクト。
    • PUBSUB_TOPIC: Pub/Sub トピック。
  3. Kafka ディレクトリから次のコマンドを実行します。

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. メッセージを gcloud CLI を使用して Pub/Sub にパブリッシュします。

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
  5. Kafka からのメッセージを読み取ります。Apache Kafka クイックスタートの手順に沿って、Kafka トピックからメッセージを読み取ります。

メッセージ コンバージョン

Kafka レコードにはキーと値が含まれています。これらは可変長のバイト配列です。必要に応じて、Key-Value ペアであるKafka レコードのヘッダーを含めることもできます。Pub/Sub メッセージには、メッセージ本文と 0 個以上の Key-Value 属性の 2 つの主要部分があります。

Kafka コネクトはコンバータを使用して、Kafka との間でキーと値をシリアル化します。 シリアル化を制御するには、コネクタ構成ファイルで次のプロパティを設定します。

  • key.converter: レコードキーをシリアル化するために使用されるコンバータ。
  • value.converter: レコード値をシリアル化するために使用されるコンバータ。

Pub/Sub メッセージの本文は ByteString オブジェクトであるため、最も効率的な変換はペイロードを直接コピーすることです。そのため、可能であれば、同じメッセージ本文のシリアル化解除と再シリアル化を防ぐため、プリミティブ データ型(整数、浮動小数点、文字列、バイトスキーマ)を生成するコンバータの使用をおすすめします。

Kafka から Pub/Sub への変換

シンクコネクタは、Kafka レコードを次のように Pub/Sub メッセージに変換します。

  • Kafka レコードキーは、Pub/Sub メッセージに "key" という名前の属性として保存されます。
  • デフォルトでは、コネクタは Kafka レコードのヘッダーをすべてドロップします。ただし、headers.publish 構成オプションを true に設定すると、コネクタはヘッダーを Pub/Sub 属性として書き込みます。コネクタは、Pub/Sub のメッセージ属性の制限を超えるヘッダーをスキップします。
  • 整数、浮動小数点数、文字列、バイトのスキーマの場合、コネクタは Kafka レコード値のバイトを Pub/Sub メッセージ本文に直接渡します。
  • 構造体スキーマの場合、コネクタは各フィールドを Pub/Sub メッセージの属性として書き込みます。たとえば、フィールドが { "id"=123 } の場合、生成される Pub/Sub メッセージには "id"="123" という属性が与えられます。フィールドの値は常に文字列に変換されます。マップ型と構造体型は、構造体内のフィールド型としてサポートされていません。
  • マップスキーマの場合、コネクタは各 Key-Value ペアを Pub/Sub メッセージの属性として書き込みます。たとえば、マップが {"alice"=1,"bob"=2} の場合、結果の Pub/Sub メッセージには "alice"="1""bob"="2" の 2 つの属性を持ちます。キーと値は文字列に変換されます。

構造体とマップのスキーマには、次のような追加の動作があります。

  • 必要に応じて、messageBodyName 構成プロパティを設定することで、特定の構造体フィールドまたはマップキーをメッセージ本文に指定できます。フィールドやキーの値はメッセージ本文に ByteString として格納されます。messageBodyName を設定しない場合、構造体とマップのスキーマのメッセージ本文は空になります。

  • 配列値の場合、コネクタはプリミティブ配列タイプのみをサポートします。配列内の値の順序は、1 つの ByteString オブジェクトに連結されます。

Pub/Sub から Kafka への変換

ソースコネクタは、Pub/Sub メッセージを次のように Kafka レコードに変換します。

  • Kafka レコードキー: デフォルトでは、キーは null に設定されています。必要に応じて、kafka.key.attribute 構成オプションを設定して、キーとして使用する Pub/Sub メッセージ属性を指定できます。その場合、コネクタはその名前の属性を検索し、レコードキーを属性値に設定します。指定された属性が存在しない場合、レコードキーは null に設定されます。

  • Kafka レコード値. コネクタはレコード値を次のように書き込みます。

    • Pub/Sub メッセージにカスタム属性がない場合、コネクタは value.converter によって指定されたコンバータを使用して、Pub/Sub メッセージ本文を byte[] 型として Kafka レコード値に直接書き込みます。

    • Pub/Sub メッセージにカスタム属性があり、kafka.record.headersfalse の場合、コネクタはレコード値に構造体を書き込みます。この構造体は、属性ごとに 1 つのフィールドと、Pub/Sub メッセージ本文(バイトとして保存)を持つ "message" という名前のフィールドを含んでいます。

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      この場合、struct スキーマと互換性のある value.converterorg.apache.kafka.connect.json.JsonConverter など)を使用する必要があります。

    • Pub/Sub メッセージにカスタム属性があり、kafka.record.headerstrue の場合、コネクタは属性を Kafka レコード ヘッダーとして書き込みます。value.converter で指定されたコンバータを使用して、Pub/Sub メッセージ本文を byte[] タイプとして Kafka レコード値に直接書き込みます。

  • Kafka レコード ヘッダー。デフォルトでは、kafka.record.headerstrue に設定しない限りヘッダーは空です。

構成オプション

Kafka Connect API が提供する構成に加えて、Pub/Sub グループの Kafka コネクタは、Pub/Sub コネクタの構成で説明されているシンクとソースの構成をサポートしています。

サポートの利用

ご不明な点がある場合は、サポート チケットを作成してください。一般的な質問やディスカッションについては、GitHub リポジトリで問題を作成してください。

次のステップ