使用 SMT 创建订阅

本文档介绍了如何使用单条消息转换 (SMT) 创建 Pub/Sub 订阅。

借助订阅 SMT,您可以直接在 Pub/Sub 内对消息数据及属性进行轻量级修改。此功能支持在将消息传送给订阅者客户端之前进行数据清理、过滤或格式转换。

如需使用 SMT 创建订阅,您可以使用 Cloud de Confiance 控制台、Google Cloud CLI、 客户端库或 Pub/Sub API。

准备工作

所需的角色和权限

如需获得使用 SMT 创建订阅所需的权限,请让您的管理员为您授予项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色可提供 使用 SMT 创建订阅所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

使用 SMT 创建订阅需要以下权限:

  • 授予在项目上创建订阅的权限: pubsub.subscriptions.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

根据订阅类型,您可能需要其他权限。 如需了解确切的权限列表,请参阅讨论创建特定订阅的文档。例如,如果您要使用 SMT 创建 BigQuery 订阅,请参阅 创建 BigQuery 订阅

如果您在与主题不同的项目中创建订阅,则必须在包含主题的项目中向包含订阅的项目的正文授予 roles/pubsub.subscriber 角色。

您可以在项目级层和个别资源级层配置访问权限控制。

使用 SMT 创建订阅

在使用 SMT 创建订阅之前,请查看 订阅的属性文档。

如需使用一个或多个 SMT 创建 Pub/Sub 订阅,请执行以下步骤。每个订阅最多可以启用 5 个 SMT。

控制台

  1. 在 Cloud de Confiance 控制台中,前往 Pub/Sub 订阅 页面。

    前往订阅页面

  2. 点击创建订阅

  3. 订阅 ID 字段中,输入订阅的 ID。如需详细了解如何命名订阅,请参阅 命名准则

  4. 转换下,点击添加转换

  5. 选择转换类型 。如需详细了解受支持的 SMT 类型,请参阅 SMT 类型

  6. 设置 SMT 的配置属性。属性集取决于 SMT 的类型。如需了解详情,请参阅相应 SMT 类型的文档。

  7. 可选。如需验证 SMT,请点击验证 。如果 SMT 有效,系统会显示 消息 "Validation passed"。否则,系统会显示错误消息。

  8. 如需添加其他转换,请点击添加转换 ,然后重复上述步骤。

    如需按特定顺序排列 SMT,请点击 上移下移。如需移除 SMT,请点击 删除

  9. 可选。如需在示例消息上测试 SMT,请执行以下步骤:

    1. 点击测试转换

    2. 测试转换 窗口中,选择要测试的函数。

    3. 输入消息 窗口中,输入示例消息。

    4. 如需向消息添加属性,请点击添加属性 ,然后输入属性的键和值。您可以添加多个属性。

    5. 点击测试 。将 SMT 应用于消息的结果会显示在输出消息 下。

    6. 如需关闭测试转换 窗口, 请点击 关闭

    如果您创建了多个 SMT,则可以按如下方式测试整个转换序列:

    1. 按照上述步骤测试序列中的第一个 SMT。
    2. 选择下一个 SMT。输入消息会预先填充上一次测试的输出消息。
    3. 继续按顺序测试 SMT,确保整个序列按预期运行。
  10. 点击创建 以创建订阅。

gcloud

  1. 在 Cloud de Confiance 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在控制台的底部启动,并显示命令行提示符。 Cloud de Confiance Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟来完成初始化。

  2. 创建用于定义一个或多个 SMT 的 YAML 或 JSON 文件。YAML 或 JSON 定义取决于 SMT 的类型。如需了解详情,请参阅 SMT 类型

    如果该文件包含多个 SMT,Pub/Sub 会按列出的顺序执行这些 SMT。

  3. 可选。如需验证 SMT,请运行 gcloud pubsub message-transforms validate 命令:

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    替换以下内容:

    • TRANSFORM_FILE:用于定义单个 SMT 的 YAML 或 JSON 文件的路径 。如果您要创建多个 SMT,则必须单独验证它们。
  4. 可选。如需在示例 Pub/Sub 消息上测试一个或多个 SMT,请运行 gcloud pubsub message-transforms test 命令:

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    替换以下内容:

    • TRANSFORMS_FILE:用于定义一个或多个 SMT 的 YAML 或 JSON 文件 的路径。
    • MESSAGE:示例消息的正文。
    • ATTRIBUTES:可选。以英文逗号分隔的消息属性列表。每个属性都是一个键值对,格式为 KEY="VALUE"

    该命令会按顺序执行 SMT,并将每个 SMT 的输出用作下一个 SMT 的输入。该命令会输出每个步骤的结果。

  5. 如需创建订阅,请运行 gcloud pubsub subscriptions create 命令:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=projects/PROJECT_ID/topics/TOPIC_ID \
        --message-transforms-file=TRANSFORMS_FILE
    

    替换以下内容:

    • SUBSCRIPTION_ID:您要创建的 订阅的 ID 或名称。如需了解如何命名 订阅的准则,请参阅 资源名称。订阅的名称不可变。

    • PROJECT_ID:包含 主题的项目的 ID。

    • TOPIC_ID:要订阅的主题的 ID。

    • TRANSFORMS_FILE:用于定义一个或多个 SMT 的 YAML 或 JSON 文件的路径。

C#

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档


using Google.Cloud.PubSub.V1;
using System;

public class CreateSubscriptionWithSingleMessageTransformSample
{
    public Subscription CreateSubscriptionWithSingleMessageTransform(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);

        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        MessageTransform removeSSNTransform = new MessageTransform
        {
            JavascriptUdf = new JavaScriptUDF
            {
                FunctionName = "redactSsn",
                Code = "function redactSsn(message, metadata) {"
                + "   const data = JSON.parse(message.data);"
                + "   delete data['ssn'];"
                + "   message.data = JSON.stringify(data);"
                + "   return message;"
                + "}"
            },
        };

        var subscription = subscriber.CreateSubscription(new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            MessageTransforms = { removeSSNTransform },
        });
        Console.WriteLine($"Subscription {subscription.Name} created with SMT.");

        return subscription;
    }
}

Java

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.JavaScriptUDF;
import com.google.pubsub.v1.MessageTransform;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithSmtExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithSmtExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithSmtExample(
      String projectId, String topicId, String subscriptionId) throws IOException {

    // UDF that removes the 'ssn' field, if present
    String code =
        "function redactSSN(message, metadata) {"
            + "  const data = JSON.parse(message.data);"
            + "  delete data['ssn'];"
            + "  message.data = JSON.stringify(data);"
            + "  return message;"
            + "}";
    String functionName = "redactSSN";

    JavaScriptUDF udf =
        JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
    MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();

    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Add the UDF message transform
                  .addMessageTransforms(transform)
                  .build());

      System.out.println("Created subscription with SMT: " + subscription.getAllFields());
    }
  }
}

Python

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

code = """function redactSSN(message, metadata) {
            const data = JSON.parse(message.data);
            delete data['ssn'];
            message.data = JSON.stringify(data);
            return message;
            }"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "message_transforms": transforms,
        }
    )
    print(f"Created subscription with SMT: {subscription}")

Go

以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅 迁移到 v2 的指南。 如需查看 v1 代码示例列表,请参阅 已废弃的代码示例

在尝试此示例之前,请按照 《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// createSubscriptionWithSMT creates a subscription with a single message transform function applied.
func createSubscriptionWithSMT(w io.Writer, projectID, topicID, subID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	code := `function redactSSN(message, metadata) {
			const data = JSON.parse(message.data);
			delete data['ssn'];
			message.data = JSON.stringify(data);
			return message;
		}`

	transform := &pubsubpb.MessageTransform{
		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
			JavascriptUdf: &pubsubpb.JavaScriptUDF{
				FunctionName: "redactSSN",
				Code:         code,
			},
		},
	}

	sub := &pubsubpb.Subscription{
		Name:              fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subID),
		Topic:             fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		MessageTransforms: []*pubsubpb.MessageTransform{transform},
	}
	sub, err = client.SubscriptionAdminClient.CreateSubscription(ctx, sub)
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
	return nil
}

SMT 如何与其他订阅功能交互

使用订阅 SMT 时,请考虑以下几点。

过滤

如果您的订阅同时使用 SMT 和 Pub/Sub 的 内置过滤条件,则系统会在应用 SMT 之前应用过滤条件 。这会产生以下影响:

  • 如果您的 SMT 更改了消息属性,则 Pub/Sub 过滤条件不会应用于新的属性集。
  • 您的 SMT 不会应用于被 Pub/Sub 过滤条件过滤掉的消息。
  • 如果您的 SMT 过滤掉消息,请注意对 监控订阅积压的影响。
  • 如果您将订阅连接到 Dataflow 流水线, 请勿使用订阅 SMT 过滤掉消息,因为这会中断 Dataflow 的自动扩缩。

消息排序

如果您在启用了排序的订阅上定义 SMT,并且执行 SMT 时抛出错误,则系统不会将具有相同排序键的后续消息传送给订阅者。为避免此问题,请在订阅上设置 死信主题,以 从消息积压中移除未处理的消息。

后续步骤