建立 Amazon Kinesis Data Streams 匯入主題

Amazon Kinesis Data Streams 匯入主題可讓您從 Amazon Kinesis Data Streams 持續擷取資料,做為外部來源並匯入 Pub/Sub。接著,您就可以將資料串流至 Pub/Sub 支援的任何目的地。

如要進一步瞭解匯入主題,請參閱「關於匯入主題」。

事前準備

必要角色和權限

如要取得建立及管理 Amazon Kinesis Data Streams 匯入主題所需的權限,請要求管理員授予您主題或專案的 Pub/Sub 編輯者 (roles/pubsub.editor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色具備建立及管理 Amazon Kinesis Data Streams 匯入主題所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

如要建立及管理 Amazon Kinesis Data Streams 匯入主題,您必須具備下列權限:

  • 建立匯入主題: pubsub.topics.create
  • 刪除匯入主題: pubsub.topics.delete
  • 取得匯入主題: pubsub.topics.get
  • 列出匯入主題: pubsub.topics.list
  • 發布至匯入主題: pubsub.topics.publish and pubsub.serviceAgent
  • 更新匯入主題: pubsub.topics.update
  • 取得匯入主題的身分與存取權管理政策: pubsub.topics.getIamPolicy
  • 設定匯入主題的 IAM 政策 pubsub.topics.setIamPolicy

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。

設定聯合身分,存取 Amazon Kinesis Data Streams

Workload Identity 聯盟可讓 Trusted Cloud 服務存取 Trusted Cloud外部執行的工作負載。使用身分聯盟時,您不需要維護或傳遞憑證,即可存取其他雲端中的資源。 Trusted Cloud 您可以改用工作負載本身的 ID 向 Trusted Cloud 進行驗證,並存取資源。

在 Trusted Cloud中建立服務帳戶

這個步驟可以省略。如果您已有服務帳戶,可以在這個程序中使用該帳戶,不必建立新的服務帳戶。如果您使用現有的服務帳戶,請前往「記錄服務帳戶的專屬 ID」進行下一個步驟。

如果是 Amazon Kinesis Data Streams 匯入主題,Pub/Sub 會使用服務帳戶做為身分,從 AWS 存取資源。

如要進一步瞭解如何建立服務帳戶,包括必要條件、必要角色和權限,以及命名規範,請參閱「建立服務帳戶」。建立服務帳戶後,您可能需要等待 60 秒以上,才能使用該服務帳戶。這種行為的發生,是因為讀取作業最終會保持一致性,因此需要一段時間,新服務帳戶才會顯示。

記下服務帳戶專屬 ID

您需要服務帳戶專屬 ID,才能在 AWS 中設定角色。

  1. 前往 Trusted Cloud 控制台的「Service account」(服務帳戶) 詳細資料頁面。

    前往服務帳戶

  2. 按一下您剛建立的服務帳戶,或打算使用的服務帳戶。

  3. 在「服務帳戶詳細資料」頁面中,記下專屬 ID 編號。

    您需要這個 ID 才能在工作流程中設定 AWS 中的角色

為 Pub/Sub 服務帳戶新增服務帳戶憑證建立者角色

服務帳戶憑證建立者角色 (roles/iam.serviceAccountTokenCreator) 可讓主體為服務帳戶建立短期憑證。這些權杖或憑證可用於模擬服務帳戶。

如要進一步瞭解服務帳戶模擬功能,請參閱服務帳戶模擬功能

您也可以在這個程序中新增 Pub/Sub 發布者角色 (roles/pubsub.publisher)。如要進一步瞭解角色和新增原因,請參閱「將 Pub/Sub 發布者角色新增至 Pub/Sub 服務帳戶」。

  1. 前往 Trusted Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 按一下「包含 S3NS提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com 的服務帳戶。

  4. 按一下這個服務帳戶的「編輯主體」按鈕。

  5. 視需要按一下「新增其他角色」

  6. 搜尋並點選「服務帳戶憑證建立者角色」 (roles/iam.serviceAccountTokenCreator)。

  7. 按一下 [儲存]

在 AWS 中建立政策

您需要在 AWS 中設定政策,讓 Pub/Sub 向 AWS 進行驗證,以便從 Amazon Kinesis Data Streams 擷取資料。

如要在 AWS 中建立政策,請按照下列步驟操作:

  1. 登入 AWS 管理主控台,並開啟 IAM 控制台

  2. IAM 的控制台導覽窗格中,依序點選「Access Management」(存取權管理) >「Policies」(政策)

  3. 按一下「建立政策」

  4. 在「Select a service」(選取服務) 中,按一下「Kinesis」

  5. 在「允許的動作」中,按一下下列項目:

    • List > ListShards

      這項動作會授予列出串流中分片的權限,並提供每個分片的相關資訊。

    • 「Read」(讀取) >「SubscribeToShard」(訂閱分片)SubscribeToShard

      這項動作會授予權限,可透過強化型扇出功能監聽特定分片。

    • 「Read」(讀取) >「DescribeStreamConsumer

      這項動作會授予權限,可取得已註冊串流消費者的說明。

    這些權限涵蓋從串流讀取資料。Pub/Sub 僅支援使用串流 SubscribeToShard API,透過強化型扇出功能從 Kinesis 串流讀取資料。

  6. 如要限制政策只能用於特定串流或消費者 (建議),請為資源指定消費者 ARN串流 ARN

  7. 按一下「新增更多權限」

  8. 在「Select a service」(選取服務) 中,按一下「STS」

  9. 在「允許的動作」中,依序點選「寫入」 >「AssumeRoleWithWebIdentity」AssumeRoleWithWebIdentity

    這項動作會授予權限,讓 Pub/Sub 透過身分同盟向 Amazon Kinesis Data Streams 進行驗證,並取得一組暫時安全憑證。

  10. 點選「下一步」

  11. 輸入政策名稱和說明。

  12. 按一下「建立政策」

使用自訂信任政策在 AWS 中建立角色

您必須在 AWS 中建立角色,Pub/Sub 才能向 AWS 驗證身分,從 Amazon Kinesis Data Streams 擷取資料。

  1. 登入 AWS 管理主控台,並開啟 IAM 控制台

  2. IAM 控制台的導覽窗格中,按一下「角色」

  3. 按一下「建立角色」

  4. 在「選取信任的實體」中,按一下「自訂信任政策」

  5. 在「Custom trust policy」(自訂信任政策) 部分,輸入或貼上下列內容:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    <SERVICE_ACCOUNT_UNIQUE_ID> 替換為您在「記錄服務帳戶專屬 ID」中記錄的服務帳戶專屬 ID。

  6. 點選「下一步」

  7. 在「新增權限」中,搜尋並點選您剛建立的自訂政策。

  8. 點選「下一步」

  9. 輸入角色名稱和說明。

  10. 按一下「建立角色」

將 Pub/Sub 發布者角色新增至 Pub/Sub 主體

如要啟用發布功能,您必須將發布者角色指派給 Pub/Sub 服務帳戶,Pub/Sub 才能發布至 Amazon Kinesis Data Streams 匯入主題。

為 Pub/Sub 服務帳戶新增 Pub/Sub 服務代理人角色

如要允許 Pub/Sub 使用匯入主題專案的發布配額,Pub/Sub 服務代理需要匯入主題專案的 serviceusage.services.use 權限。

如要提供這項權限,建議您將 Pub/Sub 服務代理人角色新增至 Pub/Sub 服務帳戶。

如果 Pub/Sub 服務帳戶沒有 Pub/Sub 服務代理人角色,可以按照下列步驟授予:

  1. 前往 Trusted Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 按一下「包含 S3NS提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com 的服務帳戶。

  4. 按一下這個服務帳戶的「編輯主體」按鈕。

  5. 視需要按一下「新增其他角色」

  6. 搜尋並點選「Pub/Sub Service Agent role」(Pub/Sub 服務代理角色) (roles/pubsub.serviceAgent)。

  7. 按一下 [儲存]

允許從所有主題發布內容

如果您尚未建立任何 Amazon Kinesis Data Streams 匯入主題,請使用這個方法。

  1. 前往 Trusted Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 按一下「包含 S3NS提供的角色授予項目」核取方塊。

  3. 找出格式為 service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com 的服務帳戶。

  4. 按一下這個服務帳戶的「編輯主體」按鈕。

  5. 視需要按一下「新增其他角色」

  6. 搜尋並按一下 Pub/Sub 發布者角色 (roles/pubsub.publisher)。

  7. 按一下 [儲存]

啟用從單一主題發布的功能

只有在 Amazon Kinesis Data Streams 匯入主題已存在時,才使用這個方法。

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics add-iam-policy-binding 指令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    更改下列內容:

    • TOPIC_ID:Amazon Kinesis Data Streams 匯入主題的主題 ID。

    • PROJECT_NUMBER:專案編號。如要查看專案編號,請參閱「識別專案」。

  3. 將服務帳戶使用者角色新增至服務帳戶

    服務帳戶使用者角色 (roles/iam.serviceAccountUser) 包含 iam.serviceAccounts.actAs 權限,可讓主體將服務帳戶附加至 Amazon Kinesis Data Streams 匯入主題的擷取設定,並將該服務帳戶用於同盟身分。

    1. 前往 Trusted Cloud 控制台的「IAM」頁面。

      前往身分與存取權管理頁面

    2. 針對發出建立或更新主題呼叫的主體,按一下「編輯主體」按鈕。

    3. 視需要按一下「新增其他角色」

    4. 搜尋並按一下「服務帳戶使用者角色」(roles/iam.serviceAccountUser)。

    5. 按一下 [儲存]

    使用 Amazon Kinesis Data Streams 主題

    您可以建立新的匯入主題,或編輯現有主題。

    注意事項

    • 即使快速連續建立主題和訂閱項目,也可能導致資料遺失。主題存在但沒有訂閱項目的時間很短。如果在這段時間內有任何資料傳送至主題,這些資料都會遺失。先建立主題,然後建立訂閱項目,再將主題轉換為匯入主題,即可確保匯入過程中不會遺漏任何訊息。

    建立 Amazon Kinesis Data Streams 匯入主題

    如要進一步瞭解與主題相關聯的屬性,請參閱「主題的屬性」。

    請確認你已完成下列程序:

    如要建立 Amazon Kinesis Data Streams 匯入主題,請按照下列步驟操作:

    控制台

    1. 前往 Trusted Cloud 控制台的「主題」頁面。

      前往「主題」

    2. 按一下「建立主題」

    3. 在「主題 ID」欄位中,輸入 Amazon Kinesis Data Streams 匯入主題的 ID。

      如要進一步瞭解如何命名主題,請參閱命名規範

    4. 選取「新增預設訂閱項目」

    5. 選取「啟用擷取功能」

    6. 選取「Amazon Kinesis Data Streams」做為擷取來源。

    7. 輸入下列詳細資訊:

      • Kinesis Stream ARN:您打算擷取至 Pub/Sub 的 Kinesis Data Stream ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

      • Kinesis Consumer ARN:已向 AWS Kinesis Data Stream 註冊的消費者資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}

      • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下: arn:aws:iam::${Account}:role/${RoleName}

      • 服務帳戶:您在「在 Trusted Cloud中建立服務帳戶」中建立的服務帳戶。

    8. 按一下「建立主題」

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. 執行 gcloud pubsub topics create 指令:

      gcloud pubsub topics create TOPIC_ID \
          --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN \
          --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN \
          --kinesis-ingestion-role-arn KINESIS_ROLE_ARN \
          --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

      更改下列內容:

      • TOPIC_ID:主題 ID。
      • KINESIS_STREAM_ARN:您打算擷取至 Pub/Sub 的 Kinesis Data Streams ARN。ARN 格式如下: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}
      • KINESIS_CONSUMER_ARN:已向 AWS Kinesis Data Streams 註冊的消費者資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}
      • KINESIS_ROLE_ARN:AWS 角色的 ARN。角色的 ARN 格式如下: arn:aws:iam::${Account}:role/${RoleName}
      • PUBSUB_SERVICE_ACCOUNT:您在「在 Google Cloud 中建立服務帳戶」中建立的服務帳戶。
    3. C++

      在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string stream_arn, std::string consumer_arn,
         std::string aws_role_arn, std::string gcp_service_account) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto* aws_kinesis =
            request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
        aws_kinesis->set_stream_arn(stream_arn);
        aws_kinesis->set_consumer_arn(consumer_arn);
        aws_kinesis->set_aws_role_arn(aws_role_arn);
        aws_kinesis->set_gcp_service_account(gcp_service_account);
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

      在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

      import (
      	"context"
      	"fmt"
      	"io"
      
      	pubsub "cloud.google.com/go/pubsub/v2"
      	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
      )
      
      func createTopicWithKinesisIngestion(w io.Writer, projectID, topic string) error {
      	// projectID := "my-project-id"
      	// topicID := "projects/my-project-id/topics/my-topic"
      	streamARN := "stream-arn"
      	consumerARN := "consumer-arn"
      	awsRoleARN := "aws-role-arn"
      	gcpServiceAccount := "gcp-service-account"
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	topicpb := &pubsubpb.Topic{
      		Name: topic,
      		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
      			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
      				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
      					StreamArn:         streamARN,
      					ConsumerArn:       consumerARN,
      					AwsRoleArn:        awsRoleARN,
      					GcpServiceAccount: gcpServiceAccount,
      				},
      			},
      		},
      	}
      	topicpb, err = client.TopicAdminClient.CreateTopic(ctx, topicpb)
      	if err != nil {
      		return fmt.Errorf("failed to create topic with kinesis: %w", err)
      	}
      	fmt.Fprintf(w, "Kinesis topic created: %v\n", topicpb)
      	return nil
      }
      

      Java

      在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      
      public class CreateTopicWithKinesisIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Kinesis ingestion settings.
          String streamArn = "stream-arn";
          String consumerArn = "consumer-arn";
          String awsRoleArn = "aws-role-arn";
          String gcpServiceAccount = "gcp-service-account";
      
          createTopicWithKinesisIngestionExample(
              projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
        }
      
        public static void createTopicWithKinesisIngestionExample(
            String projectId,
            String topicId,
            String streamArn,
            String consumerArn,
            String awsRoleArn,
            String gcpServiceAccount)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
      
            IngestionDataSourceSettings.AwsKinesis awsKinesis =
                IngestionDataSourceSettings.AwsKinesis.newBuilder()
                    .setStreamArn(streamArn)
                    .setConsumerArn(consumerArn)
                    .setAwsRoleArn(awsRoleArn)
                    .setGcpServiceAccount(gcpServiceAccount)
                    .build();
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      // const streamArn = 'arn:aws:kinesis:...';
      // const consumerArn = 'arn:aws:kinesis:...';
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithKinesisIngestion(
        topicNameOrId,
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      ) {
        // Creates a new topic with Kinesis ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsKinesis: {
              awsRoleArn,
              gcpServiceAccount,
              streamArn,
              consumerArn,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
      }

      Node.ts

      在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      // const streamArn = 'arn:aws:kinesis:...';
      // const consumerArn = 'arn:aws:kinesis:...';
      
      // Imports the Google Cloud client library
      import {PubSub} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithKinesisIngestion(
        topicNameOrId: string,
        awsRoleArn: string,
        gcpServiceAccount: string,
        streamArn: string,
        consumerArn: string,
      ) {
        // Creates a new topic with Kinesis ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsKinesis: {
              awsRoleArn,
              gcpServiceAccount,
              streamArn,
              consumerArn,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
      }

      Python

      在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

      from google.cloud import pubsub_v1
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # stream_arn = "your-stream-arn"
      # consumer_arn = "your-consumer-arn"
      # aws_role_arn = "your-aws-role-arn"
      # gcp_service_account = "your-gcp-service-account"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                  stream_arn=stream_arn,
                  consumer_arn=consumer_arn,
                  aws_role_arn=aws_role_arn,
                  gcp_service_account=gcp_service_account,
              )
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

    如要進一步瞭解 ARN,請參閱「Amazon 資源名稱 (ARN)」和「IAM 識別碼」。

    如果發生問題,請參閱「排解 Amazon Kinesis Data Streams 匯入主題的問題」。

    編輯 Amazon Kinesis Data Streams 匯入主題

    您可以編輯 Amazon Kinesis Data Streams 匯入主題的擷取資料來源設定。 請執行下列步驟:

    控制台

    1. 前往 Trusted Cloud 控制台的「主題」頁面。

      前往「主題」

    2. 按一下 Amazon Kinesis Data Streams 匯入主題。

    3. 在主題詳細資料頁面中,按一下「編輯」

    4. 更新要變更的欄位。

    5. 按一下「更新」

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. 為避免遺失匯入主題的設定,請務必在每次更新主題時加入所有設定。如果省略任何項目,Pub/Sub 會將設定重設為原始預設值。

      使用下列範例中提及的所有旗標執行 gcloud pubsub topics update 指令:

      gcloud pubsub topics update TOPIC_ID 
      --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
      --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
      --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
      --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

      更改下列內容:

      • TOPIC_ID 是主題 ID。這個欄位無法更新。

      • KINESIS_STREAM_ARN 是您打算擷取至 Pub/Sub 的 Kinesis Data Streams ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

      • KINESIS_CONSUMER_ARN 是註冊至 AWS Kinesis Data Streams 的消費者資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}

      • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

      • PUBSUB_SERVICE_ACCOUNT 是您在「在 Trusted Cloud中建立服務帳戶」中建立的服務帳戶。

    Amazon Kinesis Data Streams 匯入主題的配額和限制

    匯入主題的發布者輸送量會受限於主題的發布配額。詳情請參閱 Pub/Sub 配額與限制

    後續步驟

    Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。