创建 Amazon Managed Streaming for Apache Kafka 导入主题

借助 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 导入主题,您可以持续从 Amazon MSK(作为外部来源)向 Pub/Sub 注入数据。然后,您可以将数据流式传输到 Pub/Sub 支持的任何目的地。

本文档介绍了如何创建和管理 Amazon MSK 导入主题。如需创建标准主题,请参阅创建标准主题

如需详细了解导入主题,请参阅关于导入主题

准备工作

所需的角色和权限

如需获得创建和管理 Amazon MSK 导入主题所需的权限,请让您的管理员为您授予主题或项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建和管理 Amazon MSK 导入主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建和管理 Amazon MSK 导入主题,需要具备以下权限:

  • 创建导入主题: pubsub.topics.create
  • 删除导入主题: pubsub.topics.delete
  • 获取导入主题: pubsub.topics.get
  • 列出导入主题: pubsub.topics.list
  • 发布到导入主题: pubsub.topics.publish and pubsub.serviceAgent
  • 更新导入主题: pubsub.topics.update
  • 获取导入主题的 IAM 政策: pubsub.topics.getIamPolicy
  • 为导入主题配置 IAM 政策 pubsub.topics.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级层和个别资源级层配置访问权限控制。

设置联合身份以访问 Amazon MSK

借助工作负载身份联合, Trusted Cloud 服务可以访问在 Trusted Cloud外部运行的工作负载。借助身份联合,您无需维护凭据或将凭据传递给 Trusted Cloud ,即可访问其他云中的资源。您可以改为使用工作负载本身的身份向 Trusted Cloud 进行身份验证并访问资源。

在 Trusted Cloud中创建服务账号

这是一个可选步骤。 如果您已有服务账号,则可以在此过程中使用该账号,而不必创建新的服务账号。 如果您使用的是现有服务账号,请前往记录服务账号的唯一 ID 以进行下一步。

对于 Amazon MSK 导入主题,Pub/Sub 使用服务账号作为身份来访问 AWS 中的资源。

如需详细了解如何创建服务账号,包括前提条件、所需角色和权限以及命名指南,请参阅创建服务账号。创建服务账号后,您可能需要等待 60 秒或更长时间才能使用该服务账号。出现这种行为的原因是读取操作是最终一致的;新服务账号可能需要一段时间才能显示。

记录服务账号唯一 ID

您需要服务账号唯一 ID 才能在 AWS 控制台中设置角色。

  1. 在 Trusted Cloud 控制台中,前往服务账号详情页面。

    前往服务账号

  2. 点击您刚刚创建的服务账号或计划使用的服务账号。

  3. 服务账号详情页面中,记录唯一 ID 编号。

    您需要在工作流程中使用该 ID,以便在 AWS 控制台中设置角色

向 Pub/Sub 服务账号添加服务账号令牌创建者角色

借助 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator),主账号可以为服务账号创建短期有效凭据。这些令牌或凭据用于模拟服务账号。

如需详细了解服务账号模拟,请参阅服务账号模拟

您还可以在此过程中添加 Pub/Sub 发布者角色 (roles/pubsub.publisher)。如需详细了解该角色以及您添加该角色的原因,请参阅向 Pub/Sub 服务账号添加 Pub/Sub 发布者角色

  1. 在 Trusted Cloud 控制台中,前往 IAM 页面。

    转到 IAM

  2. 点击包括 S3NS提供的角色授权复选框。

  3. 查找格式为 service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com 的服务账号。

  4. 对于此服务账号,点击修改主账号按钮。

  5. 如果需要,请点击添加其他角色

  6. 搜索并点击 Service Account Token Creator 角色 (roles/iam.serviceAccountTokenCreator)。

  7. 点击保存

在 AWS 中创建政策

您需要在 AWS 中添加一项政策,以允许 Pub/Sub 向 AWS 进行身份验证,以便 Pub/Sub 可以从 Amazon MSK 中注入数据。

  • 如需了解更多方法以及如何在 AWS 中创建政策,请参阅创建 IAM 政策

如需在 AWS 中创建政策,请执行以下步骤:

  1. 登录 AWS 管理控制台并打开 IAM 控制台

  2. IAM 控制台的导航窗格中,依次点击访问权限管理 > 政策

  3. 点击创建政策

  4. 对于点击服务,请点击 MSK

  5. 对于允许的操作,依次点击读取 > GetBootstrapBrokers

    此操作授予了获取 Pub/Sub 用于连接到 MSK 集群的初始启动代理的权限。

  6. 点击添加更多权限

  7. 选择服务中,点击 Apache Kafka API for MSK

  8. 对于允许的操作,请选择以下内容:

    • 列表 > DescribeTopic

      此操作会授予权限,以允许 Pub/Sub 提取主题获取有关 Amazon MSK Kafka 主题的详细信息。

    • 读取 > ReadData

      此操作会授予从 Amazon MSK Kafka 主题读取数据的权限。

    • 撰写 > 关联

      此操作会授予连接到 Amazon MSK Kafka 集群并对其进行身份验证的权限。

  9. 对于资源,请指定集群 ARN(如果您想将政策限制为仅适用于特定集群,建议这样做)。

  10. 点击添加更多权限

  11. 选择服务中,点击 STS

  12. 对于允许的操作,依次点击写入 > AssumeRoleWithWebIdentity

    此操作会授予权限,以获取一组临时安全凭据,供 Pub/Sub 通过身份联合向 Amazon MSK 进行身份验证。

  13. 点击下一步

  14. 输入政策名称和说明。

  15. 点击创建政策

使用自定义信任政策在 AWS 中创建角色

您必须在 AWS 中创建一个角色,以便 Pub/Sub 可以向 AWS 进行身份验证,从而从 Amazon MSK 中注入数据。

  1. 登录 AWS 管理控制台并打开 IAM 控制台

  2. IAM 控制台的导航窗格中,点击 Roles

  3. 点击 Create role

  4. 对于选择可信实体,点击自定义信任政策

  5. 自定义信任政策部分中,输入或粘贴以下内容:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    <SERVICE_ACCOUNT_UNIQUE_ID> 替换为您在记录服务账号的唯一 ID 中记录的服务账号的唯一 ID。

  6. 点击下一步

  7. 对于添加权限,请搜索并点击您刚刚创建的自定义政策。

  8. 点击下一步

  9. 输入角色名称和说明。

  10. 点击 Create role

向 Pub/Sub 主账号添加 Pub/Sub 发布者角色

如需启用发布功能,您必须向 Pub/Sub 服务账号分配发布者角色,以便 Pub/Sub 能够向 Amazon MSK 导入主题发布内容。

向 Pub/Sub 服务账号添加 Pub/Sub 服务代理角色

为了允许 Pub/Sub 使用导入主题项目的发布配额,Pub/Sub 服务代理需要对导入主题项目拥有 serviceusage.services.use 权限。

如需提供此权限,建议您向 Pub/Sub 服务账号添加 Pub/Sub 服务代理角色。

如果 Pub/Sub 服务账号没有 Pub/Sub 服务代理角色,可以按如下方式授予:

  1. 在 Trusted Cloud 控制台中,前往 IAM 页面。

    转到 IAM

  2. 点击包括 S3NS提供的角色授权复选框。

  3. 查找格式为 service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com 的服务账号。

  4. 对于此服务账号,点击修改主账号按钮。

  5. 如果需要,请点击添加其他角色

  6. 搜索并点击 Pub/Sub 服务代理角色 (roles/pubsub.serviceAgent)。

  7. 点击保存

允许发布所有主题的内容

如果您尚未创建任何 Amazon MSK 导入主题,请使用此方法。

  1. 在 Trusted Cloud 控制台中,前往 IAM 页面。

    转到 IAM

  2. 点击包括 S3NS提供的角色授权复选框。

  3. 查找格式为 service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com 的服务账号。

  4. 对于此服务账号,点击修改主账号按钮。

  5. 如果需要,请点击添加其他角色

  6. 搜索并点击 Pub/Sub 发布者角色 (roles/pubsub.publisher)。

  7. 点击保存

启用从单个主题发布内容

仅当 Amazon MSK 导入主题已存在时,才使用此方法。

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics add-iam-policy-binding 命令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    替换以下内容:

    • TOPIC_ID:Amazon MSK 导入主题的 ID。

    • PROJECT_NUMBER:项目编号。如需查看项目编号,请参阅标识项目

  3. 向服务账号添加服务账号用户角色

    Service Account User 角色 (roles/iam.serviceAccountUser) 包含 iam.serviceAccounts.actAs 权限,该权限允许主账号将服务账号附加到 Amazon MSK 导入主题的提取设置,并使用该服务账号进行联合身份验证。

    1. 在 Trusted Cloud 控制台中,前往 IAM 页面。

      转到 IAM

    2. 对于发出创建或更新主题调用请求的主账号,请点击修改主账号按钮。

    3. 如果需要,请点击添加其他角色

    4. 搜索并点击 Service account user 角色 (roles/iam.serviceAccountUser)。

    5. 点击保存

    使用 Amazon MSK 导入主题

    您可以创建新的导入主题,也可以修改现有主题。

    注意事项

    • 即使快速连续地创建主题和订阅,也可能会导致数据丢失。在订阅期结束后的短时间内,主题仍可使用。如果在此期间向主题发送任何数据,这些数据都会丢失。通过先创建主题、创建订阅,然后将主题转换为导入主题,您可以确保在导入过程中不会遗漏任何消息。

    • 如果您需要重新创建现有导入主题的 Kafka 主题(使用相同名称),则无法删除 Kafka 主题并重新创建它。此操作可能会使 Pub/Sub 的偏移管理失效,从而导致数据丢失。如需缓解此问题,请按以下步骤操作:

      • 删除 Pub/Sub 导入主题。
      • 删除 Kafka 主题。
      • 创建 Kafka 主题。
      • 创建 Pub/Sub 导入主题。
    • 来自 Amazon MSK Kafka 主题的数据始终从最早的偏移量开始读取。

    创建 Amazon MSK 导入主题

    如需详细了解与主题关联的属性,请参阅主题的属性

    确保您已完成以下步骤:

    如需创建 Amazon MSK 导入主题,请按以下步骤操作:

    控制台

    1. 在 Trusted Cloud 控制台中,前往主题页面。

      打开“主题”

    2. 点击创建主题

    3. 主题 ID 字段中,输入 Amazon MSK 导入主题的 ID。如需详细了解如何命名主题,请参阅命名指南

    4. 选择添加默认订阅

    5. 选择启用注入

    6. 对于提取来源,请选择 Amazon MSK

    7. 输入以下详细信息:

      • 集群 ARN:您要将数据提取到 Pub/Sub 中的 Amazon MSK 的 ARN。ARN 格式如下:arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
      • 主题:您要提取到 Pub/Sub 中的 Amazon MSK Kafka 主题的名称。
      • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}
      • 服务账号:您在在 Trusted Cloud中创建服务账号中创建的服务账号。
    8. 点击创建主题

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. 运行 gcloud pubsub topics create 命令:

      gcloud pubsub topics create TOPIC_ID \
            --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \
            --aws-msk-ingestion-topic MSK_TOPIC \
            --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \
            --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

      替换以下内容:

      • TOPIC_ID:Pub/Sub 主题的名称或 ID。
      • MSK_CLUSTER_ARN:您要将数据从 Amazon MSK 集群提取到 Pub/Sub 中的 Amazon MSK 集群的 ARN。ARN 格式如下:arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
      • MSK_TOPIC:您要提取到 Pub/Sub 中的 Amazon MSK Kafka 主题的名称。
      • MSK_ROLE_ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}
      • PUBSUB_SERVICE_ACCOUNT:您在在 Google Cloud 中创建服务账号中创建的服务账号。
    3. C++

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string const& cluster_arn,
         std::string const& msk_topic, std::string const& aws_role_arn,
         std::string const& gcp_service_account) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto* aws_msk =
            request.mutable_ingestion_data_source_settings()->mutable_aws_msk();
        aws_msk->set_cluster_arn(cluster_arn);
        aws_msk->set_topic(msk_topic);
        aws_msk->set_aws_role_arn(aws_role_arn);
        aws_msk->set_gcp_service_account(gcp_service_account);
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

      import (
      	"context"
      	"fmt"
      	"io"
      
      	"cloud.google.com/go/pubsub"
      )
      
      func createTopicWithAWSMSKIngestion(w io.Writer, projectID, topicID, clusterARN, mskTopic, awsRoleARN, gcpSA string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      
      	// // AWS MSK ingestion settings.
      	// clusterARN := "cluster-arn"
      	// mskTopic := "msk-topic"
      	// awsRoleARN := "aws-role-arn"
      	// gcpSA := "gcp-service-account"
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	cfg := &pubsub.TopicConfig{
      		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
      			Source: &pubsub.IngestionDataSourceAmazonMSK{
      				ClusterARN:        clusterARN,
      				Topic:             mskTopic,
      				AWSRoleARN:        awsRoleARN,
      				GCPServiceAccount: gcpSA,
      			},
      		},
      	}
      	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Created topic with AWS MSK ingestion settings: %v\n", t)
      	return nil
      }
      

      Java

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      
      public class CreateTopicWithAwsMskIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // AWS MSK ingestion settings.
          String clusterArn = "cluster-arn";
          String mskTopic = "msk-topic";
          String awsRoleArn = "aws-role-arn";
          String gcpServiceAccount = "gcp-service-account";
      
          createTopicWithAwsMskIngestionExample(
              projectId, topicId, clusterArn, mskTopic, awsRoleArn, gcpServiceAccount);
        }
      
        public static void createTopicWithAwsMskIngestionExample(
            String projectId,
            String topicId,
            String clusterArn,
            String mskTopic,
            String awsRoleArn,
            String gcpServiceAccount)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
      
            IngestionDataSourceSettings.AwsMsk awsMsk =
                IngestionDataSourceSettings.AwsMsk.newBuilder()
                    .setClusterArn(clusterArn)
                    .setTopic(mskTopic)
                    .setAwsRoleArn(awsRoleArn)
                    .setGcpServiceAccount(gcpServiceAccount)
                    .build();
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder().setAwsMsk(awsMsk).build();
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println("Created topic with AWS MSK ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const clusterArn = 'arn:aws:kafka:...';
      // const mskTopic = 'YOUR_MSK_TOPIC';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithAwsMskIngestion(
        topicNameOrId,
        clusterArn,
        mskTopic,
        awsRoleArn,
        gcpServiceAccount,
      ) {
        // Creates a new topic with AWS MSK ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsMsk: {
              clusterArn,
              topic: mskTopic,
              awsRoleArn,
              gcpServiceAccount,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
      }

      Node.ts

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const clusterArn = 'arn:aws:kafka:...';
      // const mskTopic = 'YOUR_MSK_TOPIC';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      
      // Imports the Google Cloud client library
      import {PubSub} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithAwsMskIngestion(
        topicNameOrId: string,
        clusterArn: string,
        mskTopic: string,
        awsRoleArn: string,
        gcpServiceAccount: string,
      ) {
        // Creates a new topic with AWS MSK ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsMsk: {
              clusterArn,
              topic: mskTopic,
              awsRoleArn,
              gcpServiceAccount,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS MSK ingestion.`);
      }

      Python

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

      from google.cloud import pubsub_v1
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # cluster_arn = "your-cluster-arn"
      # msk_topic = "your-msk-topic"
      # aws_role_arn = "your-aws-role-arn"
      # gcp_service_account = "your-gcp-service-account"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              aws_msk=IngestionDataSourceSettings.AwsMsk(
                  cluster_arn=cluster_arn,
                  topic=msk_topic,
                  aws_role_arn=aws_role_arn,
                  gcp_service_account=gcp_service_account,
              )
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with AWS MSK Ingestion Settings")

    如需详细了解 ARN,请参阅 Amazon 资源名称 (ARN)IAM 标识符

    如果您遇到问题,请参阅排查 Amazon MSK 导入主题方面的问题

    修改 Amazon MSK 导入主题

    如需修改 Amazon MSK 导入主题的提取数据源设置,请按以下步骤操作:

    控制台

    1. 在 Trusted Cloud 控制台中,前往主题页面。

      转到“主题”

    2. 点击 Amazon MSK 导入主题。

    3. 在主题详情页面中,点击修改

    4. 更新您要更改的字段。

    5. 点击更新

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. 运行 gcloud pubsub topics update 命令,并使用以下示例中提及的所有标志:

      gcloud pubsub topics update TOPIC_ID \
          --aws-msk-ingestion-cluster-arn MSK_CLUSTER_ARN \
          --aws-msk-ingestion-topic MSK_TOPIC \
          --aws-msk-ingestion-aws-role-arn MSK_ROLE_ARN \
          --aws-msk-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

      替换以下内容:

      • TOPIC_ID:您的 Pub/Sub 主题的名称或 ID。
      • MSK_CLUSTER_ARN:您要将数据从 Amazon MSK 集群提取到 Pub/Sub 中的 Amazon MSK 集群的 ARN。ARN 格式如下:arn:aws:kafka:${Region}:${Account}:cluster/${ClusterName}/${ClusterId}
      • MSK_TOPIC:您要提取到 Pub/Sub 中的 Amazon MSK Kafka 主题的名称。
      • MSK_ROLE_ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}
      • PUBSUB_SERVICE_ACCOUNT:您在 Google Cloud 中创建服务账号中创建的服务账号。

    配额和限制

    导入主题的发布者吞吐量受主题的发布配额限制。如需了解详情,请参阅 Pub/Sub 配额和限制

    后续步骤

    Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。