更改主题类型

您可以将导入主题转换为标准主题,也可以将标准主题转换为导入主题。

将导入的主题转换为标准主题

如需将导入主题转换为标准主题,请清除提取设置。执行以下步骤:

控制台

  1. 在 Trusted Cloud 控制台中,前往主题页面。

    打开“主题”

  2. 点击导入的主题。

  3. 在主题详情页面中,点击修改

  4. 清除启用提取选项。

  5. 点击更新

gcloud

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics update 命令:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    TOPIC_ID 替换为主题 ID。

将标准主题转换为 Amazon Kinesis Data Streams 导入主题

如需将标准主题转换为 Amazon Kinesis Data Streams 导入主题,请先检查您是否满足所有前提条件

控制台

  1. 在 Trusted Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击要转换为导入主题的主题。

  3. 在主题详情页面中,点击修改

  4. 选择启用提取选项。

  5. 对于提取来源,请选择 Amazon Kinesis Data Streams

  6. 输入以下详细信息:

    • Kinesis 数据流 ARN:您计划注入到 Pub/Sub 中的 Kinesis 数据流的 ARN。ARN 格式如下: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis 使用方 ARN:已注册到 AWS Kinesis Data Stream 的使用方资源的 ARN。ARN 格式如下: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • 服务账号:您创建的服务账号。

  7. 点击更新

gcloud

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics update 命令,并使用以下示例中提及的所有标志:

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    替换以下内容:

    • TOPIC_ID 是主题 ID 或名称。此字段无法更新。

    • KINESIS_STREAM_ARN 是您计划注入到 Pub/Sub 中的 Kinesis Data Streams 的 ARN。ARN 格式如下: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN 是已向 AWS Kinesis Data Streams 注册的使用方资源的 ARN。ARN 格式如下: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。角色的 ARN 格式如下:arn:aws:iam::${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT 是您创建的服务账号。

Go

以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.ts

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

如需详细了解 ARN,请参阅 Amazon 资源名称 (ARN)IAM 标识符

将标准主题转换为 Cloud Storage 导入主题

如需将标准主题转换为 Cloud Storage 导入主题,请先检查您是否满足所有前提条件

控制台

  1. 在 Trusted Cloud 控制台中,前往主题页面。

    转到“主题”

  2. 点击要转换为 Cloud Storage 导入主题的主题。

  3. 在主题详情页面中,点击修改

  4. 选择启用提取选项。

  5. 对于提取源,请选择 Google Cloud Storage

  6. 对于 Cloud Storage 存储桶,请点击浏览

    系统会打开选择存储桶页面。从下列选项中选择一项:

    • 从任何合适的项目中选择现有存储桶。

    • 点击创建图标,然后按照屏幕上的说明创建新存储桶。创建存储桶后,选择 Cloud Storage 导入主题的存储桶。

  7. 指定存储桶后,Pub/Sub 会检查 Pub/Sub 服务账号是否对该存储桶拥有适当的权限。如果存在权限问题,您会看到与权限相关的错误消息。

    如果您遇到权限问题,请点击设置权限。如需了解详情,请参阅 向 Pub/Sub 服务账号授予 Cloud Storage 权限

  8. 对于对象格式,请选择文本AvroPub/Sub Avro

    如果您选择文本,可以选择性地指定用于将对象拆分为消息的分隔符

    如需详细了解这些选项,请参阅输入格式

  9. 可选。您可以为主题指定创建对象的最短时间。如果设置,则仅提取在最短对象创建时间之后创建的对象。

    如需了解详情,请参阅 创建对象的最短时间

  10. 您必须指定 Glob 模式。如需注入存储桶中的所有对象,请使用 ** 作为 glob 模式。仅注入与给定模式相匹配的对象。

    如需了解详情,请参阅 匹配 glob 模式

  11. 保留其他默认设置。
  12. 点击更新主题

gcloud

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 为避免丢失导入主题的设置,请务必在每次更新主题时包含所有设置。如果您遗漏了某些内容,Pub/Sub 会将相应设置重置为原始默认值。

    运行 gcloud pubsub topics update 命令,并使用以下示例中提及的所有标志:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    替换以下内容:

    • TOPIC_ID 是主题 ID 或名称。此字段无法更新。

    • BUCKET_NAME:指定现有存储桶的名称。 例如 prod_bucket。 存储桶名称不得包含项目 ID。 如需创建存储桶,请参阅创建存储桶

    • INPUT_FORMAT:指定所提取对象的格式。 可以是 textavropubsub_avro。 如需详细了解这些选项,请参阅输入格式

    • TEXT_DELIMITER:指定用于将文本对象拆分为 Pub/Sub 消息的分隔符。 此值必须为单个字符,且仅当 INPUT_FORMATtext 时才必须设置此值。 默认值为换行符 (\n)。

      使用 gcloud CLI 指定分隔符时,请密切注意换行符 \n 等特殊字符的处理方式。使用 '\n' 格式可确保正确解读分隔符。如果直接使用 \n(不带引号或转义),则分隔符为 "n"

    • MINIMUM_OBJECT_CREATE_TIME:指定对象创建的最早时间,只有在此时间之后创建的对象才会被提取。 此值应采用世界协调时间 (UTC),格式为 YYYY-MM-DDThh:mm:ssZ。 例如 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z(含)之间的任何日期(过去或未来)均有效。

    • MATCH_GLOB:指定要匹配的 glob 模式,以便提取对象。使用 gcloud CLI 时,如果匹配 glob 包含 * 字符,则必须将 * 字符格式化为转义字符(格式为 \*\*.txt),或者整个匹配 glob 必须用英文引号 "**.txt"'**.txt' 括起来。如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档