Confluent Cloud インポート トピックを作成する

Confluent Cloud インポート トピックを使用すると、Confluent Cloud から外部ソースとして Pub/Sub にデータを継続的に取り込むことができます。その後、Pub/Sub がサポートしている任意の宛先にデータをストリーミングできます。

このドキュメントでは、Confluent Cloud インポート トピックを作成して管理する方法について説明します。標準トピックを作成するには、標準トピックを作成するをご覧ください。

インポート トピックの詳細については、インポート トピックについてをご覧ください。

始める前に

必要なロールと権限

Confluent Cloud インポート トピックの作成と管理に必要な権限を取得するには、トピックまたはプロジェクトに対する Pub/Sub 編集者 roles/pubsub.editor)IAM ロールを付与するように管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、Confluent Cloud インポート トピックの作成と管理に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

Confluent Cloud インポート トピックの作成と管理には、次の権限が必要です。

  • インポート トピックを作成する: pubsub.topics.create
  • インポート トピックを削除する: pubsub.topics.delete
  • インポート トピックを取得する: pubsub.topics.get
  • インポート トピックを一覧表示する: pubsub.topics.list
  • インポート トピックに公開する: pubsub.topics.publish and pubsub.serviceAgent
  • インポート トピックを更新する: pubsub.topics.update
  • インポート トピックの IAM ポリシーを取得する: pubsub.topics.getIamPolicy
  • インポート トピックの IAM ポリシーを構成する: pubsub.topics.setIamPolicy

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。

Confluent Cloud にアクセスするためのフェデレーション ID を設定する

Workload Identity 連携を使用すると、 Trusted Cloud サービスが Trusted Cloudの外部で実行されているワークロードにアクセスできるようになります。ID 連携を使用すれば、他のクラウドのリソースにアクセスするために認証情報を維持したり、 Trusted Cloud に渡したりする必要がなくなります。代わりに、ワークロード自体の ID を使用して Trusted Cloud に対する認証を行い、リソースにアクセスできます。

Trusted Cloudにサービス アカウントを作成する

このステップの実行は任意です。すでにサービス アカウントがある場合は、新しいサービス アカウントを作成せずに、そのサービス アカウントをこの手順で使用できます。既存のサービス アカウントを使用している場合は、次のステップのためにサービス アカウントの一意の ID を記録するに進みます。

Confluent Cloud インポート トピックの場合、Pub/Sub はサービス アカウントを ID として使用して、Confluent Cloud からリソースにアクセスします。

前提条件、必要なロールと権限、命名ガイドラインなど、サービス アカウントの作成の詳細については、サービス アカウントを作成するをご覧ください。サービス アカウントの作成後、サービス アカウントが使用できるようになるまでに 60 秒以上かかる場合があります。この動作は、読み取りオペレーションが結果整合性に基づいているためです。新しいサービス アカウントが利用可能になるまで時間がかかることがあります。

サービス アカウントの一意の ID を記録する

Confluent Cloud コンソールで ID プロバイダとプールを設定するには、サービス アカウントの一意の ID が必要です。

  1. Trusted Cloud コンソールで、[サービス アカウント] の詳細ページに移動します。

    [サービス アカウント] に移動

  2. 作成したサービス アカウントまたは使用する予定のサービス アカウントをクリックします。

  3. [サービス アカウントの詳細] ページで、一意の ID 番号を記録します。

    この ID は、Confluent Cloud コンソールで ID プロバイダとプールを設定するワークフローの一部として必要になります。

サービス アカウント トークン作成者のロールを Pub/Sub サービス アカウントに追加する

サービス アカウント トークン作成者のロールroles/iam.serviceAccountTokenCreator)を使用すると、プリンシパルはサービス アカウントに有効期間の短い認証情報を作成できます。これらのトークンまたは認証情報は、サービス アカウントの権限を借用するために使用されます。

サービス アカウントの権限借用の詳細については、サービス アカウントの権限借用をご覧ください。

この手順で Pub/Sub パブリッシャーのロールroles/pubsub.publisher)を追加することもできます。ロールの詳細、およびロールを追加する理由については、Pub/Sub パブリッシャーのロールを Pub/Sub サービス アカウントに追加するをご覧ください。

  1. Trusted Cloud コンソールで、[IAM] ページに移動します。

    IAM に移動

  2. [ S3NS提供のロール付与を含める] チェックボックスをオンにします。

  3. service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com 形式のサービス アカウントを探します。

  4. このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。

  5. 必要に応じて、[別のロールを追加] をクリックします。

  6. サービス アカウント トークン作成者ロールroles/iam.serviceAccountTokenCreator)を検索してクリックします。

  7. [保存] をクリックします。

Confluent Cloud で ID プロバイダを作成する

Confluent Cloud の認証を行うには、Google Cloud サービス アカウントに ID プールが必要です。まず、Confluent Cloud で ID プロバイダを作成する必要があります。

Confluent Cloud で ID プロバイダを作成する方法については、OAuth/OIDC ID プロバイダを追加するをご覧ください。

  1. Confluent Cloud コンソールにログインします。

  2. メニューで [アカウントとアクセス] をクリックします。

  3. [Workload Identity] をクリックします。

  4. [プロバイダを追加] をクリックします。

  5. [OAuth/OIDC] をクリックし、[次へ] をクリックします。

  6. [その他の OIDC プロバイダ] をクリックし、[次へ] をクリックします。

  7. ID プロバイダの名前と目的の説明を入力します。

  8. [詳細構成を表示] をクリックします。

  9. [発行者 URI] フィールドに「https://accounts.google.com」と入力します。

  10. [JWKS URI] フィールドに「https://www.googleapis.com/oauth2/v3/certs」と入力します。

  11. [Validate & Save] をクリックします。

ID プールを作成し、Confluent Cloud で適切なロールを付与する

ID プロファイルで ID プールを作成し、Pub/Sub サービス アカウントが Confluent Cloud Kafka トピックを認証して読み取ることができるように、必要なロールを付与する必要があります。

ID プールの作成に進む前に、クラスタが Confluent Cloud で作成されていることを確認してください。

ID プールの作成方法については、OAuth/OIDC ID プロバイダで ID プールを使用するをご覧ください。

  1. Confluent Cloud コンソールにログインします。

  2. メニューで [アカウントとアクセス] をクリックします。

  3. [Workload Identity] をクリックします。

  4. Confluent Cloud で ID プロバイダを作成するで作成した ID プロバイダをクリックします。

  5. [Add pool] をクリックします。

  6. ID プールの名前と説明を入力します。

  7. [ID クレーム] を claims に設定します。

  8. [フィルタを設定] で、[詳細] タブをクリックします。次のコードを入力します。

    claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
    

    <SERVICE_ACCOUNT_UNIQUE_ID> は、サービス アカウントの一意の ID を記録するで確認したサービス アカウントの一意の ID に置き換えます。

  9. [次へ] をクリックします。

  10. [新しい権限を追加] をクリックします。[次へ] をクリックします。

  11. 関連するクラスタで、[ロールの割り当てを追加] をクリックします。

  12. [オペレーター] ロールをクリックし、[追加] をクリックします。

    このロールは Pub/Sub に付与されます。Pub/Sub に取り込む Confluent Kafka トピックを含むクラスタへのサービス アカウント アクセス。

  13. クラスタの下にある [トピック] をクリックします。[ロールの割り当てを追加] をクリックします。

  14. [DeveloperRead] ロールを選択します。

  15. 適切なオプションをクリックして、トピックまたはプレフィックスを指定します。たとえば、特定のトピック接頭辞ルールすべてのトピックなどです。

  16. [追加] をクリックします。

  17. [次へ] をクリックします。

  18. [Validate & Save] をクリックします。

Pub/Sub パブリッシャーのロールを Pub/Sub プリンシパルに追加する

公開を有効にするには、Pub/Sub が Confluent Cloud インポート トピックに公開できるように、Pub/Sub サービス アカウントにパブリッシャーのロールを割り当てる必要があります。

Pub/Sub サービス エージェントのロールを Pub/Sub サービス アカウントに追加する

Pub/Sub がインポート トピック プロジェクトのパブリッシュ割り当てを使用できるようにするには、Pub/Sub サービス エージェントにインポート トピックのプロジェクトに対する serviceusage.services.use 権限が必要です。

この権限を付与するには、Pub/Sub サービス アカウントに Pub/Sub サービス エージェントのロールを追加することをおすすめします。

Pub/Sub サービス アカウントに Pub/Sub サービス エージェント ロールがない場合は、次のように付与できます。

  1. Trusted Cloud コンソールで、[IAM] ページに移動します。

    IAM に移動

  2. [ S3NS提供のロール付与を含める] チェックボックスをオンにします。

  3. service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com 形式のサービス アカウントを探します。

  4. このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。

  5. 必要に応じて、[別のロールを追加] をクリックします。

  6. Pub/Sub サービス エージェントのロールroles/pubsub.serviceAgent)を検索してクリックします。

  7. [保存] をクリックします。

すべてのトピックからのパブリッシュを有効にする

Confluent Cloud インポート トピックを作成していない場合は、この方法を使用します。

  1. Trusted Cloud コンソールで、[IAM] ページに移動します。

    IAM に移動

  2. [ S3NS提供のロール付与を含める] チェックボックスをオンにします。

  3. service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com 形式のサービス アカウントを探します。

  4. このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。

  5. 必要に応じて、[別のロールを追加] をクリックします。

  6. Pub/Sub パブリッシャーのロールroles/pubsub.publisher)を検索してクリックします。

  7. [保存] をクリックします。

単一トピックからのパブリッシュを有効にする

このメソッドは、Confluent Cloud インポート トピックがすでに存在する場合にのみ使用します。

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics add-iam-policy-binding コマンドを実行します。

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    次のように置き換えます。

    • TOPIC_ID: Confluent Cloud インポート トピックのトピック ID。

    • PROJECT_NUMBER: プロジェクト番号。プロジェクト番号を表示するには、プロジェクトを特定するをご覧ください。

  3. サービス アカウントのユーザーロールをサービス アカウントに追加する

    サービス アカウント ユーザーのロールroles/iam.serviceAccountUser)には、プリンシパルがサービス アカウントを Confluent Cloud インポート トピックの取り込み設定に接続し、そのサービス アカウントを連携 ID に使用できるようにする権限 iam.serviceAccounts.actAs が含まれています。

    1. Trusted Cloud コンソールで、[IAM] ページに移動します。

      IAM に移動

    2. トピックの作成または更新の呼び出しを発行するプリンシパルに対して、[プリンシパルを編集] ボタンをクリックします。

    3. 必要に応じて、[別のロールを追加] をクリックします。

    4. サービス アカウント ユーザーロールroles/iam.serviceAccountUser)を検索してクリックします。

    5. [保存] をクリックします。

    Confluent Cloud インポート トピックを使用する

    新しいインポート トピックを作成することも、既存のトピックを編集することもできます。

    考慮事項

    • トピックとサブスクリプションを別々に作成すると、連続して作成した場合でもデータが失われる可能性があります。トピックは、定期購入なしで利用できる期間が短く設定されています。この間にトピックにデータが送信されると、データは失われます。最初にトピックを作成し、サブスクリプションを作成してから、トピックをインポート トピックに変換することで、インポート プロセス中にメッセージが失われることはありません。

    • 既存のインポート トピックの Kafka トピックを同じ名前で再作成する必要がある場合、Kafka トピックを削除して再作成することはできません。この操作により、Pub/Sub のオフセット管理が無効になり、データ損失が発生する可能性があります。この問題を軽減するには、次の手順を行います。

      • Pub/Sub インポート トピックを削除します。
      • Kafka トピックを削除します。
      • Kafka トピックを作成します。
      • Pub/Sub インポート トピックを作成します。
    • Confluent Cloud Kafka トピックのデータは、常に最も古いオフセットから読み取られます。

    Confluent Cloud インポート トピックを作成する

    トピックに関連付けられたプロパティの詳細については、トピックのプロパティをご覧ください。

    次の手順を完了していることを確認してください。

    Confluent Cloud インポート トピックを作成する手順は次のとおりです。

    コンソール

    1. Google Cloud コンソールの トピック ページに移動します。

      [トピック] に移動

    2. [トピックを作成] をクリックします。
    3. [トピック ID] フィールドに、インポート トピックの ID を入力します。トピックの命名の詳細については、命名ガイドラインをご覧ください。
    4. [デフォルトのサブスクリプションを追加する] を選択します。
    5. [取り込みを有効にする] を選択します。
    6. 取り込み元として [Confluent Cloud] を選択します。
    7. 次の詳細情報を入力します。
      1. ブートストラップ サーバー: Pub/Sub に取り込む Kafka トピックを含むクラスタのブートストラップ サーバー。形式は hostname:port です。
      2. クラスタ ID: Pub/Sub に取り込む Kafka トピックを含むクラスタの ID。
      3. トピック: Pub/Sub に取り込む Kafka トピックの名前。
      4. Identity プール ID: Confluent Cloud での認証に使用される Identity プールのプール ID。
      5. サービス アカウント: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。
    8. [トピックを作成] をクリックします。

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. gcloud pubsub topics create コマンドを実行します。
      gcloud pubsub topics create TOPIC_ID 
      --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER
      --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID
      --confluent-cloud-ingestion-topic CONFLUENT_TOPIC
      --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID
      --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

      次のように置き換えます。

      • TOPIC_ID: Pub/Sub トピックの名前または ID。
      • CONFLUENT_BOOTSTRAP_SERVER: Pub/Sub に取り込む Kafka トピックを含むクラスタのブートストラップ サーバー。形式は hostname:port です。
      • CONFLUENT_CLUSTER_ID: Pub/Sub に取り込む Kafka トピックを含むクラスタの ID。
      • CONFLUENT_TOPIC: Pub/Sub に取り込む Kafka トピックの名前。
      • CONFLUENT_IDENTITY_POOL_ID: Confluent Cloud での認証に使用される Identity プールのプール ID。
      • PUBSUB_SERVICE_ACCOUNT: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。
    3. C++

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string const& bootstrap_server,
         std::string const& cluster_id, std::string const& confluent_topic,
         std::string const& identity_pool_id,
         std::string const& gcp_service_account) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
                                    ->mutable_confluent_cloud();
        confluent_cloud->set_bootstrap_server(bootstrap_server);
        confluent_cloud->set_cluster_id(cluster_id);
        confluent_cloud->set_topic(confluent_topic);
        confluent_cloud->set_identity_pool_id(identity_pool_id);
        confluent_cloud->set_gcp_service_account(gcp_service_account);
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。まだ v1 ライブラリを使用している場合は、v2 への移行ガイドをご覧ください。v1 コードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

      import (
      	"context"
      	"fmt"
      	"io"
      
      	"cloud.google.com/go/pubsub"
      )
      
      func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      
      	// // Confluent Cloud ingestion settings.
      	// bootstrapServer := "bootstrap-server"
      	// clusterID := "cluster-id"
      	// confluentTopic := "confluent-topic"
      	// poolID := "identity-pool-id"
      	// gcpSA := "gcp-service-account"
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	cfg := &pubsub.TopicConfig{
      		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
      			Source: &pubsub.IngestionDataSourceConfluentCloud{
      				BootstrapServer:   bootstrapServer,
      				ClusterID:         clusterID,
      				Topic:             confluentTopic,
      				IdentityPoolID:    poolID,
      				GCPServiceAccount: gcpSA,
      			},
      		},
      	}
      	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", t)
      	return nil
      }
      

      Java

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      
      public class CreateTopicWithConfluentCloudIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Confluent Cloud ingestion settings.
          String bootstrapServer = "bootstrap-server";
          String clusterId = "cluster-id";
          String confluentTopic = "confluent-topic";
          String identityPoolId = "identity-pool-id";
          String gcpServiceAccount = "gcp-service-account";
      
          createTopicWithConfluentCloudIngestionExample(
              projectId,
              topicId,
              bootstrapServer,
              clusterId,
              confluentTopic,
              identityPoolId,
              gcpServiceAccount);
        }
      
        public static void createTopicWithConfluentCloudIngestionExample(
            String projectId,
            String topicId,
            String bootstrapServer,
            String clusterId,
            String confluentTopic,
            String identityPoolId,
            String gcpServiceAccount)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
      
            IngestionDataSourceSettings.ConfluentCloud confluentCloud =
                IngestionDataSourceSettings.ConfluentCloud.newBuilder()
                    .setBootstrapServer(bootstrapServer)
                    .setClusterId(clusterId)
                    .setTopic(confluentTopic)
                    .setIdentityPoolId(identityPoolId)
                    .setGcpServiceAccount(gcpServiceAccount)
                    .build();
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println(
                "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bootstrapServer = 'url:port';
      // const clusterId = 'YOUR_CLUSTER_ID';
      // const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
      // const identityPoolId = 'pool-ID';
      // const gcpServiceAccount = 'ingestion-account@...';
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithConfluentCloudIngestion(
        topicNameOrId,
        bootstrapServer,
        clusterId,
        confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      ) {
        // Creates a new topic with Confluent Cloud ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            confluentCloud: {
              bootstrapServer,
              clusterId,
              topic: confluentTopic,
              identityPoolId,
              gcpServiceAccount,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
      }

      Node.ts

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bootstrapServer = 'url:port';
      // const clusterId = 'YOUR_CLUSTER_ID';
      // const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
      // const identityPoolId = 'pool-ID';
      // const gcpServiceAccount = 'ingestion-account@...';
      
      // Imports the Google Cloud client library
      import {PubSub} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithConfluentCloudIngestion(
        topicNameOrId: string,
        bootstrapServer: string,
        clusterId: string,
        confluentTopic: string,
        identityPoolId: string,
        gcpServiceAccount: string,
      ) {
        // Creates a new topic with Confluent Cloud ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            confluentCloud: {
              bootstrapServer,
              clusterId,
              topic: confluentTopic,
              identityPoolId,
              gcpServiceAccount,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
      }

      Python

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

      from google.cloud import pubsub_v1
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # bootstrap_server = "your-bootstrap-server"
      # cluster_id = "your-cluster-id"
      # confluent_topic = "your-confluent-topic"
      # identity_pool_id = "your-identity-pool-id"
      # gcp_service_account = "your-gcp-service-account"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
                  bootstrap_server=bootstrap_server,
                  cluster_id=cluster_id,
                  topic=confluent_topic,
                  identity_pool_id=identity_pool_id,
                  gcp_service_account=gcp_service_account,
              )
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")

    問題が発生した場合は、Confluent Cloud インポート トピックのトラブルシューティングをご覧ください。

    Confluent Cloud Hubs インポート トピックを編集する

    Confluent Cloud インポート トピックの取り込みデータソースの設定を編集する手順は次のとおりです。

    コンソール

    1. Trusted Cloud コンソールで、[トピック] ページに移動します。

      [トピック] に移動

    2. Confluent Cloud インポート トピックをクリックします。

    3. [トピックの詳細] ページで、[編集] をクリックします。

    4. 変更するフィールドを更新します。

    5. [更新] をクリックします。

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

      インポート トピックの設定が失われないようにするには、トピックを更新するたびにすべての設定を含めるようにしてください。何かを省略すると、Pub/Sub は設定を元のデフォルト値にリセットします。

    2. 次のサンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

      gcloud pubsub topics update TOPIC_ID \
         --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
         --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
         --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
         --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
         --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

      次のように置き換えます。

      • TOPIC_ID: Pub/Sub トピックの名前または ID。
      • CONFLUENT_BOOTSTRAP_SERVER: Pub/Sub に取り込む Kafka トピックを含むクラスタのブートストラップ サーバー。形式は hostname:port です。
      • CONFLUENT_CLUSTER_ID: Pub/Sub に取り込む Kafka トピックを含むクラスタの ID
      • CONFLUENT_TOPIC: Pub/Sub に取り込む Kafka トピックの名前。
      • CONFLUENT_IDENTITY_POOL_ID: Confluent Cloud での認証に使用される Identity プールのプール ID。
      • CONFLUENT_IDENTITY_POOL_ID: Google Cloud でサービス アカウントを作成するで作成したサービス アカウント。

    割り当てと上限

    インポート トピックのパブリッシャーのスループットは、トピックのパブリッシュ割り当てによってバインドされます。詳細については、Pub/Sub の割り当てと上限をご覧ください。

    次のステップ

    Apache Kafka® は、Apache Software Foundation または米国その他の諸国における関連会社の商標です。