使用 SMT 创建主题

本文档介绍了如何使用单条消息转换 (SMT) 创建 Pub/Sub 主题。

借助主题 SMT,您可以在 Pub/Sub 中直接对消息数据和属性进行轻量级修改。此功能可在消息发布到主题之前实现数据清理、过滤或格式转换。

如需创建具有 SMT 的主题,您可以使用 Trusted Cloud 控制台、Google Cloud CLI、客户端库或 Pub/Sub API。

准备工作

所需的角色和权限

如需获得创建具有 SMT 的主题所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建具有 SMT 的主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

创建具有 SMT 的主题需要以下权限:

  • 授予在项目上创建主题的权限: pubsub.topics.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

您可以在项目级层和个别资源级层配置访问权限控制。

创建具有 SMT 的主题

在创建包含 SMT 的主题之前,请查看主题的属性的相关文档。

以下示例假设您要创建具有此用户定义的函数 (UDF) SMT 的主题。如需详细了解 UDF,请参阅 UDF 概览

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

控制台

  1. 在 Trusted Cloud 控制台中,前往 Pub/Sub 主题页面。

    转到“主题”

  2. 点击创建主题

    系统会打开创建主题页面。

  3. 主题 ID 字段中,输入主题 ID。 如需详细了解如何命名主题,请参阅命名准则

  4. 转换下,点击添加转换

  5. 输入函数名称。例如:redactSSN

  6. 如果您不想立即将 SMT 与主题搭配使用,请点击停用转换选项。这样仍会保存 SMT,但不会在消息流经主题时执行。

  7. 输入新的转换。例如:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
  8. Pub/Sub 提供了一个验证函数,可用于验证 SMT。点击验证以验证转换。

  9. 如果您想添加其他转换,请点击添加转换

  10. 如需按特定顺序排列所有 SMT,您可以使用向上和向下箭头。如需移除 SMT,请点击删除按钮。
  11. Pub/Sub 提供了一项测试功能,可让您检查在示例消息上运行 SMT 的结果。如需测试 SMT,请点击测试转换

  12. Test transform 窗口中,选择要测试的函数。

  13. 输入消息窗口中,输入示例消息。

  14. 如果您想添加消息属性,请点击添加属性,然后输入一个或多个键值对。

  15. 点击测试。系统会显示将 SMT 应用于消息后的结果。

  16. 关闭窗口即可停止对示例消息进行 SMT 测试。

  17. 点击创建以创建主题。

gcloud

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pub/Sub 提供了一个验证函数,可用于验证 SMT。运行 gcloud pubsub message-transforms validate 命令:

    gcloud pubsub message-transforms validate --message-transform-file=TRANSFORM_FILE

    替换以下内容:

    • TRANSFORM_FILE:包含单个 SMT 的 YAML 或 JSON 文件的路径。

      以下是 YAML 转换文件的示例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  3. Pub/Sub 提供了一项测试功能,可让您检查对示例消息运行一个或多个 SMT 的结果。运行 gcloud pubsub message-transforms test 命令:

    gcloud pubsub message-transforms test --message-transforms-file=TRANSFORMS_FILE --message=MESSAGE --attributes=ATTRIBUTES

    替换以下内容:

    • TRANSFORMS_FILE:包含一个或多个 SMT 的 YAML 或 JSON 文件的路径。

      以下是 YAML 转换文件的示例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

    • MESSAGE:用于测试 SMT 的消息正文。

    • ATTRIBUTES:用于测试 SMT 的消息属性。

  4. 如需创建主题,请运行 gcloud pubsub topics create 命令:

    gcloud pubsub topics create TOPIC_ID \
     --message-transforms-file=TRANSFORMS_FILE

    替换以下内容:

    • TOPIC_ID:您要创建的主题的 ID 或名称。 如需了解主题命名准则,请参阅资源名称。 主题的名称是不可变的。

    • TRANSFORMS_FILE:包含一个或多个 SMT 的 YAML 或 JSON 文件的路径。

      以下是 YAML 转换文件的示例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  5. Java

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Java 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    
    import com.google.api.gax.rpc.AlreadyExistsException;
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.JavaScriptUDF;
    import com.google.pubsub.v1.MessageTransform;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicWithSmtExample {
    
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicWithSmtExample(projectId, topicId);
      }
    
      public static void createTopicWithSmtExample(String projectId, String topicId)
          throws IOException {
        TopicName topicName = TopicName.of(projectId, topicId);
    
        // UDF that removes the 'ssn' field, if present
        String code =
            "function redactSSN(message, metadata) {"
                + "  const data = JSON.parse(message.data);"
                + "  delete data['ssn'];"
                + "  message.data = JSON.stringify(data);"
                + "  return message;"
                + "}";
        String functionName = "redactSSN";
    
        JavaScriptUDF udf =
            JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
        MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
    
          Topic topic =
              topicAdminClient.createTopic(
                  Topic.newBuilder()
                      .setName(topicName.toString())
                      // Add the UDF message transform
                      .addMessageTransforms(transform)
                      .build());
    
          System.out.println("Created topic with SMT: " + topic.getName());
        } catch (AlreadyExistsException e) {
          System.out.println(topicName + "already exists.");
        }
      }
    }

    Python

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Python 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import JavaScriptUDF, MessageTransform, Topic
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    code = """function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
                }"""
    udf = JavaScriptUDF(code=code, function_name="redactSSN")
    transforms = [MessageTransform(javascript_udf=udf)]
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    request = Topic(name=topic_path, message_transforms=transforms)
    
    topic = publisher.create_topic(request=request)
    
    print(f"Created topic: {topic.name} with SMT")

    Go

    在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Go 设置说明执行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

    如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证

    在运行代码示例之前,请将 GOOGLE_CLOUD_UNIVERSE_DOMAIN 环境变量设置为 s3nsapis.fr

    // Copyright 2025 Google LLC
    //
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    //
    //     https://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    
    package topics
    
    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub"
    )
    
    // createTopicWithSMT creates a topic with a single message transform function applied.
    func createTopicWithSMT(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	code := `function redactSSN(message, metadata) {
    			const data = JSON.parse(message.data);
    			delete data['ssn'];
    			message.data = JSON.stringify(data);
    			return message;
    		}`
    	transform := pubsub.MessageTransform{
    		Transform: pubsub.JavaScriptUDF{
    			FunctionName: "redactSSN",
    			Code:         code,
    		},
    	}
    	cfg := &pubsub.TopicConfig{
    		MessageTransforms: []pubsub.MessageTransform{transform},
    	}
    	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    
    	fmt.Fprintf(w, "Created topic with message transform: %v\n", t)
    	return nil
    }
    
    

后续步骤