创建主题

在 Pub/Sub 中,主题是代表消息信息流的命名资源。您必须先创建一个主题,然后才能向其发布消息或订阅该主题。Pub/Sub 支持两种主题:标准主题和导入主题。

本文档介绍如何创建 Pub/Sub 标准主题。如果您想详细了解导入主题以及如何创建导入主题,请参阅关于导入主题

如需创建主题,您可以使用 Trusted Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。

准备工作

所需的角色和权限

如需获得创建主题所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor(roles/pubsub.editor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

创建主题需要以下权限:

  • 授予此权限可在项目上创建主题: pubsub.topics.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级层和个别资源级层配置访问权限控制。您可以在一个项目中创建订阅,并将其附加到位于另一个项目中的主题。确保您拥有每个项目所需的权限。

主题的属性

创建或更新主题时,您必须指定其属性。

添加默认订阅

向 Pub/Sub 主题添加默认订阅。 创建主题后,您可以为该主题创建其他订阅。 默认订阅具有以下属性:

  • -sub 的订阅 ID
  • 拉取传送类型
  • 消息保留时长为 7 天
  • 在处于非活跃状态 31 天后过期
  • 10 秒的确认时限
  • 立即重试政策

启用注入

启用此属性后,您可以将来自外部来源的流式数据注入到主题中,以便使用 Trusted Cloud的功能。如需创建用于提取的导入主题,请参阅以下内容:

启用消息保留功能

指定 Pub/Sub 主题在发布后保留消息的时间。超过消息保留时长之后,Pub/Sub 可以随意舍弃消息,无论其确认状态为何。会对发布到主题的所有消息收取消息存储费用

  • 默认值 = 未启用
  • 最小值 = 10 分钟
  • 最大值 = 31 天

将消息数据导出到 BigQuery 中

启用此属性后,您可以创建 BigQuery 订阅,以便在收到消息时将其写入现有 BigQuery 表。您无需配置单独的订阅者客户端。如需详细了解 BigQuery 订阅,请参阅 BigQuery 订阅

将消息数据备份到 Cloud Storage

启用此属性后,您可以创建 Cloud Storage 订阅,以便在收到消息时将其写入现有的 Cloud Storage 表。您无需配置单独的订阅者客户端。如需详细了解 Cloud Storage 订阅,请参阅 Cloud Storage 订阅

转换

借助主题 SMT,您可以在 Pub/Sub 中直接对消息数据和属性进行轻量级修改。此功能可在消息发布到主题之前实现数据清理、过滤或格式转换。

如需详细了解 SMT,请参阅 SMT 概览

Google Cloud-powered encryption key

指定主题使用Google Cloud-powered encryption keys加密。默认情况下,Pub/Sub 使用 Google Cloud-powered encryption keys 加密消息,因此选择此选项可保持默认行为。Google 会自动处理密钥管理和轮换,确保您的消息始终受到最强大的加密保护。此选项无需进一步配置。 如需详细了解 Google Cloud-powered encryption keys,请参阅使用 Google Cloud-powered encryption keys进行默认加密

Cloud KMS 密钥

指定主题是否使用客户管理的加密密钥 (CMEK) 进行加密。Pub/Sub 默认使用 Google Cloud-powered encryption keys 加密消息。如果您指定此选项,Pub/Sub 将通过 CMEK 使用信封加密模式。在此方法中,Cloud KMS 不会对消息进行加密。Cloud KMS 会对 Pub/Sub 为每个主题创建的数据加密密钥 (DEK) 进行加密。Pub/Sub 会使用为相应主题生成的最新 DEK 对消息进行加密。Pub/Sub 会在消息即将传送给订阅者之前对其进行解密。如需详细了解如何创建密钥,请参阅配置消息加密

创建主题

您必须先创建一个主题,然后才能向其发布消息或订阅该主题。

控制台

如需创建主题,请按以下步骤操作:

  1. 在 Trusted Cloud 控制台中,前往 Pub/Sub 创建主题页面。

    前往“创建主题”

  2. 主题 ID 字段中,输入主题 ID。 如需详细了解如何命名主题,请参阅命名准则

  3. 保留添加默认订阅选项。

  4. 可选。请勿选择其他选项。

  5. 点击创建主题

gcloud

如需创建主题,请运行 gcloud pubsub topics create 命令:

gcloud pubsub topics create TOPIC_ID

REST

如需创建主题,请使用 projects.topics.create 方法:

必须使用 Authorization 标头中的访问令牌对请求进行身份验证。如需获取当前应用默认凭据的访问令牌,请运行以下命令:gcloud auth application-default print-access-token

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID
Authorization: Bearer ACCESS_TOKEN

其中:

  • PROJECT_ID 是项目 ID。
  • TOPIC_ID 是主题 ID。

回答:

{
"name": "projects/PROJECT_ID/topics/TOPIC_ID"
}

C++

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id) {
  auto topic = client.CreateTopic(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;

public class CreateTopicSample
{
    public Topic CreateTopic(string projectId, string topicId)
    {
        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        Topic topic = null;

        try
        {
            topic = publisher.CreateTopic(topicName);
            Console.WriteLine($"Topic {topic.Name} created.");
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            Console.WriteLine($"Topic {topicName} already exists.");
        }
        return topic;
    }
}

Go

以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func create(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	topic := &pubsubpb.Topic{
		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
	}
	t, err := client.TopicAdminClient.CreateTopic(ctx, topic)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Topic created: %v\n", t)
	return nil
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    createTopicExample(projectId, topicId);
  }

  public static void createTopicExample(String projectId, String topicId) throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);
      Topic topic = topicAdminClient.createTopic(topicName);
      System.out.println("Created topic: " + topic.getName());
    }
  }
}

Node.js

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopic(topicNameOrId) {
  // Creates a new topic
  await pubSubClient.createTopic(topicNameOrId);
  console.log(`Topic ${topicNameOrId} created.`);
}

Node.ts

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopic(topicNameOrId: string) {
  // Creates a new topic
  await pubSubClient.createTopic(topicNameOrId);
  console.log(`Topic ${topicNameOrId} created.`);
}

PHP

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 */
function create_topic($projectId, $topicName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->createTopic($topicName);

    printf('Topic created: %s' . PHP_EOL, $topic->name());
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

topic = publisher.create_topic(request={"name": topic_path})

print(f"Created topic: {topic.name}")

Ruby

以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。如需查看 Ruby v2 代码示例的列表,请参阅 已弃用的代码示例

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
topic_admin = pubsub.topic_admin

topic = topic_admin.create_topic name: pubsub.topic_path(topic_id)

puts "Topic #{topic.name} created."

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。