创建主题

在 Pub/Sub 中,主题是代表消息信息流的命名资源。您必须先创建一个主题,然后才能向其发布消息或订阅该主题。Pub/Sub 支持两种主题:标准主题和导入主题。

本文档介绍了如何创建 Pub/Sub 标准主题。如需详细了解导入主题以及如何创建导入主题,请参阅导入主题简介

如需创建主题,您可以使用 Trusted Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。

准备工作

所需的角色和权限

如需获得创建主题所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor(roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

创建主题需要以下权限:

  • 授予此权限可在项目中创建主题: pubsub.topics.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级别和各个资源级别配置访问权限控制。您可以在一个项目中创建订阅,并将其附加到位于其他项目中的主题。确保您对每个项目拥有所需的权限。

主题的属性

创建或更新主题时,您必须指定其属性。

添加默认订阅

向 Pub/Sub 主题添加默认订阅。 您可以在创建主题后为该主题创建其他订阅。默认订阅具有以下属性:

  • -sub 的订阅 ID
  • 拉取传送类型
  • 消息保留时长为 7 天
  • 在处于非活跃状态 31 天后过期
  • 确认时限为 10 秒
  • 立即重试政策

启用注入

启用此属性后,您可以将来自外部来源的流式数据注入到主题中,以便使用 Trusted Cloud的功能。如需创建用于提取数据的导入主题,请参阅以下内容:

启用消息保留功能

指定 Pub/Sub 主题在发布后保留消息的时间。超过消息保留时长后,Pub/Sub 可以随意舍弃消息,无论其确认状态为何。系统会对发布到主题的所有消息收取消息存储费用

  • 默认 = 未启用
  • 最小值 = 10 分钟
  • 最大值 = 31 天

将消息数据导出到 BigQuery 中

启用此属性后,您可以创建 BigQuery 订阅,以便在收到消息时将其写入现有 BigQuery 表。您无需配置单独的订阅方客户端。如需详细了解 BigQuery 订阅,请参阅 BigQuery 订阅

将消息数据备份到 Cloud Storage

启用此属性后,您可以创建 Cloud Storage 订阅,以便在收到消息时将其写入现有的 Cloud Storage 表。您无需配置单独的订阅方客户端。如需详细了解 Cloud Storage 订阅,请参阅 Cloud Storage 订阅

转换

借助主题 SMT,您可以直接在 Pub/Sub 中对消息数据和属性进行轻量级修改。借助此功能,您可以在消息发布到主题之前进行数据清理、过滤或格式转换。

如需详细了解 SMT,请参阅 SMT 概览

Google Cloud-powered encryption key

指定该主题是使用Google Cloud-powered encryption keys加密的。默认情况下,Pub/Sub 使用 Google Cloud-powered encryption keys 加密消息,因此选择此选项会保持默认行为。Google 会自动处理密钥管理和轮替,确保您的消息始终受最强大的可用加密保护。此选项无需进一步配置。如需详细了解 Google Cloud-powered encryption keys,请参阅使用 Google Cloud-powered encryption keys的默认加密

Cloud KMS 密钥

指定主题是否使用客户管理的加密密钥 (CMEK) 进行加密。默认情况下,Pub/Sub 使用 Google Cloud-powered encryption keys 加密消息。如果您指定此选项,Pub/Sub 将通过 CMEK 使用信封加密模式。在此方法中,Cloud KMS 不会对消息进行加密。而是由 Cloud KMS 对 Pub/Sub 为每个主题创建的数据加密密钥 (DEK) 进行加密。Pub/Sub 会使用为主题生成的最新 DEK 对消息进行加密。在消息即将传送给订阅者之前,Pub/Sub 会对消息进行解密。如需详细了解如何创建密钥,请参阅配置消息加密

创建主题

您必须先创建一个主题,然后才能向其发布消息或订阅该主题。

控制台

如需创建主题,请按以下步骤操作:

  1. 在 Trusted Cloud 控制台中,前往 Pub/Sub 创建主题页面。

    前往“创建主题”页面

  2. 主题 ID 字段中,输入主题 ID。 如需详细了解如何命名主题,请参阅命名准则

  3. 保留添加默认订阅选项。

  4. 可选。请勿选择其他选项。

  5. 点击创建主题

gcloud

如需创建主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create TOPIC_ID

REST

如需创建主题,请使用 projects.topics.create 方法:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN
    

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是主题 ID。
  • 回答:

    {
     "name": "projects/PROJECT_ID/topics/TOPIC_ID"
    }

    C++

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    namespace pubsub = ::google::cloud::pubsub;
    namespace pubsub_admin = ::google::cloud::pubsub_admin;
    [](pubsub_admin::TopicAdminClient client, std::string project_id,
       std::string topic_id) {
      auto topic = client.CreateTopic(
          pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
      // Note that kAlreadyExists is a possible error when the library retries.
      if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
        std::cout << "The topic already exists\n";
        return;
      }
      if (!topic) throw std::move(topic).status();
    
      std::cout << "The topic was successfully created: " << topic->DebugString()
                << "\n";
    }

    C#

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    
    using Google.Cloud.PubSub.V1;
    using Grpc.Core;
    using System;
    
    public class CreateTopicSample
    {
        public Topic CreateTopic(string projectId, string topicId)
        {
            PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
            var topicName = TopicName.FromProjectTopic(projectId, topicId);
            Topic topic = null;
    
            try
            {
                topic = publisher.CreateTopic(topicName);
                Console.WriteLine($"Topic {topic.Name} created.");
            }
            catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
            {
                Console.WriteLine($"Topic {topicName} already exists.");
            }
            return topic;
        }
    }

    Go

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub"
    )
    
    func create(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	t, err := client.CreateTopic(ctx, topicID)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    	fmt.Fprintf(w, "Topic created: %v\n", t)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicExample(projectId, topicId);
      }
    
      public static void createTopicExample(String projectId, String topicId) throws IOException {
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          TopicName topicName = TopicName.of(projectId, topicId);
          Topic topic = topicAdminClient.createTopic(topicName);
          System.out.println("Created topic: " + topic.getName());
        }
      }
    }

    Node.js

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    Node.js

    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub} from '@google-cloud/pubsub';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function createTopic(topicNameOrId: string) {
      // Creates a new topic
      await pubSubClient.createTopic(topicNameOrId);
      console.log(`Topic ${topicNameOrId} created.`);
    }

    PHP

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 PHP 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub PHP API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    use Google\Cloud\PubSub\PubSubClient;
    
    /**
     * Creates a Pub/Sub topic.
     *
     * @param string $projectId  The Google project ID.
     * @param string $topicName  The Pub/Sub topic name.
     */
    function create_topic($projectId, $topicName)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $topic = $pubsub->createTopic($topicName);
    
        printf('Topic created: %s' . PHP_EOL, $topic->name());
    }

    Python

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    from google.cloud import pubsub_v1
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    topic = publisher.create_topic(request={"name": topic_path})
    
    print(f"Created topic: {topic.name}")

    Ruby

    在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Ruby 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    # topic_id = "your-topic-id"
    
    pubsub = Google::Cloud::Pubsub.new
    
    topic = pubsub.create_topic topic_id
    
    puts "Topic #{topic.name} created."

    后续步骤

    Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。