建立 Cloud Storage 匯入主題

透過 Cloud Storage 匯入主題,您可以持續將 Cloud Storage 中的資料擷取至 Pub/Sub。然後,您可以將資料串流至 Pub/Sub 支援的任何目的地。Pub/Sub 會自動偵測新增至 Cloud Storage bucket 的物件,並擷取這些物件。

Cloud Storage 是在 Trusted Cloud by S3NS中為您儲存物件的服務。物件是不可變更的資料片段,由任何格式的檔案組成。物件會存放在名為「值區」的容器中。儲存空間也可以包含代管資料夾,您可以使用這些資料夾,為具有共用名稱前置字元的物件群組提供擴充存取權。

如要進一步瞭解 Cloud Storage,請參閱 Cloud Storage 說明文件

如要進一步瞭解匯入主題,請參閱「關於匯入主題」。

事前準備

必要角色和權限

如要取得建立及管理 Cloud Storage 匯入主題所需的權限,請要求管理員為您授予主題或專案的 Pub/Sub 編輯者 (roles/pubsub.editor) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色具備建立及管理 Cloud Storage 匯入主題所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

如要建立及管理 Cloud Storage 匯入主題,您必須具備下列權限:

  • 建立匯入主題: pubsub.topics.create
  • 刪除匯入主題: pubsub.topics.delete
  • 取得匯入主題: pubsub.topics.get
  • 列出匯入主題: pubsub.topics.list
  • 發布至匯入主題: pubsub.topics.publish
  • 更新匯入主題: pubsub.topics.update
  • 取得匯入主題的身分與存取權管理政策: pubsub.topics.getIamPolicy
  • 設定匯入主題的 IAM 政策 pubsub.topics.setIamPolicy

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。

郵件儲存政策符合值區位置規定

Pub/Sub 主題的訊息儲存政策必須與 Cloud Storage bucket 所在的區域重疊。這項政策會規定 Pub/Sub 服務可儲存訊息資料的位置。

  • 如果值區的位置類型為「地區」:政策必須包含該特定地區。舉例來說,如果值區位於 us-central1 地區,訊息儲存空間政策也必須包含 us-central1

  • 如果值區的位置類型為雙地區或多地區:政策必須包含雙地區或多地區位置中的至少一個地區。舉例來說,如果值區位於 US multi-region,訊息儲存空間政策可以包含 us-central1us-east1US multi-region 內的任何其他區域。

    如果政策未納入值區的地區,主題建立作業就會失敗。舉例來說,如果你的 bucket 位於 europe-west1,但訊息儲存空間政策只包含 asia-east1,就會收到錯誤訊息。

    如果訊息儲存空間政策只包含一個與值區位置重疊的區域,多區域備援機制可能會受到影響。這是因為如果單一區域無法使用,您可能就無法存取資料。為確保完整備援,建議您在訊息儲存政策中,納入至少兩個屬於值區多區域或雙區域位置的區域。

如要進一步瞭解值區位置,請參閱說明文件

啟用發布功能

如要啟用發布功能,您必須將 Pub/Sub 發布者角色指派給 Pub/Sub 服務帳戶,這樣 Pub/Sub 才能發布至 Cloud Storage 匯入主題。

啟用發布至所有 Cloud Storage 匯入主題

如果專案中沒有可用的 Cloud Storage 匯入主題,請選擇這個選項。

  1. 前往 Trusted Cloud 控制台的「IAM」頁面。

    前往身分與存取權管理頁面

  2. 選取「包含 S3NS提供的角色授予項目」核取方塊。

  3. 尋找格式如下的 Pub/Sub 服務帳戶:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com

  4. 按一下這個服務帳戶的「編輯主體」按鈕。

  5. 視需要按一下「新增其他角色」

  6. 搜尋並選取 Pub/Sub 發布者角色 (roles/pubsub.publisher)。

  7. 按一下 [儲存]

啟用發布至單一 Cloud Storage 匯入主題

如要授予 Pub/Sub 權限,允許其發布至現有的特定 Cloud Storage 匯入主題,請按照下列步驟操作:

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud pubsub topics add-iam-policy-binding 指令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    更改下列內容:

    • TOPIC_ID 是 Cloud Storage 匯入主題的 ID 或名稱。

    • PROJECT_NUMBER 是專案編號。如要查看專案編號,請參閱「識別專案」。

  3. 將 Cloud Storage 角色指派給 Pub/Sub 服務帳戶

    如要建立 Cloud Storage 匯入主題,Pub/Sub 服務帳戶必須有權從特定 Cloud Storage bucket 讀取資料。必須具備下列權限:

    • storage.objects.list
    • storage.objects.get
    • storage.buckets.get

    如要將這些權限指派給 Pub/Sub 服務帳戶,請選擇下列其中一個程序:

    • 授予值區層級的權限。在特定 Cloud Storage 值區中,將 Storage 舊版物件讀取者 (roles/storage.legacyObjectReader) 角色和 Storage 舊版值區讀取者 (roles/storage.legacyBucketReader) 角色授予 Pub/Sub 服務帳戶。

    • 如必須在專案層級授予角色,建議您改為在包含 Cloud Storage 值區的專案中,授予「儲存空間管理員」角色 (roles/storage.admin)。將這個角色授予 Pub/Sub 服務帳戶。

    值區權限

    請按照下列步驟,在 bucket 層級將 Storage 舊版物件讀取者 (roles/storage.legacyObjectReader) 和 Storage 舊版 bucket 讀取者 (roles/storage.legacyBucketReader) 角色授予 Pub/Sub 服務帳戶:

    1. 前往 Trusted Cloud 控制台的「Cloud Storage」頁面。

      前往 Cloud Storage

    2. 按一下要從中讀取訊息的 Cloud Storage 值區,然後匯入 Cloud Storage 匯入主題。

      「Bucket details」(值區詳細資料) 頁面隨即開啟。

    3. 前往「Bucket details」(值區詳細資料) 頁面,點選「Permissions」(權限) 分頁標籤。

    4. 在「權限」 >「按照主體查看」分頁中,點選「授予存取權」

      「授予存取權」頁面隨即開啟。

    5. 在「新增主體」部分中,輸入 Pub/Sub 服務帳戶的名稱。

      服務帳戶的格式為 service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com。舉例來說,如果專案的 PROJECT_NUMBER112233445566,服務帳戶的格式為 service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com

    6. 在「指派角色」 >「請選擇角色」下拉式選單中,輸入 Object Reader 並選取「Storage 舊版物件讀取者」角色。

    7. 按一下 [Add another role] (新增其他角色)

    8. 在「Select a role」(請選擇角色) 下拉式選單中輸入 Bucket Reader,然後選取「Storage Legacy Bucket Reader」(Storage 舊版 Bucket 讀取者) 角色。

    9. 按一下 [儲存]

    專案權限

    如要在專案層級授予「儲存空間管理員」角色 (roles/storage.admin),請按照下列步驟操作:

    1. 前往 Trusted Cloud 控制台的「IAM」頁面。

      前往身分與存取權管理頁面

    2. 在「權限」 >「按照主體查看」分頁中,點選「授予存取權」

      「授予存取權」頁面隨即開啟。

    3. 在「新增主體」部分中,輸入 Pub/Sub 服務帳戶的名稱。

      服務帳戶的格式為 service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com。舉例來說,如果專案的 PROJECT_NUMBER112233445566,服務帳戶的格式為 service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com

    4. 在「指派角色」 >「請選擇角色」下拉式選單中,輸入 Storage Admin 並選取「Storage 管理員」角色。

    5. 按一下 [儲存]

    如要進一步瞭解 Cloud Storage IAM,請參閱「Cloud Storage Identity and Access Management」。

    Cloud Storage 匯入主題的屬性

    如要進一步瞭解所有主題的通用屬性,請參閱主題的屬性

    值區名稱

    這是 Cloud Storage bucket 的名稱,Pub/Sub 會從這個 bucket 讀取發布至 Cloud Storage 匯入主題的資料。

    輸入格式

    建立 Cloud Storage 匯入主題時,您可以將要擷取的物件格式指定為「Text」、「Avro」或「Pub/Sub Avro」

    • 文字。物件會假設包含純文字資料。只要物件符合物件建立時間下限,並符合 glob 模式條件,這個輸入格式就會嘗試擷取 bucket 中的所有物件。

      分隔符號。您也可以指定物件的分隔符,將物件分割成多則訊息。如未設定,預設為換行字元 (\n)。 分隔符號只能是單一字元。

    • Avro。物件採用 Apache Avro 二進位格式。如果物件不符合有效的 Apache Avro 格式,系統就不會擷取該物件。 以下是 Avro 的相關限制:

      • 不支援 Avro 1.1.0 和 1.2.0 版。
      • Avro 區塊的大小上限為 16 MB。
    • Pub/Sub Avro。物件採用 Apache Avro 二進位格式,且結構定義與使用 Pub/Sub Cloud Storage 訂閱項目Avro 檔案格式寫入 Cloud Storage 的物件相符。以下是 Pub/Sub Avro 的重要準則:

      • Avro 記錄的資料欄位會用於填入產生的 Pub/Sub 訊息資料欄位。

      • 如果為 Cloud Storage 訂閱項目指定 write_metadata 選項,屬性欄位中的任何值都會填入為產生的 Pub/Sub 訊息屬性。

      • 如果寫入 Cloud Storage 的原始訊息中指定了排序鍵,系統會在產生的 Pub/Sub 訊息中,以 original_message_ordering_key 名稱填入這個欄位做為屬性。

    物件建立時間下限

    建立 Cloud Storage 匯入主題時,您可以選擇指定物件建立時間下限。系統只會擷取在這個時間戳記建立或之後建立的物件。時間戳記的格式必須如 YYYY-MM-DDThh:mm:ssZ。從 0001-01-01T00:00:00Z9999-12-31T23:59:59Z (含) 的任何日期,無論是過去或未來,都有效。

    比對 glob 模式

    建立 Cloud Storage 匯入主題時,您可以選擇指定相符的 glob 模式。系統只會擷取名稱符合這個模式的物件。舉例來說,如要擷取所有以 .txt 為字尾的物件,您可以將 glob 模式指定為 **.txt

    如要瞭解支援的 glob 模式語法,請參閱 Cloud Storage 說明文件

    使用 Cloud Storage 匯入主題

    您可以建立新的匯入主題,或編輯現有主題。

    注意事項

    • 即使快速連續建立主題和訂閱項目,也可能導致資料遺失。在免付費期間,主題會短暫存在。如果在這段時間內有任何資料傳送至主題,這些資料都會遺失。先建立主題,然後建立訂閱項目,再將主題轉換為匯入主題,即可確保匯入程序不會遺漏任何訊息。

    建立 Cloud Storage 匯入主題

    如要建立 Cloud Storage 匯入主題,請按照下列步驟操作:

    控制台

    1. 前往 Trusted Cloud 控制台的「主題」頁面。

      前往「主題」

    2. 按一下「建立主題」

      主題詳細資料頁面隨即開啟。

    3. 在「主題 ID」欄位中,輸入 Cloud Storage 匯入主題的 ID。

      如要進一步瞭解如何命名主題,請參閱命名規範

    4. 選取「新增預設訂閱項目」

    5. 選取「啟用擷取功能」

    6. 選取「Google Cloud Storage」做為擷取來源。

    7. 按一下 Cloud Storage bucket 的「Browse」(瀏覽)

      「Select bucket」(選取值區) 頁面隨即開啟。選取下列選項之一:

      • 從任何適當的專案中選取現有 bucket。

      • 按一下建立圖示,然後按照畫面上的指示建立新的值區。建立 bucket 後,請選取 Cloud Storage 匯入主題的 bucket。

    8. 指定值區時,Pub/Sub 會檢查 Pub/Sub 服務帳戶是否具備該值區的適當權限。如果發生權限問題,您會看到類似下列內容的訊息:

      Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

      如果遇到權限問題,請按一下「設定權限」。詳情請參閱授予 Pub/Sub 服務帳戶 Cloud Storage 權限

    9. 在「物件格式」中,選取「文字」、「Avro」或「Pub/Sub Avro」

      如果選取「文字」,可以視需要指定「分隔符號」,將物件分割成訊息。

      如要進一步瞭解這些選項,請參閱「輸入格式」。

    10. (選用步驟) 您可以為主題指定「物件建立時間下限」。如果設定此值,系統只會擷取在物件建立時間下限之後建立的物件。

      詳情請參閱「物件建立時間下限」。

    11. 您必須指定 Glob 模式。如要擷取 bucket 中的所有物件,請使用 ** 做為 glob 模式。如果設定此參數,系統只會擷取符合指定模式的物件。

      詳情請參閱「比對 glob 模式」。

    12. 保留其他預設設定。

    13. 按一下「建立主題」

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. 執行 gcloud pubsub topics create 指令:

      gcloud pubsub topics create TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME \
          --cloud-storage-ingestion-input-format=INPUT_FORMAT \
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
          --cloud-storage-ingestion-match-glob=MATCH_GLOB
      

      在這個指令中,只需要 TOPIC_ID--cloud-storage-ingestion-bucket 旗標和 --cloud-storage-ingestion-input-format 旗標。其餘旗標為選用,可以省略。

      更改下列內容:

      • TOPIC_ID:主題的名稱或 ID。
      • BUCKET_NAME:指定現有值區的名稱。 例如:prod_bucket。值區名稱不得包含專案 ID。 如要建立 bucket,請參閱「建立 bucket」一文。
      • INPUT_FORMAT:指定要擷取的物件格式。這可以是 textavropubsub_avro。如要進一步瞭解這些選項,請參閱「輸入格式」。
      • TEXT_DELIMITER:指定用於將文字物件分割為 Pub/Sub 訊息的分隔符。這應該是單一字元,且只有在 INPUT_FORMATtext 時才應設定。預設為換行字元 (\n)。

        使用 gcloud CLI 指定分隔符號時,請特別注意換行 \n 等特殊字元的處理方式。請使用 '\n' 格式,確保系統正確解讀分隔符。 如果直接使用 \n (不加引號或逸出),分隔符號會變成 "n"

      • MINIMUM_OBJECT_CREATE_TIME:指定物件建立時間下限,只有在此時間之後建立的物件才會擷取。格式應為世界標準時間 YYYY-MM-DDThh:mm:ssZ。 例如:2024-10-14T08:30:30Z

        0001-01-01T00:00:00Z9999-12-31T23:59:59Z (含) 的任何日期,無論是過去或未來,都有效。

      • MATCH_GLOB:指定要比對的 glob 模式,物件必須符合該模式才能擷取。使用 gcloud CLI 時,含有 * 字元的相符 glob 必須以逸出形式 \*\*.txt 格式化 * 字元,或整個相符 glob 必須以引號 "**.txt"'**.txt' 括住。如要瞭解支援的 glob 模式語法,請參閱 Cloud Storage 說明文件

    3. C++

      在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 C++ 設定操作說明進行操作。詳情請參閱 Pub/Sub C++ API 參考說明文件

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string bucket, std::string const& input_format,
         std::string text_delimiter, std::string match_glob,
         std::string const& minimum_object_create_time) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                                   ->mutable_cloud_storage();
        cloud_storage.set_bucket(std::move(bucket));
        if (input_format == "text") {
          cloud_storage.mutable_text_format()->set_delimiter(
              std::move(text_delimiter));
        } else if (input_format == "avro") {
          cloud_storage.mutable_avro_format();
        } else if (input_format == "pubsub_avro") {
          cloud_storage.mutable_pubsub_avro_format();
        } else {
          std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                       "got value: "
                    << input_format << std::endl;
          return;
        }
      
        if (!match_glob.empty()) {
          cloud_storage.set_match_glob(std::move(match_glob));
        }
      
        if (!minimum_object_create_time.empty()) {
          google::protobuf::Timestamp timestamp;
          if (!google::protobuf::util::TimeUtil::FromString(
                  minimum_object_create_time,
                  cloud_storage.mutable_minimum_object_create_time())) {
            std::cout << "Invalid minimum object create time: "
                      << minimum_object_create_time << std::endl;
          }
        }
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      以下範例使用 Go Pub/Sub 用戶端程式庫的主要版本 (v2)。如果您仍在使用第 1 版程式庫,請參閱第 2 版遷移指南。如要查看第 1 版程式碼範例清單,請參閱 已淘汰的程式碼範例

      在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Go 設定說明進行操作。詳情請參閱 Pub/Sub Go API 參考說明文件

      import (
      	"context"
      	"fmt"
      	"io"
      	"time"
      
      	"cloud.google.com/go/pubsub/v2"
      	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
      	"google.golang.org/protobuf/types/known/timestamppb"
      )
      
      func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      	// bucket := "my-bucket"
      	// matchGlob := "**.txt"
      	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"
      	// delimiter := ","
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
      	if err != nil {
      		return err
      	}
      
      	topicpb := &pubsubpb.Topic{
      		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
      		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
      			Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{
      				CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{
      					Bucket: bucket,
      					// Alternatively, can be Avro or PubSubAvro formats. See
      					InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
      						TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{
      							Delimiter: &delimiter,
      						},
      					},
      					MatchGlob:               matchGlob,
      					MinimumObjectCreateTime: timestamppb.New(minCreateTime),
      				},
      			},
      		},
      	}
      	t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
      	return nil
      }
      

      Java

      在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Java 設定操作說明進行操作。詳情請參閱 Pub/Sub Java API 參考說明文件

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.protobuf.util.Timestamps;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      import java.text.ParseException;
      
      public class CreateTopicWithCloudStorageIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Cloud Storage ingestion settings.
          // bucket and inputFormat are required arguments.
          String bucket = "your-bucket";
          String inputFormat = "text";
          String textDelimiter = "\n";
          String matchGlob = "**.txt";
          String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";
      
          createTopicWithCloudStorageIngestionExample(
              projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
        }
      
        public static void createTopicWithCloudStorageIngestionExample(
            String projectId,
            String topicId,
            String bucket,
            String inputFormat,
            String textDelimiter,
            String matchGlob,
            String minimumObjectCreateTime)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
                IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
            switch (inputFormat) {
              case "text":
                cloudStorageBuilder.setTextFormat(
                    IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                        .setDelimiter(textDelimiter)
                        .build());
                break;
              case "avro":
                cloudStorageBuilder.setAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
                break;
              case "pubsub_avro":
                cloudStorageBuilder.setPubsubAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
                break;
              default:
                throw new IllegalArgumentException(
                    "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
            }
      
            if (matchGlob != null && !matchGlob.isEmpty()) {
              cloudStorageBuilder.setMatchGlob(matchGlob);
            }
      
            if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
              try {
                cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
              } catch (ParseException e) {
                System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
              }
            }
      
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder()
                    .setCloudStorage(cloudStorageBuilder.build())
                    .build();
      
            TopicName topicName = TopicName.of(projectId, topicId);
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println(
                "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId,
        bucket,
        inputFormat,
        textDelimiter,
        matchGlob,
        minimumObjectCreateTime,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Node.ts

      在嘗試這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Node.js 設定說明進行操作。詳情請參閱 Pub/Sub Node.js API 參考說明文件

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId: string,
        bucket: string,
        inputFormat: string,
        textDelimiter: string,
        matchGlob: string,
        minimumObjectCreateTime: string,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata: TopicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Python

      在試用這個範例之前,請先按照快速入門:使用用戶端程式庫中的 Python 設定操作說明來進行。詳情請參閱 Pub/Sub Python API 參考說明文件

      from google.cloud import pubsub_v1
      from google.protobuf import timestamp_pb2
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # bucket = "your-bucket"
      # input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
      # text_delimiter = "\n"
      # match_glob = "**.txt"
      # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
          bucket=bucket,
      )
      if input_format == "text":
          cloud_storage_settings.text_format = (
              IngestionDataSourceSettings.CloudStorage.TextFormat(
                  delimiter=text_delimiter
              )
          )
      elif input_format == "avro":
          cloud_storage_settings.avro_format = (
              IngestionDataSourceSettings.CloudStorage.AvroFormat()
          )
      elif input_format == "pubsub_avro":
          cloud_storage_settings.pubsub_avro_format = (
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
          )
      else:
          print(
              "Invalid input_format: "
              + input_format
              + "; must be in ('text', 'avro', 'pubsub_avro')"
          )
          return
      
      if match_glob:
          cloud_storage_settings.match_glob = match_glob
      
      if minimum_object_create_time:
          try:
              minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
              minimum_object_create_time_timestamp.FromJsonString(
                  minimum_object_create_time
              )
              cloud_storage_settings.minimum_object_create_time = (
                  minimum_object_create_time_timestamp
              )
          except ValueError:
              print("Invalid minimum_object_create_time: " + minimum_object_create_time)
              return
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              cloud_storage=cloud_storage_settings,
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

    如果發生問題,請參閱「排解 Cloud Storage 匯入問題」主題。

    編輯 Cloud Storage 匯入主題

    您可以編輯 Cloud Storage 匯入主題,更新其屬性。

    舉例來說,如要重新啟動擷取作業,您可以變更 bucket 或更新物件建立時間下限

    如要編輯 Cloud Storage 匯入主題,請完成下列步驟:

    控制台

    1. 前往 Trusted Cloud 控制台的「主題」頁面。

      前往「主題」

    2. 按一下 Cloud Storage 匯入主題。

    3. 在主題詳細資料頁面中,按一下「編輯」

    4. 更新要變更的欄位。

    5. 按一下「更新」

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. 為避免遺失匯入主題的設定,請務必在每次更新主題時加入所有設定。如果省略任何項目,Pub/Sub 會將設定重設為原始預設值。

      使用下列範例中提及的所有旗標執行 gcloud pubsub topics update 指令:

      gcloud pubsub topics update TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME\
          --cloud-storage-ingestion-input-format=INPUT_FORMAT\
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
          --cloud-storage-ingestion-match-glob=MATCH_GLOB

      更改下列內容:

      • TOPIC_ID 是主題 ID 或名稱。這個欄位無法更新。

      • BUCKET_NAME:指定現有值區的名稱。 例如:prod_bucket。值區名稱不得包含專案 ID。 如要建立 bucket,請參閱「建立 bucket」一文。

      • INPUT_FORMAT:指定要擷取的物件格式。這可以是 textavropubsub_avro。如要進一步瞭解這些選項,請參閱「 輸入格式」。

      • TEXT_DELIMITER:指定用於將文字物件分割為 Pub/Sub 訊息的分隔符。這應該是單一字元,且只有在 INPUT_FORMATtext 時才應設定。預設為換行字元 (\n)。

        使用 gcloud CLI 指定分隔符號時,請特別注意換行 \n 等特殊字元的處理方式。請使用 '\n' 格式,確保系統正確解讀分隔符。如果直接使用 \n (不加引號或逸出),分隔符號會變成 "n"

      • MINIMUM_OBJECT_CREATE_TIME:指定物件建立時間下限,只有在此時間之後建立的物件才會擷取。格式應為世界標準時間 YYYY-MM-DDThh:mm:ssZ。 例如:2024-10-14T08:30:30Z

        0001-01-01T00:00:00Z9999-12-31T23:59:59Z (含) 的任何日期,無論是過去或未來,都有效。

      • MATCH_GLOB:指定要比對的 glob 模式,物件必須符合該模式才能擷取。使用 gcloud CLI 時,含有 * 字元的相符 glob 必須以逸出形式 \*\*.txt 格式化 * 字元,或整個相符 glob 必須以引號 "**.txt"'**.txt' 括住。如要瞭解支援的 glob 模式語法,請參閱 Cloud Storage 說明文件

    Cloud Storage 匯入主題的配額與限制

    匯入主題的發布者輸送量會受限於主題的發布配額。詳情請參閱 Pub/Sub 配額與限制

    後續步驟