Confluent Cloud インポート トピックを使用すると、Confluent Cloud から外部ソースとして Pub/Sub にデータを継続的に取り込むことができます。その後、Pub/Sub がサポートしている任意の宛先にデータをストリーミングできます。
このドキュメントでは、Confluent Cloud インポート トピックを作成して管理する方法について説明します。標準トピックを作成するには、標準トピックを作成するをご覧ください。
インポート トピックの詳細については、インポート トピックについてをご覧ください。
始める前に
Pub/Sub 公開プロセスの詳細を確認する。
次のものを含む、Confluent Cloud インポート トピックの管理に必要なロールと権限を構成します。
Trusted Cloud が外部ストリーミング サービスにアクセスできるように、Workload Identity 連携を設定します。
必要なロールと権限
Confluent Cloud インポート トピックの作成と管理に必要な権限を取得するには、トピックまたはプロジェクトに対する Pub/Sub 編集者 (roles/pubsub.editor
)IAM ロールを付与するように管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
この事前定義ロールには、Confluent Cloud インポート トピックの作成と管理に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。
必要な権限
Confluent Cloud インポート トピックの作成と管理には、次の権限が必要です。
-
インポート トピックを作成する:
pubsub.topics.create
-
インポート トピックを削除する:
pubsub.topics.delete
-
インポート トピックを取得する:
pubsub.topics.get
-
インポート トピックを一覧表示する:
pubsub.topics.list
-
インポート トピックに公開する:
pubsub.topics.publish and pubsub.serviceAgent
-
インポート トピックを更新する:
pubsub.topics.update
-
インポート トピックの IAM ポリシーを取得する:
pubsub.topics.getIamPolicy
-
インポート トピックの IAM ポリシーを構成する:
pubsub.topics.setIamPolicy
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。
Confluent Cloud にアクセスするためのフェデレーション ID を設定する
Workload Identity 連携を使用すると、 Trusted Cloud サービスが Trusted Cloudの外部で実行されているワークロードにアクセスできるようになります。ID 連携を使用すれば、他のクラウドのリソースにアクセスするために認証情報を維持したり、 Trusted Cloud に渡したりする必要がなくなります。代わりに、ワークロード自体の ID を使用して Trusted Cloud に対する認証を行い、リソースにアクセスできます。
Trusted Cloudにサービス アカウントを作成する
このステップの実行は任意です。すでにサービス アカウントがある場合は、新しいサービス アカウントを作成せずに、そのサービス アカウントをこの手順で使用できます。既存のサービス アカウントを使用している場合は、次のステップのためにサービス アカウントの一意の ID を記録するに進みます。
Confluent Cloud インポート トピックの場合、Pub/Sub はサービス アカウントを ID として使用して、Confluent Cloud からリソースにアクセスします。
前提条件、必要なロールと権限、命名ガイドラインなど、サービス アカウントの作成の詳細については、サービス アカウントを作成するをご覧ください。サービス アカウントの作成後、サービス アカウントが使用できるようになるまでに 60 秒以上かかる場合があります。この動作は、読み取りオペレーションが結果整合性に基づいているためです。新しいサービス アカウントが利用可能になるまで時間がかかることがあります。
サービス アカウントの一意の ID を記録する
Confluent Cloud コンソールで ID プロバイダとプールを設定するには、サービス アカウントの一意の ID が必要です。
Trusted Cloud コンソールで、[サービス アカウント] の詳細ページに移動します。
作成したサービス アカウントまたは使用する予定のサービス アカウントをクリックします。
[サービス アカウントの詳細] ページで、一意の ID 番号を記録します。
この ID は、Confluent Cloud コンソールで ID プロバイダとプールを設定するワークフローの一部として必要になります。
サービス アカウント トークン作成者のロールを Pub/Sub サービス アカウントに追加する
サービス アカウント トークン作成者のロール(roles/iam.serviceAccountTokenCreator
)を使用すると、プリンシパルはサービス アカウントに有効期間の短い認証情報を作成できます。これらのトークンまたは認証情報は、サービス アカウントの権限を借用するために使用されます。
サービス アカウントの権限借用の詳細については、サービス アカウントの権限借用をご覧ください。
この手順で Pub/Sub パブリッシャーのロール(roles/pubsub.publisher
)を追加することもできます。ロールの詳細、およびロールを追加する理由については、Pub/Sub パブリッシャーのロールを Pub/Sub サービス アカウントに追加するをご覧ください。
Trusted Cloud コンソールで、[IAM] ページに移動します。
[ S3NS提供のロール付与を含める] チェックボックスをオンにします。
service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com
形式のサービス アカウントを探します。このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。
必要に応じて、[別のロールを追加] をクリックします。
サービス アカウント トークン作成者ロール(
roles/iam.serviceAccountTokenCreator
)を検索してクリックします。[保存] をクリックします。
Confluent Cloud で ID プロバイダを作成する
Confluent Cloud の認証を行うには、Google Cloud サービス アカウントに ID プールが必要です。まず、Confluent Cloud で ID プロバイダを作成する必要があります。
Confluent Cloud で ID プロバイダを作成する方法については、OAuth/OIDC ID プロバイダを追加するをご覧ください。
Confluent Cloud コンソールにログインします。
メニューで [アカウントとアクセス] をクリックします。
[Workload Identity] をクリックします。
[プロバイダを追加] をクリックします。
[OAuth/OIDC] をクリックし、[次へ] をクリックします。
[その他の OIDC プロバイダ] をクリックし、[次へ] をクリックします。
ID プロバイダの名前と目的の説明を入力します。
[詳細構成を表示] をクリックします。
[発行者 URI] フィールドに「
https://accounts.google.com
」と入力します。[JWKS URI] フィールドに「
https://www.googleapis.com/oauth2/v3/certs
」と入力します。[Validate & Save] をクリックします。
ID プールを作成し、Confluent Cloud で適切なロールを付与する
ID プロファイルで ID プールを作成し、Pub/Sub サービス アカウントが Confluent Cloud Kafka トピックを認証して読み取ることができるように、必要なロールを付与する必要があります。
ID プールの作成に進む前に、クラスタが Confluent Cloud で作成されていることを確認してください。
ID プールの作成方法については、OAuth/OIDC ID プロバイダで ID プールを使用するをご覧ください。
Confluent Cloud コンソールにログインします。
メニューで [アカウントとアクセス] をクリックします。
[Workload Identity] をクリックします。
Confluent Cloud で ID プロバイダを作成するで作成した ID プロバイダをクリックします。
[Add pool] をクリックします。
ID プールの名前と説明を入力します。
[ID クレーム] を
claims
に設定します。[フィルタを設定] で、[詳細] タブをクリックします。次のコードを入力します。
claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
<SERVICE_ACCOUNT_UNIQUE_ID>
は、サービス アカウントの一意の ID を記録するで確認したサービス アカウントの一意の ID に置き換えます。[次へ] をクリックします。
[新しい権限を追加] をクリックします。[次へ] をクリックします。
関連するクラスタで、[ロールの割り当てを追加] をクリックします。
[オペレーター] ロールをクリックし、[追加] をクリックします。
このロールは Pub/Sub に付与されます。Pub/Sub に取り込む Confluent Kafka トピックを含むクラスタへのサービス アカウント アクセス。
クラスタの下にある [トピック] をクリックします。[ロールの割り当てを追加] をクリックします。
[DeveloperRead] ロールを選択します。
適切なオプションをクリックして、トピックまたはプレフィックスを指定します。たとえば、特定のトピック、接頭辞ルール、すべてのトピックなどです。
[追加] をクリックします。
[次へ] をクリックします。
[Validate & Save] をクリックします。
Pub/Sub パブリッシャーのロールを Pub/Sub プリンシパルに追加する
公開を有効にするには、Pub/Sub が Confluent Cloud インポート トピックに公開できるように、Pub/Sub サービス アカウントにパブリッシャーのロールを割り当てる必要があります。
Pub/Sub サービス エージェントのロールを Pub/Sub サービス アカウントに追加する
Pub/Sub がインポート トピック プロジェクトのパブリッシュ割り当てを使用できるようにするには、Pub/Sub サービス エージェントにインポート トピックのプロジェクトに対する serviceusage.services.use
権限が必要です。
この権限を付与するには、Pub/Sub サービス アカウントに Pub/Sub サービス エージェントのロールを追加することをおすすめします。
Pub/Sub サービス アカウントに Pub/Sub サービス エージェント ロールがない場合は、次のように付与できます。
Trusted Cloud コンソールで、[IAM] ページに移動します。
[ S3NS提供のロール付与を含める] チェックボックスをオンにします。
service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com
形式のサービス アカウントを探します。このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。
必要に応じて、[別のロールを追加] をクリックします。
Pub/Sub サービス エージェントのロール(
roles/pubsub.serviceAgent
)を検索してクリックします。[保存] をクリックします。
すべてのトピックからのパブリッシュを有効にする
Confluent Cloud インポート トピックを作成していない場合は、この方法を使用します。
Trusted Cloud コンソールで、[IAM] ページに移動します。
[ S3NS提供のロール付与を含める] チェックボックスをオンにします。
service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com
形式のサービス アカウントを探します。このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。
必要に応じて、[別のロールを追加] をクリックします。
Pub/Sub パブリッシャーのロール(
roles/pubsub.publisher
)を検索してクリックします。[保存] をクリックします。
単一トピックからのパブリッシュを有効にする
このメソッドは、Confluent Cloud インポート トピックがすでに存在する場合にのみ使用します。
In the Trusted Cloud console, activate Cloud Shell.
At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
gcloud pubsub topics add-iam-policy-binding
コマンドを実行します。gcloud pubsub topics add-iam-policy-binding TOPIC_ID \ --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com" \ --role="roles/pubsub.publisher"
次のように置き換えます。
TOPIC_ID
: Confluent Cloud インポート トピックのトピック ID。PROJECT_NUMBER
: プロジェクト番号。プロジェクト番号を表示するには、プロジェクトを特定するをご覧ください。
Trusted Cloud コンソールで、[IAM] ページに移動します。
トピックの作成または更新の呼び出しを発行するプリンシパルに対して、[プリンシパルを編集] ボタンをクリックします。
必要に応じて、[別のロールを追加] をクリックします。
サービス アカウント ユーザーロール(
roles/iam.serviceAccountUser
)を検索してクリックします。[保存] をクリックします。
トピックとサブスクリプションを別々に作成すると、連続して作成した場合でもデータが失われる可能性があります。トピックは、定期購入なしで利用できる期間が短く設定されています。この間にトピックにデータが送信されると、データは失われます。最初にトピックを作成し、サブスクリプションを作成してから、トピックをインポート トピックに変換することで、インポート プロセス中にメッセージが失われることはありません。
既存のインポート トピックの Kafka トピックを同じ名前で再作成する必要がある場合、Kafka トピックを削除して再作成することはできません。この操作により、Pub/Sub のオフセット管理が無効になり、データ損失が発生する可能性があります。この問題を軽減するには、次の手順を行います。
- Pub/Sub インポート トピックを削除します。
- Kafka トピックを削除します。
- Kafka トピックを作成します。
- Pub/Sub インポート トピックを作成します。
Confluent Cloud Kafka トピックのデータは、常に最も古いオフセットから読み取られます。
-
Google Cloud コンソールの トピック ページに移動します。
- [トピックを作成] をクリックします。
- [トピック ID] フィールドに、インポート トピックの ID を入力します。トピックの命名の詳細については、命名ガイドラインをご覧ください。
- [デフォルトのサブスクリプションを追加する] を選択します。
- [取り込みを有効にする] を選択します。
- 取り込み元として [Confluent Cloud] を選択します。
- 次の詳細情報を入力します。
- ブートストラップ サーバー: Pub/Sub に取り込む Kafka トピックを含むクラスタのブートストラップ サーバー。形式は
hostname:port
です。 - クラスタ ID: Pub/Sub に取り込む Kafka トピックを含むクラスタの ID。
- トピック: Pub/Sub に取り込む Kafka トピックの名前。
- Identity プール ID: Confluent Cloud での認証に使用される Identity プールのプール ID。
- サービス アカウント: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。
- ブートストラップ サーバー: Pub/Sub に取り込む Kafka トピックを含むクラスタのブートストラップ サーバー。形式は
- [トピックを作成] をクリックします。
-
In the Trusted Cloud console, activate Cloud Shell.
At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
-
gcloud pubsub topics create
コマンドを実行します。gcloud pubsub topics create TOPIC_ID
--confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER
--confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID
--confluent-cloud-ingestion-topic CONFLUENT_TOPIC
--confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID
--confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT次のように置き換えます。
-
TOPIC_ID
: Pub/Sub トピックの名前または ID。 -
CONFLUENT_BOOTSTRAP_SERVER
: Pub/Sub に取り込む Kafka トピックを含むクラスタのブートストラップ サーバー。形式はhostname:port
です。 -
CONFLUENT_CLUSTER_ID
: Pub/Sub に取り込む Kafka トピックを含むクラスタの ID。 -
CONFLUENT_TOPIC
: Pub/Sub に取り込む Kafka トピックの名前。 -
CONFLUENT_IDENTITY_POOL_ID
: Confluent Cloud での認証に使用される Identity プールのプール ID。 -
PUBSUB_SERVICE_ACCOUNT
: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。
-
Trusted Cloud コンソールで、[トピック] ページに移動します。
Confluent Cloud インポート トピックをクリックします。
[トピックの詳細] ページで、[編集] をクリックします。
変更するフィールドを更新します。
[更新] をクリックします。
-
In the Trusted Cloud console, activate Cloud Shell.
At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
インポート トピックの設定が失われないようにするには、トピックを更新するたびにすべての設定を含めるようにしてください。何かを省略すると、Pub/Sub は設定を元のデフォルト値にリセットします。
次のサンプルに記載されているフラグをすべて指定して、
gcloud pubsub topics update
コマンドを実行します。gcloud pubsub topics update TOPIC_ID \ --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \ --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \ --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \ --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \ --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
次のように置き換えます。
TOPIC_ID
: Pub/Sub トピックの名前または ID。CONFLUENT_BOOTSTRAP_SERVER
: Pub/Sub に取り込む Kafka トピックを含むクラスタのブートストラップ サーバー。形式はhostname:port
です。CONFLUENT_CLUSTER_ID
: Pub/Sub に取り込む Kafka トピックを含むクラスタの IDCONFLUENT_TOPIC
: Pub/Sub に取り込む Kafka トピックの名前。CONFLUENT_IDENTITY_POOL_ID
: Confluent Cloud での認証に使用される Identity プールのプール ID。CONFLUENT_IDENTITY_POOL_ID
: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。
トピックのサブスクリプションの種類を選択する。
トピックにメッセージを公開する方法を学習する。
gcloud CLI、REST API、またはクライアント ライブラリを使用して、トピックを作成または変更する。
サービス アカウントのユーザーロールをサービス アカウントに追加する
サービス アカウント ユーザーのロール(roles/iam.serviceAccountUser
)には、プリンシパルがサービス アカウントを Confluent Cloud インポート トピックの取り込み設定に接続し、そのサービス アカウントを連携 ID に使用できるようにする権限 iam.serviceAccounts.actAs
が含まれています。
Confluent Cloud インポート トピックを使用する
新しいインポート トピックを作成することも、既存のトピックを編集することもできます。
考慮事項
Confluent Cloud インポート トピックを作成する
トピックに関連付けられたプロパティの詳細については、トピックのプロパティをご覧ください。
次の手順を完了していることを確認してください。
Confluent Cloud インポート トピックを作成する手順は次のとおりです。
コンソール
gcloud
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。
Go
次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。まだ v1 ライブラリを使用している場合は、v2 への移行ガイドをご覧ください。v1 コードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。
Node.ts
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
問題が発生した場合は、Confluent Cloud インポート トピックのトラブルシューティングをご覧ください。
Confluent Cloud Hubs インポート トピックを編集する
Confluent Cloud インポート トピックの取り込みデータソースの設定を編集する手順は次のとおりです。
コンソール
gcloud
割り当てと上限
インポート トピックのパブリッシャーのスループットは、トピックのパブリッシュ割り当てによってバインドされます。詳細については、Pub/Sub の割り当てと上限をご覧ください。