创建 Cloud Storage 导入主题

借助 Cloud Storage 导入主题,您可以持续将数据从 Cloud Storage 注入到 Pub/Sub 中。然后,您可以将数据流式传输到 Pub/Sub 支持的任何目的地。Pub/Sub 会自动检测添加到 Cloud Storage 存储桶的新对象并将其注入。

Cloud Storage 是一项用于将您的对象存储在Trusted Cloud by S3NS中的服务。对象是由任意格式的文件组成的不可变的数据段。对象存储在称为存储分区的容器中。 存储分区还可以包含托管式文件夹,用于提供对具有共享名称前缀的对象组的扩展访问权限。

如需详细了解 Cloud Storage,请参阅 Cloud Storage 文档

如需详细了解导入主题,请参阅关于导入主题

准备工作

所需的角色和权限

如需获得创建和管理 Cloud Storage 导入主题所需的权限,请让您的管理员为您授予主题或项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建和管理 Cloud Storage 导入主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

创建和管理 Cloud Storage 导入主题需要以下权限:

  • 创建导入主题: pubsub.topics.create
  • 删除导入主题: pubsub.topics.delete
  • 获取导入主题: pubsub.topics.get
  • 列出导入主题: pubsub.topics.list
  • 发布到导入主题: pubsub.topics.publish
  • 更新导入主题: pubsub.topics.update
  • 获取导入主题的 IAM 政策: pubsub.topics.getIamPolicy
  • 为导入主题配置 IAM 政策 pubsub.topics.setIamPolicy

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级层和个别资源级层配置访问权限控制。

消息存储政策符合存储桶位置要求

Pub/Sub 主题的消息存储政策必须与 Cloud Storage 存储桶所在的区域重叠。此政策规定了 Pub/Sub 允许存储消息数据的位置。

  • 对于位置类型为区域的存储分区:政策必须包含该特定区域。例如,如果您的存储桶位于 us-central1 区域,则消息存储政策也必须包含 us-central1

  • 对于位置类型为双区域或多区域的存储分区:政策必须包含双区域或多区域位置中的至少一个区域。例如,如果您的存储桶位于 US multi-region,则消息存储政策可以包含 us-central1us-east1US multi-region 内的任何其他区域。

    如果该政策不包含相应存储桶的区域,则主题创建会失败。例如,如果您的存储桶位于 europe-west1,但您的消息存储政策仅包含 asia-east1,您会收到错误消息。

    如果消息存储政策仅包含一个与相应存储桶的位置重叠的区域,则多区域冗余可能会受到影响。这是因为,如果该单个区域变得不可用,您的数据可能无法访问。为确保完全冗余,建议在消息存储政策中至少包含两个区域,这两个区域属于存储桶的多区域或双区域位置。

如需详细了解存储桶位置,请参阅文档

启用发布

如需启用发布功能,您必须将 Pub/Sub 发布者角色分配给 Pub/Sub 服务账号,以便 Pub/Sub 能够发布到 Cloud Storage 导入主题。

启用向所有 Cloud Storage 导入主题发布的功能

如果您的项目中没有可用的 Cloud Storage 导入主题,请选择此选项。

  1. 在 Trusted Cloud 控制台中,前往 IAM 页面。

    转到 IAM

  2. 选中包括 S3NS提供的角色授权复选框。

  3. 查找具有以下格式的 Pub/Sub 服务账号:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com

  4. 对于此服务账号,点击修改主账号按钮。

  5. 如果需要,请点击添加其他角色

  6. 搜索并选择 Pub/Sub 发布者角色 (roles/pubsub.publisher)。

  7. 点击保存

启用发布到单个 Cloud Storage 导入主题的功能

如果您想授予 Pub/Sub 向特定 Cloud Storage 导入主题(该主题已存在)发布消息的权限,请按以下步骤操作:

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud pubsub topics add-iam-policy-binding 命令:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    替换以下内容:

    • TOPIC_ID 是 Cloud Storage 导入主题的 ID 或名称。

    • PROJECT_NUMBER 是项目编号。如需查看项目编号,请参阅标识项目

  3. 为 Pub/Sub 服务账号分配 Cloud Storage 角色

    如需创建 Cloud Storage 导入主题,Pub/Sub 服务账号必须有权从特定的 Cloud Storage 存储桶中读取数据。必须拥有以下权限:

    • storage.objects.list
    • storage.objects.get
    • storage.buckets.get

    如需将这些权限分配给 Pub/Sub 服务账号,请选择以下其中一种方法:

    • 在存储桶级层授予权限。在特定的 Cloud Storage 存储桶中,向 Pub/Sub 服务账号授予 Storage Legacy Object Reader (roles/storage.legacyObjectReader) 角色和 Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) 角色。

    • 如果您必须在项目级层授予角色,则可以改为在包含 Cloud Storage 存储桶的项目中授予 Storage Admin (roles/storage.admin) 角色。将此角色授予 Pub/Sub 服务账号。

    存储分区权限

    请按以下步骤操作,在存储桶级层向 Pub/Sub 服务账号授予 Storage Legacy Object Reader (roles/storage.legacyObjectReader) 角色和 Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) 角色:

    1. 在 Trusted Cloud 控制台中,前往 Cloud Storage 页面。

      转到 Cloud Storage

    2. 点击您要从中读取消息并导入到 Cloud Storage 导入主题的 Cloud Storage 存储桶。

      系统会打开存储分区详情页面。

    3. 存储桶详情页面中,点击配置标签页。

    4. 权限 > 按主账号查看标签页中,点击授予访问权限

      系统会打开授予访问权限页面。

    5. 添加主账号部分,输入您的 Pub/Sub 服务账号的名称。

      服务账号的格式为 service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com。 例如,对于 PROJECT_NUMBER112233445566 的项目,服务账号的格式为 service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com

    6. 分配角色 > 选择角色下拉菜单中,输入 Object Reader,然后选择存储旧版对象读取者角色。

    7. 点击添加其他角色

    8. 选择角色下拉菜单中,输入 Bucket Reader,然后选择 Storage Legacy Bucket Reader 角色。

    9. 点击保存

    项目权限

    请按以下步骤操作,以在项目级层授予 Storage Admin (roles/storage.admin) 角色:

    1. 在 Trusted Cloud 控制台中,前往 IAM 页面。

      转到 IAM

    2. 权限 > 按主账号查看标签页中,点击授予访问权限

      系统会打开授予访问权限页面。

    3. 添加主账号部分,输入您的 Pub/Sub 服务账号的名称。

      服务账号的格式为 service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com。 例如,对于 PROJECT_NUMBER112233445566 的项目,服务账号的格式为 service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com

    4. 分配角色 > 选择角色下拉菜单中,输入 Storage Admin,然后选择存储管理员角色。

    5. 点击保存

    如需详细了解 Cloud Storage IAM,请参阅 Cloud Storage Identity and Access Management

    Cloud Storage 导入主题的属性

    如需详细了解所有主题的通用属性,请参阅主题的属性

    存储桶名称

    这是 Cloud Storage 存储桶的名称,Pub/Sub 从该存储分区读取发布到 Cloud Storage 导入主题的数据。

    输入格式

    创建 Cloud Storage 导入主题时,您可以将要提取的对象格式指定为 TextAvroPub/Sub Avro

    • 文本。对象应包含纯文本数据。此输入格式会尝试注入存储桶中的所有对象,前提是该对象满足创建对象的最短时间,并且符合 glob 模式条件

      分隔符。您还可以指定将对象拆分为消息的分隔符。如果未设置,此参数默认为换行符 (\n)。分隔符只能包含一个字符。

    • Avro。对象采用 Apache Avro 二进制格式。 任何不符合有效 Apache Avro 格式的对象都不会被提取。 以下是有关 Avro 的限制:

      • 不支持 Avro 版本 1.1.0 和 1.2.0。
      • Avro 块的大小上限为 16 MB。
    • Pub/Sub Avro。对象采用 Apache Avro 二进制格式,其架构与使用 Pub/Sub Cloud Storage 订阅Avro 文件格式写入 Cloud Storage 的对象的架构相匹配。以下是有关 Pub/Sub Avro 的一些重要准则:

      • Avro 记录的数据字段用于填充生成的 Pub/Sub 消息的数据字段。

      • 如果为 Cloud Storage 订阅指定了 write_metadata 选项,则 attributes 字段中的任何值都会填充为生成的 Pub/Sub 消息的属性。

      • 如果写入 Cloud Storage 的原始消息中指定了排序键,则此字段会在生成的 Pub/Sub 消息中填充为名称为 original_message_ordering_key 的属性。

    创建对象的最短时间

    创建 Cloud Storage 导入主题时,您可以选择指定最短对象创建时间。系统只会提取在此时间戳或之后创建的对象。此时间戳必须采用 YYYY-MM-DDThh:mm:ssZ 等格式提供。从 0001-01-01T00:00:00Z9999-12-31T23:59:59Z(含)之间的任何日期(过去或未来)均有效。

    匹配 glob 模式

    创建 Cloud Storage 导入主题时,您可以选择指定匹配 glob 模式。仅注入名称与此模式匹配的对象。例如,如需注入所有以 .txt 为后缀的对象,您可以将 glob 模式指定为 **.txt

    如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档

    使用 Cloud Storage 导入主题

    您可以创建新的导入主题,也可以修改现有主题。

    注意事项

    • 即使快速连续地创建主题和订阅,也可能会导致数据丢失。在订阅期结束后的短时间内,主题仍可使用。如果在此期间向主题发送任何数据,这些数据都会丢失。通过先创建主题、创建订阅,然后将主题转换为导入主题,您可以确保在导入过程中不会遗漏任何消息。

    创建 Cloud Storage 导入主题

    如需创建 Cloud Storage 导入主题,请按以下步骤操作:

    控制台

    1. 在 Trusted Cloud 控制台中,前往主题页面。

      打开“主题”

    2. 点击创建主题

      系统会打开主题详情页面。

    3. 主题 ID 字段中,输入 Cloud Storage 导入主题的 ID。

      如需详细了解如何命名主题,请参阅命名准则

    4. 选择添加默认订阅

    5. 选择启用注入

    6. 对于提取源,请选择 Google Cloud Storage

    7. 对于 Cloud Storage 存储桶,请点击浏览

      系统会打开选择存储桶页面。从下列选项中选择一项:

      • 从任何合适的项目中选择现有存储桶。

      • 点击“创建”图标,然后按照屏幕上的说明创建新存储桶。创建存储桶后,选择 Cloud Storage 导入主题的存储桶。

    8. 指定存储桶后,Pub/Sub 会检查 Pub/Sub 服务账号是否对该存储桶拥有适当的权限。如果存在权限问题,您会看到类似以下内容的消息:

      Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

      如果您遇到权限问题,请点击设置权限。如需了解详情,请参阅向 Pub/Sub 服务账号授予 Cloud Storage 权限

    9. 对于对象格式,请选择文本AvroPub/Sub Avro

      如果您选择文本,则可以选择性地指定用于将对象拆分为消息的分隔符

      如需详细了解这些选项,请参阅输入格式

    10. 可选。您可以为主题指定创建对象的最短时间。如果设置,则仅提取在最短对象创建时间之后创建的对象。

      如需了解详情,请参阅创建对象的最短时间

    11. 您必须指定 Glob 模式。如需注入存储桶中的所有对象,请使用 ** 作为 glob 模式。如果设置,则仅注入与给定模式匹配的对象。

      如需了解详情,请参阅匹配 glob 模式

    12. 保留其他默认设置。

    13. 点击创建主题

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. 运行 gcloud pubsub topics create 命令:

      gcloud pubsub topics create TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME \
          --cloud-storage-ingestion-input-format=INPUT_FORMAT \
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
          --cloud-storage-ingestion-match-glob=MATCH_GLOB
      

      在该命令中,仅 TOPIC_ID--cloud-storage-ingestion-bucket 标志和 --cloud-storage-ingestion-input-format 标志是必需的。其余标志是可选的,可以省略。

      替换以下内容:

      • TOPIC_ID:主题的名称或 ID。
      • BUCKET_NAME:指定现有存储桶的名称。 例如 prod_bucket。 存储桶名称不得包含项目 ID。 如需创建存储桶,请参阅创建存储桶
      • INPUT_FORMAT:指定所提取对象的格式。 可以是 textavropubsub_avro。 如需详细了解这些选项,请参阅输入格式
      • TEXT_DELIMITER:指定用于将文本对象拆分为 Pub/Sub 消息的分隔符。 此值应为单个字符,并且仅当 INPUT_FORMATtext 时才应设置。 默认值为换行符 (\n)。

        使用 gcloud CLI 指定分隔符时,请密切注意换行符 \n 等特殊字符的处理方式。请使用 '\n' 格式,确保系统正确解读分隔符。 如果直接使用 \n(不带引号或转义),则分隔符为 "n"

      • MINIMUM_OBJECT_CREATE_TIME:指定对象创建的最早时间,只有在此时间之后创建的对象才会被提取。 此值应采用世界协调时间 (UTC),格式为 YYYY-MM-DDThh:mm:ssZ。 例如 2024-10-14T08:30:30Z

        0001-01-01T00:00:00Z9999-12-31T23:59:59Z(含)之间的任何日期(过去或未来)均有效。

      • MATCH_GLOB:指定要匹配的 glob 模式,以便提取对象。使用 gcloud CLI 时,如果匹配 glob 包含 * 字符,则必须将 * 字符格式化为转义字符(格式为 \*\*.txt),或者整个匹配 glob 必须用英文引号 "**.txt"'**.txt' 括起来。如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档

    3. C++

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string bucket, std::string const& input_format,
         std::string text_delimiter, std::string match_glob,
         std::string const& minimum_object_create_time) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                                   ->mutable_cloud_storage();
        cloud_storage.set_bucket(std::move(bucket));
        if (input_format == "text") {
          cloud_storage.mutable_text_format()->set_delimiter(
              std::move(text_delimiter));
        } else if (input_format == "avro") {
          cloud_storage.mutable_avro_format();
        } else if (input_format == "pubsub_avro") {
          cloud_storage.mutable_pubsub_avro_format();
        } else {
          std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                       "got value: "
                    << input_format << std::endl;
          return;
        }
      
        if (!match_glob.empty()) {
          cloud_storage.set_match_glob(std::move(match_glob));
        }
      
        if (!minimum_object_create_time.empty()) {
          google::protobuf::Timestamp timestamp;
          if (!google::protobuf::util::TimeUtil::FromString(
                  minimum_object_create_time,
                  cloud_storage.mutable_minimum_object_create_time())) {
            std::cout << "Invalid minimum object create time: "
                      << minimum_object_create_time << std::endl;
          }
        }
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

      import (
      	"context"
      	"fmt"
      	"io"
      	"time"
      
      	"cloud.google.com/go/pubsub/v2"
      	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
      	"google.golang.org/protobuf/types/known/timestamppb"
      )
      
      func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      	// bucket := "my-bucket"
      	// matchGlob := "**.txt"
      	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"
      	// delimiter := ","
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
      	if err != nil {
      		return err
      	}
      
      	topicpb := &pubsubpb.Topic{
      		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
      		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
      			Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{
      				CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{
      					Bucket: bucket,
      					// Alternatively, can be Avro or PubSubAvro formats. See
      					InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
      						TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{
      							Delimiter: &delimiter,
      						},
      					},
      					MatchGlob:               matchGlob,
      					MinimumObjectCreateTime: timestamppb.New(minCreateTime),
      				},
      			},
      		},
      	}
      	t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
      	return nil
      }
      

      Java

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.protobuf.util.Timestamps;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      import java.text.ParseException;
      
      public class CreateTopicWithCloudStorageIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Cloud Storage ingestion settings.
          // bucket and inputFormat are required arguments.
          String bucket = "your-bucket";
          String inputFormat = "text";
          String textDelimiter = "\n";
          String matchGlob = "**.txt";
          String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";
      
          createTopicWithCloudStorageIngestionExample(
              projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
        }
      
        public static void createTopicWithCloudStorageIngestionExample(
            String projectId,
            String topicId,
            String bucket,
            String inputFormat,
            String textDelimiter,
            String matchGlob,
            String minimumObjectCreateTime)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
                IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
            switch (inputFormat) {
              case "text":
                cloudStorageBuilder.setTextFormat(
                    IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                        .setDelimiter(textDelimiter)
                        .build());
                break;
              case "avro":
                cloudStorageBuilder.setAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
                break;
              case "pubsub_avro":
                cloudStorageBuilder.setPubsubAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
                break;
              default:
                throw new IllegalArgumentException(
                    "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
            }
      
            if (matchGlob != null && !matchGlob.isEmpty()) {
              cloudStorageBuilder.setMatchGlob(matchGlob);
            }
      
            if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
              try {
                cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
              } catch (ParseException e) {
                System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
              }
            }
      
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder()
                    .setCloudStorage(cloudStorageBuilder.build())
                    .build();
      
            TopicName topicName = TopicName.of(projectId, topicId);
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println(
                "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId,
        bucket,
        inputFormat,
        textDelimiter,
        matchGlob,
        minimumObjectCreateTime,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Node.ts

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId: string,
        bucket: string,
        inputFormat: string,
        textDelimiter: string,
        matchGlob: string,
        minimumObjectCreateTime: string,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata: TopicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Python

      在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

      from google.cloud import pubsub_v1
      from google.protobuf import timestamp_pb2
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # bucket = "your-bucket"
      # input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
      # text_delimiter = "\n"
      # match_glob = "**.txt"
      # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
          bucket=bucket,
      )
      if input_format == "text":
          cloud_storage_settings.text_format = (
              IngestionDataSourceSettings.CloudStorage.TextFormat(
                  delimiter=text_delimiter
              )
          )
      elif input_format == "avro":
          cloud_storage_settings.avro_format = (
              IngestionDataSourceSettings.CloudStorage.AvroFormat()
          )
      elif input_format == "pubsub_avro":
          cloud_storage_settings.pubsub_avro_format = (
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
          )
      else:
          print(
              "Invalid input_format: "
              + input_format
              + "; must be in ('text', 'avro', 'pubsub_avro')"
          )
          return
      
      if match_glob:
          cloud_storage_settings.match_glob = match_glob
      
      if minimum_object_create_time:
          try:
              minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
              minimum_object_create_time_timestamp.FromJsonString(
                  minimum_object_create_time
              )
              cloud_storage_settings.minimum_object_create_time = (
                  minimum_object_create_time_timestamp
              )
          except ValueError:
              print("Invalid minimum_object_create_time: " + minimum_object_create_time)
              return
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              cloud_storage=cloud_storage_settings,
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

    如果您遇到问题,请参阅“排查 Cloud Storage 导入问题”主题

    修改 Cloud Storage 导入主题

    您可以修改 Cloud Storage 导入主题以更新其属性。

    例如,如需重新开始提取,您可以更改存储桶或更新最短对象创建时间

    如需修改 Cloud Storage 导入主题,请执行以下步骤:

    控制台

    1. 在 Trusted Cloud 控制台中,前往主题页面。

      转到“主题”

    2. 点击 Cloud Storage 导入主题。

    3. 在主题详情页面中,点击修改

    4. 更新您要更改的字段。

    5. 点击更新

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. 为避免丢失导入主题的设置,请务必在每次更新主题时包含所有设置。如果您遗漏了某些内容,Pub/Sub 会将相应设置重置为原始默认值。

      运行 gcloud pubsub topics update 命令,并使用以下示例中提及的所有标志:

      gcloud pubsub topics update TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME\
          --cloud-storage-ingestion-input-format=INPUT_FORMAT\
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
          --cloud-storage-ingestion-match-glob=MATCH_GLOB

      替换以下内容:

      • TOPIC_ID 是主题 ID 或名称。此字段无法更新。

      • BUCKET_NAME:指定现有存储桶的名称。 例如 prod_bucket。 存储桶名称不得包含项目 ID。 如需创建存储桶,请参阅创建存储桶

      • INPUT_FORMAT:指定所提取对象的格式。 可以是 textavropubsub_avro。 如需详细了解这些选项,请参阅 输入格式

      • TEXT_DELIMITER:指定用于将文本对象拆分为 Pub/Sub 消息的分隔符。 此值应为单个字符,并且仅当 INPUT_FORMATtext 时才应设置。 默认值为换行符 (\n)。

        使用 gcloud CLI 指定分隔符时,请密切注意换行符 \n 等特殊字符的处理方式。使用 '\n' 格式可确保正确解读分隔符。如果直接使用 \n(不带引号或转义),则分隔符为 "n"

      • MINIMUM_OBJECT_CREATE_TIME:指定对象创建的最早时间,只有在此时间之后创建的对象才会被提取。 此值应采用世界协调时间 (UTC),格式为 YYYY-MM-DDThh:mm:ssZ。 例如 2024-10-14T08:30:30Z

        0001-01-01T00:00:00Z9999-12-31T23:59:59Z(含)之间的任何日期(过去或未来)均有效。

      • MATCH_GLOB:指定要匹配的 glob 模式,以便提取对象。使用 gcloud CLI 时,如果匹配 glob 包含 * 字符,则必须将 * 字符格式化为转义字符(格式为 \*\*.txt),或者整个匹配 glob 必须用英文引号 "**.txt"'**.txt' 括起来。如需了解 glob 模式支持的语法,请参阅 Cloud Storage 文档

    Cloud Storage 导入主题的配额和限制

    导入主题的发布者吞吐量受主题的发布配额限制。如需了解详情,请参阅 Pub/Sub 配额和限制

    后续步骤