變更主題類型

您可以將匯入主題轉換為標準主題,反之亦然。

將匯入的主題轉換為標準主題

如要將匯入主題轉換為標準主題,請清除擷取設定。請執行下列步驟:

控制台

  1. 前往 Trusted Cloud 控制台的「主題」頁面。

    前往「主題」

  2. 按一下要匯入的主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 清除「啟用擷取」選項。

  5. 按一下「更新」

gcloud

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics update 指令:

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    TOPIC_ID 替換為主題 ID。

將標準主題轉換為 Amazon Kinesis Data Streams 匯入主題

如要將標準主題轉換為 Amazon Kinesis Data Streams 匯入主題,請先確認您符合所有必要條件

控制台

  1. 前往 Trusted Cloud 控制台的「主題」頁面。

    前往「主題」

  2. 按一下要轉換為匯入主題的主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 選取「啟用擷取」選項。

  5. 選取「Amazon Kinesis Data Streams」做為擷取來源。

  6. 輸入下列詳細資訊:

    • Kinesis Stream ARN:您打算擷取至 Pub/Sub 的 Kinesis Data Stream ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis Consumer ARN:已向 AWS Kinesis Data Stream 註冊的消費者資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS 角色 ARN:AWS 角色的 ARN。角色的 ARN 格式如下: arn:aws:iam::${Account}:role/${RoleName}

    • 服務帳戶:您建立的服務帳戶。

  7. 按一下「更新」

gcloud

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用下列範例中提及的所有旗標執行 gcloud pubsub topics update 指令:

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    更改下列內容:

    • TOPIC_ID 是主題 ID 或名稱。這個欄位無法更新。

    • KINESIS_STREAM_ARN 是您打算擷取至 Pub/Sub 的 Kinesis Data Streams ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • KINESIS_CONSUMER_ARN 是註冊至 AWS Kinesis Data Streams 的消費者資源 ARN。ARN 格式如下:arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN 是 AWS 角色的 ARN。角色的 ARN 格式如下: arn:aws:iam::${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT 是您建立的服務帳戶。

Go

以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.ts

在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

如要進一步瞭解 ARN,請參閱「Amazon 資源名稱 (ARN)」和「IAM 識別碼」。

將標準主題轉換為 Cloud Storage 匯入主題

如要將標準主題轉換為 Cloud Storage 匯入主題,請先確認您符合所有必要條件

控制台

  1. 前往 Trusted Cloud 控制台的「主題」頁面。

    前往「主題」

  2. 按一下要轉換為 Cloud Storage 匯入主題的主題。

  3. 在主題詳細資料頁面中,按一下「編輯」

  4. 選取「啟用擷取」選項。

  5. 選取「Google Cloud Storage」做為擷取來源。

  6. 按一下 Cloud Storage bucket 的「Browse」(瀏覽)

    「Select bucket」(選取值區) 頁面隨即開啟。選取下列選項之一:

    • 從任何適當的專案中選取現有 bucket。

    • 按一下建立圖示,然後按照畫面上的指示建立新的值區。建立 bucket 後,請選取 Cloud Storage 匯入主題的 bucket。

  7. 指定 bucket 時,Pub/Sub 會檢查 Pub/Sub 服務帳戶是否具備 bucket 的適當權限。如果發生權限問題,系統會顯示相關錯誤訊息。

    如果遇到權限問題,請按一下「設定權限」。詳情請參閱 授予 Pub/Sub 服務帳戶 Cloud Storage 權限

  8. 在「物件格式」中,選取「文字」、「Avro」或「Pub/Sub Avro」

    如果選取「文字」,可以視需要指定「分隔符號」,將物件分割成訊息。

    如要進一步瞭解這些選項,請參閱「輸入格式」。

  9. (選用步驟) 您可以為主題指定「物件建立時間下限」。如果設定此值,系統只會擷取在物件建立時間下限之後建立的物件。

    詳情請參閱「 物件建立時間下限」。

  10. 您必須指定 Glob 模式。如要擷取 bucket 中的所有物件,請使用 ** 做為 glob 模式。系統只會擷取符合指定模式的物件。

    詳情請參閱 比對 glob 模式

  11. 保留其他預設設定。
  12. 按一下「更新主題」

gcloud

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 為避免遺失匯入主題的設定,請務必在每次更新主題時加入所有設定。如果省略任何項目,Pub/Sub 會將設定重設為原始預設值。

    使用下列範例中提及的所有旗標執行 gcloud pubsub topics update 指令:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    更改下列內容:

    • TOPIC_ID 是主題 ID 或名稱。這個欄位無法更新。

    • BUCKET_NAME:指定現有值區的名稱。 例如:prod_bucket。值區名稱不得包含專案 ID。 如要建立 bucket,請參閱「建立 bucket」一文。

    • INPUT_FORMAT:指定擷取物件的格式。這可以是 textavropubsub_avro。如要進一步瞭解這些選項,請參閱「輸入格式」。

    • TEXT_DELIMITER:指定用於將文字物件分割為 Pub/Sub 訊息的分隔符。這必須是單一字元,且只能在 INPUT_FORMATtext 時設定。預設為換行字元 (\n)。

      使用 gcloud CLI 指定分隔符號時,請特別注意換行 \n 等特殊字元的處理方式。請使用 '\n' 格式,確保系統正確解讀分隔符。如果直接使用 \n (不加引號或逸出),分隔符號會變成 "n"

    • MINIMUM_OBJECT_CREATE_TIME:指定物件建立時間下限,只有在此時間之後建立的物件才會擷取。格式應為世界標準時間 YYYY-MM-DDThh:mm:ssZ。 例如:2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z (含) 的任何日期,無論是過去或未來,都有效。

    • MATCH_GLOB:指定要比對的 glob 模式,物件必須符合該模式才能擷取。使用 gcloud CLI 時,含有 * 字元的相符 glob 必須以逸出形式 \*\*.txt 格式化 * 字元,或整個相符 glob 必須以引號 "**.txt"'**.txt' 括住。如要瞭解支援的 glob 模式語法,請參閱 Cloud Storage 說明文件