Cloud Storage インポート トピックを作成する

Cloud Storage インポート トピックを使用すると、Cloud Storage から Pub/Sub にデータを継続的に取り込むことができます。その後、Pub/Sub がサポートしている任意の宛先にデータをストリーミングできます。Pub/Sub は、Cloud Storage バケットに追加された新しいオブジェクトを自動的に検出し、取り込みます。

Cloud Storage は、Trusted Cloud by S3NSでオブジェクトを保存するためのサービスです。オブジェクトとは、任意の形式のファイルで構成される不変のデータのことです。オブジェクトはバケットと呼ばれるコンテナに保存されます。バケットにマネージド フォルダを含めることもできます。このフォルダを使用すると、共有の名前接頭辞を持つオブジェクトのグループにアクセスできます。

Cloud Storage の詳細については、Cloud Storage のドキュメントをご覧ください。

インポート トピックの詳細については、インポート トピックについてをご覧ください。

始める前に

必要なロールと権限

Cloud Storage インポート トピックの作成と管理に必要な権限を取得するには、トピックまたはプロジェクトに対する Pub/Sub 編集者 roles/pubsub.editor)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

この事前定義ロールには、Cloud Storage インポート トピックの作成と管理に必要な権限が含まれています。必要とされる正確な権限については、「必要な権限」セクションを開いてご確認ください。

必要な権限

Cloud Storage インポート トピックの作成と管理には、次の権限が必要です。

  • インポート トピックを作成する: pubsub.topics.create
  • インポート トピックを削除する: pubsub.topics.delete
  • インポート トピックを取得する: pubsub.topics.get
  • インポート トピックを一覧表示する: pubsub.topics.list
  • インポート トピックに公開する: pubsub.topics.publish
  • インポート トピックを更新する: pubsub.topics.update
  • インポート トピックの IAM ポリシーを取得する: pubsub.topics.getIamPolicy
  • インポート トピックの IAM ポリシーを構成する: pubsub.topics.setIamPolicy

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

アクセス制御は、プロジェクト レベルと個々のリソースレベルで構成できます。

メッセージ ストレージ ポリシーがバケットのロケーションに準拠している

Pub/Sub トピックのメッセージ ストレージ ポリシーは、Cloud Storage バケットが配置されているリージョンと重複している必要があります。このポリシーは、Pub/Sub がメッセージ データを保存できる場所を指定します。

  • ロケーション タイプがリージョンのバケットの場合: ポリシーにその特定のリージョンを含める必要があります。たとえば、バケットが us-central1 リージョンにある場合、メッセージ ストレージ ポリシーにも us-central1 を含める必要があります。

  • ロケーション タイプがデュアルリージョンまたはマルチリージョンのバケットの場合: ポリシーには、デュアルリージョンまたはマルチリージョン ロケーション内のリージョンを少なくとも 1 つ含める必要があります。たとえば、バケットが US multi-region にある場合、メッセージ ストレージ ポリシーには us-central1us-east1、または US multi-region 内の他のリージョンを含めることができます。

    ポリシーにバケットのリージョンが含まれていない場合、トピックの作成は失敗します。たとえば、バケットが europe-west1 にあり、メッセージ ストレージ ポリシーに asia-east1 のみが含まれている場合、エラーが発生します。

    メッセージ ストレージ ポリシーにバケットのロケーションと重複するリージョンが 1 つだけ含まれている場合、マルチリージョン冗長性が損なわれる可能性があります。これは、単一リージョンが使用できなくなると、データにアクセスできなくなる可能性があるためです。完全な冗長性を確保するには、バケットのマルチリージョンまたはデュアルリージョン ロケーションの一部であるリージョンを、メッセージ ストレージ ポリシーに少なくとも 2 つ含めることをおすすめします。

バケットのロケーションの詳細については、ドキュメントをご覧ください。

公開を有効にする

公開を有効にするには、Pub/Sub が Cloud Storage インポート トピックに公開できるように、Pub/Sub サービス アカウントに Pub/Sub パブリッシャー ロールを割り当てる必要があります。

すべての Cloud Storage インポート トピックへのパブリッシュを有効にする

プロジェクトで使用可能な Cloud Storage インポート トピックがない場合は、このオプションを選択します。

  1. Trusted Cloud コンソールで、[IAM] ページに移動します。

    IAM に移動

  2. [ S3NS提供のロール付与を含める] チェックボックスをオンにします。

  3. 次の形式の Pub/Sub サービス アカウントを探します。

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com

  4. このサービス アカウントで [プリンシパルを編集] ボタンをクリックします。

  5. 必要に応じて、[別のロールを追加] をクリックします。

  6. Pub/Sub パブリッシャーのロールroles/pubsub.publisher)を検索して選択します。

  7. [保存] をクリックします。

単一の Cloud Storage インポート トピックへのパブリッシュを有効にする

すでに存在する特定の Cloud Storage インポート トピックに公開する権限を Pub/Sub に付与するには、次の操作を行います。

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics add-iam-policy-binding コマンドを実行します。

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    次のように置き換えます。

    • TOPIC_ID は、Cloud Storage インポート トピックの ID または名前です。

    • [PROJECT_NUMBER] はプロジェクト番号です。 プロジェクト番号を表示するには、プロジェクトを特定するをご覧ください。

  3. Pub/Sub サービス アカウントに Cloud Storage のロールを割り当てる

    Cloud Storage インポート トピックを作成するには、Pub/Sub サービス アカウントに、特定の Cloud Storage バケットからの読み取り権限が付与されている必要があります。次の権限が必要です。

    • storage.objects.list
    • storage.objects.get
    • storage.buckets.get

    これらの権限を Pub/Sub サービス アカウントに割り当てるには、次のいずれかの手順を選択します。

    • バケット レベルで権限を付与します。特定の Cloud Storage バケットでは、Storage Legacy Object Reader(roles/storage.legacyObjectReader)ロールと Storage Legacy Bucket Reader(roles/storage.legacyBucketReader)ロールを Pub/Sub サービス アカウントに付与します。

    • プロジェクト レベルでロールを付与する必要がある場合は、代わりに Cloud Storage バケットを含むプロジェクトでストレージ管理者(roles/storage.admin)のロールを付与できます。Pub/Sub サービス アカウントに このロールを付与します。

    バケットの権限

    次の手順を行い、バケットレベルで Storage Legacy Object Reader(roles/storage.legacyObjectReader)のロールと Storage Legacy Bucket Reader(roles/storage.legacyBucketReader)のロールを Pub/Sub サービス アカウントに付与します。

    1. Trusted Cloud コンソールで、[Cloud Storage] ページに移動します。

      [Cloud Storage] に移動

    2. メッセージを読み取って Cloud Storage インポート トピックにインポートする Cloud Storage バケットをクリックします。

      [バケットの詳細] ページが開きます。

    3. [バケットの詳細] ページで、[権限] タブをクリックします。

    4. [権限] > [プリンシパルによる表示] タブで、[アクセス権を付与] をクリックします。

      [アクセス権を付与] ページが開きます。

    5. [プリンシパルを追加] セクションに、Pub/Sub サービス アカウントの名前を入力します。

      サービス アカウントの形式は service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com です。 たとえば、PROJECT_NUMBER=112233445566 のプロジェクトの場合、サービス アカウントの形式は service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com です。

    6. [ロールを割り当て] > [ロールを選択] プルダウンで、「Object Reader」と入力し、[Storage Legacy オブジェクト閲覧者] ロールを選択します。

    7. [別のロールを追加] をクリックします。

    8. [ロールを選択] プルダウンで、Bucket Reader を入力し、[Storage Legacy Bucket Reader] のロールを選択します。

    9. [保存] をクリックします。

    プロジェクトの権限

    次の手順を行って、プロジェクト レベルでストレージ管理者(roles/storage.admin)ロールを付与します。

    1. Trusted Cloud コンソールで、[IAM] ページに移動します。

      IAM に移動

    2. [権限] > [プリンシパルによる表示] タブで、[アクセス権を付与] をクリックします。

      [アクセス権を付与] ページが開きます。

    3. [プリンシパルを追加] セクションに、Pub/Sub サービス アカウントの名前を入力します。

      サービス アカウントの形式は service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com です。 たとえば、PROJECT_NUMBER=112233445566 のプロジェクトの場合、サービス アカウントの形式は service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com です。

    4. [ロールを割り当て] > [ロールを選択] プルダウンで、「Storage Admin」と入力し、[ストレージ管理者] ロールを選択します。

    5. [保存] をクリックします。

    Cloud Storage IAM の詳細については、Cloud Storage Identity and Access Management をご覧ください。

    Cloud Storage インポート トピックのプロパティ

    すべてのトピックに共通するプロパティの詳細については、トピックのプロパティをご覧ください。

    バケット名

    これは、Cloud Storage インポート トピックに公開されたデータを Pub/Sub が読み取る Cloud Storage バケットの名前です。

    入力形式

    Cloud Storage インポート トピックを作成するときに、取り込むオブジェクトの形式を テキストAvro、または Pub/Sub Avro として指定できます。

    • テキスト。オブジェクトは書式なしテキストのデータを含むと想定されます。この入力形式では、オブジェクトが最短のオブジェクト作成時間を満たし、glob パターンの条件に一致する限り、バケット内のすべてのオブジェクトを取り込もうとします。

      区切り文字。オブジェクトをメッセージに分割する区切り文字を指定することもできます。設定しない場合、デフォルトで改行文字(\n)になります。区切り文字は 1 文字のみにする必要があります。

    • Avro。オブジェクトは Apache Avro バイナリ形式です。有効な Apache Avro 形式ではないオブジェクトは取り込まれません。Avro に関する制限事項は次のとおりです。

      • Avro バージョン 1.1.0 と 1.2.0 はサポートされていません。
      • Avro ブロックの最大サイズは 16 MB です。
    • Pub/Sub Avro。オブジェクトは Apache Avro バイナリ形式で、Pub/Sub の Avro ファイル形式Cloud Storage サブスクリプションを使用して Cloud Storage に書き込まれたオブジェクトのスキーマと一致します。Pub/Sub Avro の重要なガイドラインは次のとおりです。

      • Avro レコードのデータ フィールドは、生成された Pub/Sub メッセージのデータ フィールドに入力するために使用されます。

      • Cloud Storage サブスクリプションに write_metadata オプションが指定されている場合、属性フィールドの値は、生成された Pub/Sub メッセージの属性として入力されます。

      • Cloud Storage に書き込まれた元のメッセージで順序指定キーが指定されている場合、このフィールドは、生成された Pub/Sub メッセージで original_message_ordering_key という名前の属性として設定されます。

    最短のオブジェクト作成時間

    Cloud Storage インポート トピックの作成時に、必要に応じて最小オブジェクト作成時間を指定できます。このタイムスタンプ以降に作成されたオブジェクトのみが取り込まれます。このタイムスタンプは、YYYY-MM-DDThh:mm:ssZ のような形式で指定する必要があります。0001-01-01T00:00:00Z から 9999-12-31T23:59:59Z までの過去または未来の任意の日付が有効です。

    glob パターンを照合

    Cloud Storage インポート トピックを作成するときに、必要に応じて一致する glob パターンを指定できます。このパターンに一致する名前のオブジェクトのみが取り込まれます。たとえば、接尾辞 .txt が付いたすべてのオブジェクトを取り込むには、glob パターンを **.txt として指定します。

    glob パターンでサポートされている構文については、Cloud Storage のドキュメントをご覧ください。

    Cloud Storage のインポート トピックを使用する

    新しいインポート トピックを作成することも、既存のトピックを編集することもできます。

    考慮事項

    • トピックとサブスクリプションを別々に作成すると、連続して作成した場合でもデータが失われる可能性があります。トピックは、定期購入なしで利用できる期間が短く設定されています。この間にトピックにデータが送信されると、データは失われます。最初にトピックを作成し、サブスクリプションを作成してから、トピックをインポート トピックに変換することで、インポート プロセス中にメッセージが失われることはありません。

    Cloud Storage インポート トピックを作成する

    Cloud Storage インポート トピックを作成する手順は次のとおりです。

    コンソール

    1. Trusted Cloud コンソールで、[トピック] ページに移動します。

      [トピック] に移動

    2. [トピックを作成] をクリックします。

      トピックの詳細ページが開きます。

    3. [トピック ID] フィールドに、Cloud Storage インポート トピックの ID を入力します。

      トピックの命名の詳細については、命名ガイドラインをご覧ください。

    4. [デフォルトのサブスクリプションを追加する] を選択します。

    5. [取り込みを有効にする] を選択します。

    6. 取り込み元として [Google Cloud Storage] を選択します。

    7. Cloud Storage バケットの場合は、[ブラウズ] をクリックします。

      [バケットの選択] ページが開きます。次のオプションのいずれかを選択します。

      • 適切なプロジェクトから既存のバケットを選択します。

      • 作成アイコンをクリックし、画面の指示に沿って新しいバケットを作成します。バケットを作成したら、Cloud Storage インポート トピックのバケットを選択します。

    8. バケットを指定すると、Pub/Sub は Pub/Sub サービス アカウントのバケットに対する適切な権限を確認します。権限の問題がある場合は、次のようなメッセージが表示されます。

      Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

      権限の問題が発生した場合は、[権限を設定] をクリックします。詳細については、Pub/Sub サービス アカウントに Cloud Storage 権限を付与するをご覧ください。

    9. [オブジェクト形式] で、[テキスト]、[Avro]、[Pub/Sub Avro] のいずれかを選択します。

      [テキスト] を選択した場合は、必要に応じて、オブジェクトをメッセージに分割する 区切り文字を指定できます。

      これらのオプションの詳細については、入力形式をご覧ください。

    10. 省略可。トピックの最短のオブジェクト作成時間を指定できます。設定されている場合、最短のオブジェクト作成時間以降に作成されたオブジェクトのみが取り込まれます。

      詳細については、最短のオブジェクト作成時間をご覧ください。

    11. Glob パターンを指定する必要があります。バケット内のすべてのオブジェクトを取り込むには、glob パターンとして ** を使用します。設定すると、指定されたパターンに一致するオブジェクトのみが取り込まれます。

      詳細については、グロブ パターンを照合するをご覧ください。

    12. 他のデフォルト設定はそのままにします。

    13. [トピックを作成] をクリックします。

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. gcloud pubsub topics create コマンドを実行します。

      gcloud pubsub topics create TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME \
          --cloud-storage-ingestion-input-format=INPUT_FORMAT \
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
          --cloud-storage-ingestion-match-glob=MATCH_GLOB
      

      このコマンドでは、TOPIC_ID--cloud-storage-ingestion-bucket フラグ、--cloud-storage-ingestion-input-format フラグのみが必要です。残りのフラグはオプションであり、省略できます。

      次のように置き換えます。

      • TOPIC_ID: トピックの名前または ID。
      • BUCKET_NAME: 既存のバケットの名前を指定します。例: prod_bucketバケット名にプロジェクト ID を含めてはいけません。バケットを作成するには、バケットを作成するをご覧ください。
      • INPUT_FORMAT: 取り込まれるオブジェクトの形式を指定します。textavropubsub_avro のいずれかになります。これらのオプションの詳細については、入力形式をご覧ください。
      • TEXT_DELIMITER: テキスト オブジェクトを Pub/Sub メッセージに分割する区切り文字を指定します。これは 1 文字にする必要があり、INPUT_FORMATtext の場合にのみ設定する必要があります。デフォルトは改行文字(\n)です。

        gcloud CLI を使用して区切り文字を指定する場合は、改行 \n などの特殊文字の処理に注意してください。区切り文字が正しく解釈されるように、'\n' 形式を使用します。引用符やエスケープなしで \n を使用すると、区切り文字は "n" になります。

      • MINIMUM_OBJECT_CREATE_TIME: オブジェクトが取り込まれるために作成された最小時間を指定します。UTC で YYYY-MM-DDThh:mm:ssZ の形式にする必要があります。例: 2024-10-14T08:30:30Z

        0001-01-01T00:00:00Z から 9999-12-31T23:59:59Z までの過去または未来の任意の日付が有効です。

      • MATCH_GLOB: オブジェクトを取り込むために照合する glob パターンを指定します。gcloud CLI を使用している場合、* 文字を含む一致グロブでは、* 文字を \*\*.txt の形式でエスケープするか、一致グロブ全体を引用符 "**.txt" または '**.txt' で囲む必要があります。glob パターンでサポートされている構文については、Cloud Storage のドキュメントをご覧ください。

    3. C++

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string bucket, std::string const& input_format,
         std::string text_delimiter, std::string match_glob,
         std::string const& minimum_object_create_time) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                                   ->mutable_cloud_storage();
        cloud_storage.set_bucket(std::move(bucket));
        if (input_format == "text") {
          cloud_storage.mutable_text_format()->set_delimiter(
              std::move(text_delimiter));
        } else if (input_format == "avro") {
          cloud_storage.mutable_avro_format();
        } else if (input_format == "pubsub_avro") {
          cloud_storage.mutable_pubsub_avro_format();
        } else {
          std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                       "got value: "
                    << input_format << std::endl;
          return;
        }
      
        if (!match_glob.empty()) {
          cloud_storage.set_match_glob(std::move(match_glob));
        }
      
        if (!minimum_object_create_time.empty()) {
          google::protobuf::Timestamp timestamp;
          if (!google::protobuf::util::TimeUtil::FromString(
                  minimum_object_create_time,
                  cloud_storage.mutable_minimum_object_create_time())) {
            std::cout << "Invalid minimum object create time: "
                      << minimum_object_create_time << std::endl;
          }
        }
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用しています。まだ v1 ライブラリを使用している場合は、v2 への移行ガイドをご覧ください。v1 コードサンプルの一覧については、 非推奨のコードサンプルをご覧ください。

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

      import (
      	"context"
      	"fmt"
      	"io"
      	"time"
      
      	"cloud.google.com/go/pubsub/v2"
      	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
      	"google.golang.org/protobuf/types/known/timestamppb"
      )
      
      func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      	// bucket := "my-bucket"
      	// matchGlob := "**.txt"
      	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"
      	// delimiter := ","
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
      	if err != nil {
      		return err
      	}
      
      	topicpb := &pubsubpb.Topic{
      		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
      		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
      			Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{
      				CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{
      					Bucket: bucket,
      					// Alternatively, can be Avro or PubSubAvro formats. See
      					InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
      						TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{
      							Delimiter: &delimiter,
      						},
      					},
      					MatchGlob:               matchGlob,
      					MinimumObjectCreateTime: timestamppb.New(minCreateTime),
      				},
      			},
      		},
      	}
      	t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
      	return nil
      }
      

      Java

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.protobuf.util.Timestamps;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      import java.text.ParseException;
      
      public class CreateTopicWithCloudStorageIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Cloud Storage ingestion settings.
          // bucket and inputFormat are required arguments.
          String bucket = "your-bucket";
          String inputFormat = "text";
          String textDelimiter = "\n";
          String matchGlob = "**.txt";
          String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";
      
          createTopicWithCloudStorageIngestionExample(
              projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
        }
      
        public static void createTopicWithCloudStorageIngestionExample(
            String projectId,
            String topicId,
            String bucket,
            String inputFormat,
            String textDelimiter,
            String matchGlob,
            String minimumObjectCreateTime)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
                IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
            switch (inputFormat) {
              case "text":
                cloudStorageBuilder.setTextFormat(
                    IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                        .setDelimiter(textDelimiter)
                        .build());
                break;
              case "avro":
                cloudStorageBuilder.setAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
                break;
              case "pubsub_avro":
                cloudStorageBuilder.setPubsubAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
                break;
              default:
                throw new IllegalArgumentException(
                    "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
            }
      
            if (matchGlob != null && !matchGlob.isEmpty()) {
              cloudStorageBuilder.setMatchGlob(matchGlob);
            }
      
            if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
              try {
                cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
              } catch (ParseException e) {
                System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
              }
            }
      
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder()
                    .setCloudStorage(cloudStorageBuilder.build())
                    .build();
      
            TopicName topicName = TopicName.of(projectId, topicId);
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println(
                "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId,
        bucket,
        inputFormat,
        textDelimiter,
        matchGlob,
        minimumObjectCreateTime,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Node.ts

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId: string,
        bucket: string,
        inputFormat: string,
        textDelimiter: string,
        matchGlob: string,
        minimumObjectCreateTime: string,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata: TopicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Python

      このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

      from google.cloud import pubsub_v1
      from google.protobuf import timestamp_pb2
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # bucket = "your-bucket"
      # input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
      # text_delimiter = "\n"
      # match_glob = "**.txt"
      # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
          bucket=bucket,
      )
      if input_format == "text":
          cloud_storage_settings.text_format = (
              IngestionDataSourceSettings.CloudStorage.TextFormat(
                  delimiter=text_delimiter
              )
          )
      elif input_format == "avro":
          cloud_storage_settings.avro_format = (
              IngestionDataSourceSettings.CloudStorage.AvroFormat()
          )
      elif input_format == "pubsub_avro":
          cloud_storage_settings.pubsub_avro_format = (
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
          )
      else:
          print(
              "Invalid input_format: "
              + input_format
              + "; must be in ('text', 'avro', 'pubsub_avro')"
          )
          return
      
      if match_glob:
          cloud_storage_settings.match_glob = match_glob
      
      if minimum_object_create_time:
          try:
              minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
              minimum_object_create_time_timestamp.FromJsonString(
                  minimum_object_create_time
              )
              cloud_storage_settings.minimum_object_create_time = (
                  minimum_object_create_time_timestamp
              )
          except ValueError:
              print("Invalid minimum_object_create_time: " + minimum_object_create_time)
              return
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              cloud_storage=cloud_storage_settings,
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

    問題が発生した場合は、Cloud Storage インポート トピックのトラブルシューティングをご覧ください。

    Cloud Storage インポート トピックを編集する

    Cloud Storage インポート トピックを編集して、そのプロパティを更新できます。

    たとえば、取り込みを再開するには、バケットを変更するか、最小オブジェクト作成時間を更新します。

    Cloud Storage インポート トピックを編集するには、次の操作を行います。

    コンソール

    1. Trusted Cloud コンソールで、[トピック] ページに移動します。

      [トピック] に移動

    2. Cloud Storage インポート トピックをクリックします。

    3. [トピックの詳細] ページで、[編集] をクリックします。

    4. 変更するフィールドを更新します。

    5. [更新] をクリックします。

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. インポート トピックの設定が失われないように、トピックを更新するたびにすべての設定を含めるようにしてください。省略すると、Pub/Sub は設定を元のデフォルト値にリセットします。

      次のサンプルに記載されているフラグをすべて指定して、gcloud pubsub topics update コマンドを実行します。

      gcloud pubsub topics update TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME\
          --cloud-storage-ingestion-input-format=INPUT_FORMAT\
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
          --cloud-storage-ingestion-match-glob=MATCH_GLOB

      次のように置き換えます。

      • TOPIC_ID はトピック ID または名前です。このフィールドは更新できません。

      • BUCKET_NAME: 既存のバケットの名前を指定します。例: prod_bucketバケット名にプロジェクト ID を含めてはいけません。バケットを作成するには、バケットを作成するをご覧ください。

      • INPUT_FORMAT: 取り込まれるオブジェクトの形式を指定します。textavropubsub_avro のいずれかになります。これらのオプションの詳細については、 入力形式をご覧ください。

      • TEXT_DELIMITER: テキスト オブジェクトを Pub/Sub メッセージに分割する区切り文字を指定します。これは 1 文字にする必要があり、INPUT_FORMATtext の場合にのみ設定する必要があります。デフォルトは改行文字(\n)です。

        gcloud CLI を使用して区切り文字を指定する場合は、改行 \n などの特殊文字の処理に注意してください。区切り文字が正しく解釈されるように、'\n' 形式を使用します。引用符やエスケープなしで \n を使用すると、区切り文字は "n" になります。

      • MINIMUM_OBJECT_CREATE_TIME: オブジェクトが取り込まれるために作成された最小時間を指定します。UTC で YYYY-MM-DDThh:mm:ssZ の形式にする必要があります。例: 2024-10-14T08:30:30Z

        0001-01-01T00:00:00Z から 9999-12-31T23:59:59Z までの過去または未来の任意の日付が有効です。

      • MATCH_GLOB: オブジェクトを取り込むために照合する glob パターンを指定します。gcloud CLI を使用している場合、* 文字を含む一致グロブでは、* 文字を \*\*.txt の形式でエスケープするか、一致グロブ全体を引用符 "**.txt" または '**.txt' で囲む必要があります。glob パターンでサポートされている構文については、 Cloud Storage のドキュメントをご覧ください。

    Cloud Storage インポート トピックの割り当てと上限

    インポート トピックのパブリッシャーのスループットは、トピックのパブリッシュ割り当てによってバインドされます。詳細については、Pub/Sub の割り当てと上限をご覧ください。

    次のステップ