トピックの種類を変更する

インポート トピックは標準トピックに、あるいは標準トピックをインポート トピックに変換できます。

インポート トピックを標準トピックに変換する

インポート トピックを標準トピックに変換するには、取り込み設定をクリアします。次の手順を行います。

コンソール

  1. コンソールで [**トピック**] ページに移動します。 Cloud de Confiance

    [トピック] に移動

  2. インポート トピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. [取り込みを有効にする] チェックボックスをオフにします。

  5. [更新] をクリックします。

gcloud

  1. コンソールで Cloud Shell をアクティブにします。 Cloud de Confiance

    Cloud Shell をアクティブにする

    コンソールの下部にある Cloud de Confiance Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です 。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています 。セッションが初期化されるまで数秒かかることがあります。

  2. gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    TOPIC_ID は、トピック ID に置き換えます。

標準トピックを Amazon Kinesis Data Streams インポート トピックに変換する

標準トピックを Amazon Kinesis Data Streams インポート トピックに変換するには、 まず、すべての 前提条件を満たしていることを確認します。

コンソール

  1. コンソールで [**トピック**] ページに移動します。 Cloud de Confiance

    [トピック] に移動

  2. インポート トピックに変換するトピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. [取り込みを有効にする] オプションを選択します。

  5. 取り込みソースには、[Amazon Kinesis Data Streams] を選択します。

  6. 次の詳細情報を入力します。

    • Kinesis Stream ARN: Pub/Sub に取り込む予定の Kinesis Data Stream の ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}

    • Kinesis Consumer ARN: AWS Kinesis Data Stream に 登録されているコンシューマ リソースの ARN。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • AWS Role ARN: AWS ロールの ARN。ロールの ARN 形式は次のとおりです: arn:aws:iam::${Account}:role/${RoleName}

    • サービス アカウント: 作成したサービス アカウント。

  7. [更新] をクリックします。

gcloud

  1. コンソールで Cloud Shell をアクティブにします。 Cloud de Confiance

    Cloud Shell をアクティブにする

    コンソールの下部にある Cloud de Confiance Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です 。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています 。セッションが初期化されるまで数秒かかることがあります。

  2. 次の サンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID 
    --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
    --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
    --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
    --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

    次のように置き換えます。

    • TOPIC_ID はトピック ID または名前です。このフィールドは更新できません。

    • KINESIS_STREAM_ARN は、Pub/Sub に取り込む予定の Kinesis Data Streams の ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

    • KINESIS_CONSUMER_ARN は、AWS Kinesis Data Streams に登録されているコンシューマ リソース の ARN です。ARN 形式は次のとおりです: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}

    • KINESIS_ROLE_ARN は AWS ロールの ARN です。ロールの ARN 形式は次のとおりです: arn:aws:iam::${Account}:role/${RoleName}

    • PUBSUB_SERVICE_ACCOUNT は、作成したサービス アカウントです。

Go

次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。v1 ライブラリをまだ使用している場合は、 v2 への移行ガイドをご覧ください。 v1 コードサンプルのリストについては、 非推奨のコードサンプルをご覧ください。

このサンプルを試す前に、 クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

func updateTopicType(w io.Writer, projectID, topic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	pbTopic := &pubsubpb.Topic{
		Name: topic,
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
					StreamArn:         streamARN,
					ConsumerArn:       consumerARN,
					AwsRoleArn:        awsRoleARN,
					GcpServiceAccount: gcpServiceAccount,
				},
			},
		},
	}
	updateReq := &pubsubpb.UpdateTopicRequest{
		Topic: pbTopic,
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"ingestion_data_source_settings"},
		},
	}
	topicCfg, err := client.TopicAdminClient.UpdateTopic(ctx, updateReq)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn,
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string,
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

ARN の詳細については、Amazon リソース名(ARN)IAM 識別子をご覧ください。

標準トピックを Cloud Storage インポート トピックに変換する

標準トピックを Cloud Storage インポート トピックに変換するには、 まず、すべての 前提条件を満たしていることを確認します。

コンソール

  1. コンソールで [**トピック**] ページに移動します。 Cloud de Confiance

    [トピック] に移動

  2. Cloud Storage インポート トピックに変換するトピックをクリックします。

  3. [トピックの詳細] ページで、[編集] をクリックします。

  4. [取り込みを有効にする] オプションを選択します。

  5. 取り込みソースには、[Google Cloud Storage] を選択します。

  6. Cloud Storage バケットの場合は、[ブラウズ] をクリックします。

    [バケットを選択] ページが開きます。次のオプションのいずれかを選択します。

    • 適切なプロジェクトから既存のバケットを選択します。

    • 作成アイコンをクリックし、画面の指示に沿って 新しいバケットを作成します。バケットを作成したら、Cloud Storage インポート トピックのバケット を選択します。

  7. バケットを指定すると、Pub/Sub は Pub/Sub サービス アカウントのバケットに対する 適切な権限を確認します。権限の問題がある場合は、権限に関連するエラー メッセージ が表示されます。

    権限の問題が発生した場合は、[権限を設定] をクリックします。詳細については、 Pub/Sub サービス アカウントに Cloud Storage 権限を付与するをご覧ください。

  8. [オブジェクト形式] で、[テキスト]、[Avro]、または [Pub/Sub Avro] を選択します。

    [テキスト] を選択した場合は、オブジェクトをメッセージに分割する [区切り文字] を指定することもできます。

    これらのオプションの詳細については、入力形式をご覧ください。

  9. 省略可。トピックの最短のオブジェクト作成時間 を指定できます。設定すると、最短のオブジェクト作成時間以降に作成されたオブジェクトのみが取り込まれます。

    詳細については、 最短のオブジェクト作成時間をご覧ください。

  10. glob パターン を指定する必要があります。バケット内のすべてのオブジェクトを取り込むには、 glob パターンとして**を使用します。指定されたパターンに一致するオブジェクトのみが取り込まれます。

    詳細については、 glob パターンに一致させるをご覧ください。

  11. 他のデフォルト設定はそのままにします。
  12. [トピックを更新] をクリックします。

gcloud

  1. コンソールで Cloud Shell をアクティブにします。 Cloud de Confiance

    Cloud Shell をアクティブにする

    コンソールの下部にある Cloud de Confiance Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です 。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています 。セッションが初期化されるまで数秒かかることがあります。

  2. インポート トピックの設定が失われないように、トピックを更新するたびに すべての設定を含めてください。何かを省略すると、Pub/Sub は設定を元のデフォルト値にリセットします。

    次の サンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    次のように置き換えます。

    • TOPIC_ID はトピック ID または名前です。このフィールドは更新できません。

    • BUCKET_NAME: 既存のバケットの名前を指定します。 例: prod_bucket。 バケット名にプロジェクト ID を含めてはいけません。 バケットを作成するには、バケットを作成するをご覧ください。

    • INPUT_FORMAT: 取り込まれるオブジェクトの形式を指定します。 textavropubsub_avro を指定できます。 これらのオプションの詳細については、 入力形式をご覧ください。

    • TEXT_DELIMITER:テキスト オブジェクトを Pub/Sub メッセージに分割する区切り文字を指定します。これは 1 文字にする必要があり、INPUT_FORMATtext の場合にのみ設定する必要があります。 デフォルトでは、改行文字(\n)に設定されます。

      gcloud CLI を使用して区切り文字を指定する場合は、改行 \n などの特殊文字の処理に注意してください。区切り文字が正しく解釈されるように、'\n' 形式を使用します。 引用符やエスケープなしで \n を使用すると、区切り文字は "n" になります。

    • MINIMUM_OBJECT_CREATE_TIME: オブジェクトが取り込まれるために作成された最小時間を指定します。これは、YYYY-MM-DDThh:mm:ssZ 形式の UTC で指定する必要があります。 例: 2024-10-14T08:30:30Z

      0001-01-01T00:00:00Z から 9999-12-31T23:59:59Z までの任意の日付(過去または未来)を指定できます。

    • MATCH_GLOB: オブジェクトを取り込むために一致させる glob パターンを 指定します。gcloud CLI を使用している場合、 * 文字を含む match glob では、* 文字をエスケープする必要があります(形式は \*\*.txt )。または、match glob 全体を引用符で囲む必要があります( "**.txt" または '**.txt')。glob パターンでサポートされている構文については、 Cloud Storage のドキュメントをご覧ください。