使用 SMT 建立訂閱

本文說明如何使用單一訊息轉換 (SMT) 建立 Pub/Sub 訂閱項目。

訂閱項目 SMT 可直接在 Pub/Sub 中輕量修改訊息資料和屬性。這項功能可在訊息傳送至訂閱端之前,進行資料清理、篩選或格式轉換。

如要建立含有 SMT 的訂閱項目,可以使用 Cloud de Confiance 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API。

事前準備

必要角色和權限

如要取得使用 SMT 建立訂閱項目所需的權限,請要求管理員授予您專案的 Pub/Sub 編輯者 (roles/pubsub.editor) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色具備使用 SMT 建立訂閱項目所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

如要使用 SMT 建立訂閱項目,必須具備下列權限:

  • 授予專案的建立訂閱項目權限: pubsub.subscriptions.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

視訂閱類型而定,您可能需要額外權限。 如要查看確切的權限清單,請參閱討論建立特定訂閱項目的文件。舉例來說,如果您要使用 SMT 建立 BigQuery 訂閱項目,請參閱「建立 BigQuery 訂閱項目」頁面。

如果您在與主題不同的專案中建立訂閱項目,則必須在包含主題的專案中,將 roles/pubsub.subscriber 角色授予包含訂閱項目的專案主體。

您可以在專案層級和個別資源層級設定存取權控管。

建立含有 SMT 的訂閱項目

使用 SMT 建立訂閱項目前,請先參閱「訂閱項目的屬性」說明文件。

下列範例假設您要使用這個使用者定義函式 (UDF) SMT 建立訂閱項目。如要進一步瞭解 UDF,請參閱 UDF 總覽

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

控制台

  1. 前往 Cloud de Confiance 控制台的 Pub/Sub「Subscriptions」(訂閱項目) 頁面。

    前往「訂閱」頁面

  2. 按一下「Create Subscription」 (建立訂閱項目)

    「建立訂閱」頁面隨即開啟。

  3. 在「Subscription ID」(訂閱 ID) 欄位中,輸入訂閱 ID。 如要進一步瞭解如何命名訂閱項目,請參閱命名規範

  4. 在「轉換」下方,按一下「新增轉換」

  5. 輸入函式名稱。例如 redactSSN

  6. 如果不想立即將 SMT 搭配訂閱項目使用,請按一下「停用轉換」選項。這樣做仍會儲存 SMT,但不會在訊息流經訂閱項目時執行。

  7. 輸入新的轉換。例如:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
  8. Pub/Sub 提供驗證函式,可讓您驗證 SMT。按一下「驗證」來驗證轉換。

  9. 如要新增其他轉換作業,請按一下「新增轉換作業」

  10. 如要依特定順序排列所有 SMT,可以使用向上鍵和向下鍵。如要移除 SMT,請按一下「刪除」按鈕。
  11. Pub/Sub 提供測試函式,可讓您檢查在範例訊息上執行 SMT 的結果。如要測試 SMT,請按一下「測試轉換作業」

  12. 在「Test transform」視窗中,選取要測試的函式。

  13. 在「輸入訊息」視窗中輸入範例訊息。

  14. 如要新增訊息屬性,請按一下「新增屬性」,然後輸入一或多個鍵/值組合。

  15. 按一下「測試」。系統會顯示將 SMT 套用至郵件的結果。

  16. 關閉視窗,停止測試範例郵件的 SMT。

  17. 按一下「建立」即可建立訂閱項目。

gcloud

  1. In the Cloud de Confiance console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Cloud de Confiance console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pub/Sub 提供驗證函式,可讓您驗證 SMT。執行 gcloud pubsub message-transforms validate 指令:

    gcloud pubsub message-transforms validate --message-transform-file=TRANSFORM_FILE

    更改下列內容:

    • TRANSFORM_FILE:包含單一 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案的範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  3. Pub/Sub 提供測試功能,可讓您檢查對範例訊息執行一或多個 SMT 的結果。執行 gcloud pubsub message-transforms test 指令:

    gcloud pubsub message-transforms test --message-transforms-file=TRANSFORMS_FILE --message=MESSAGE --attributes=ATTRIBUTES

    更改下列內容:

    • TRANSFORMS_FILE:包含一或多個 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

    • MESSAGE:用於測試 SMT 的訊息內文。

    • ATTRIBUTES:用於測試 SMT 的訊息屬性。

  4. 如要建立訂閱項目,請執行 gcloud pubsub subscriptions create 指令:

    gcloud pubsub subscriptions create SUBSCRIPTION_ID \
        --topic=TOPIC_NAME \
        --message-transforms-file=TRANSFORMS_FILE

    取代下列項目:

    • SUBSCRIPTION_ID:要建立的訂閱項目 ID 或名稱。 如要瞭解如何命名訂閱項目,請參閱「資源名稱」。訂閱項目名稱無法變更。

    • TOPIC_NAME:要訂閱的主題名稱,格式為 projects/PROJECT_ID/topics/TOPIC_ID

    • TRANSFORMS_FILE:包含一或多個 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  5. Java

    在試用這個範例之前,請先按照JavaPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Java API 參考說明文件

    如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    執行程式碼範例前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN 環境變數設為 s3nsapis.fr

    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.pubsub.v1.JavaScriptUDF;
    import com.google.pubsub.v1.MessageTransform;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.ProjectTopicName;
    import com.google.pubsub.v1.Subscription;
    import java.io.IOException;
    
    public class CreateSubscriptionWithSmtExample {
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
        String subscriptionId = "your-subscription-id";
    
        createSubscriptionWithSmtExample(projectId, topicId, subscriptionId);
      }
    
      public static void createSubscriptionWithSmtExample(
          String projectId, String topicId, String subscriptionId) throws IOException {
    
        // UDF that removes the 'ssn' field, if present
        String code =
            "function redactSSN(message, metadata) {"
                + "  const data = JSON.parse(message.data);"
                + "  delete data['ssn'];"
                + "  message.data = JSON.stringify(data);"
                + "  return message;"
                + "}";
        String functionName = "redactSSN";
    
        JavaScriptUDF udf =
            JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
        MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
    
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
    
          ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
          ProjectSubscriptionName subscriptionName =
              ProjectSubscriptionName.of(projectId, subscriptionId);
    
          Subscription subscription =
              subscriptionAdminClient.createSubscription(
                  Subscription.newBuilder()
                      .setName(subscriptionName.toString())
                      .setTopic(topicName.toString())
                      // Add the UDF message transform
                      .addMessageTransforms(transform)
                      .build());
    
          System.out.println("Created subscription with SMT: " + subscription.getAllFields());
        }
      }
    }

    Python

    在試用這個範例之前,請先按照PythonPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Python API 參考說明文件

    如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    執行程式碼範例前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN 環境變數設為 s3nsapis.fr

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import JavaScriptUDF, MessageTransform
    
    # TODO(developer): Choose an existing topic.
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # subscription_id = "your-subscription-id"
    
    publisher = pubsub_v1.PublisherClient()
    subscriber = pubsub_v1.SubscriberClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    
    code = """function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
                }"""
    udf = JavaScriptUDF(code=code, function_name="redactSSN")
    transforms = [MessageTransform(javascript_udf=udf)]
    
    with subscriber:
        subscription = subscriber.create_subscription(
            request={
                "name": subscription_path,
                "topic": topic_path,
                "message_transforms": transforms,
            }
        )
        print(f"Created subscription with SMT: {subscription}")

    Go

    在試用這個範例之前,請先按照GoPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Go API 參考說明文件

    如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    執行程式碼範例前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN 環境變數設為 s3nsapis.fr

    // Copyright 2025 Google LLC
    //
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    //
    //     https://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    
    package subscriptions
    
    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    )
    
    // createSubscriptionWithSMT creates a subscription with a single message transform function applied.
    func createSubscriptionWithSMT(w io.Writer, projectID, topicID, subID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	// subID := "my-sub"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	code := `function redactSSN(message, metadata) {
    			const data = JSON.parse(message.data);
    			delete data['ssn'];
    			message.data = JSON.stringify(data);
    			return message;
    		}`
    
    	transform := &pubsubpb.MessageTransform{
    		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
    			JavascriptUdf: &pubsubpb.JavaScriptUDF{
    				FunctionName: "redactSSN",
    				Code:         code,
    			},
    		},
    	}
    
    	sub := &pubsubpb.Subscription{
    		Name:              fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subID),
    		Topic:             fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    		MessageTransforms: []*pubsubpb.MessageTransform{transform},
    	}
    	sub, err = client.SubscriptionAdminClient.CreateSubscription(ctx, sub)
    	if err != nil {
    		return fmt.Errorf("CreateSubscription: %w", err)
    	}
    	fmt.Fprintf(w, "Created subscription with message transform: %v\n", sub)
    	return nil
    }
    
    

SMT 如何與其他訂閱功能互動

如果訂閱項目同時使用 SMT 和 Pub/Sub 的內建篩選器,系統會先套用篩選器,再套用 SMT。這會造成下列影響:

  • 如果 SMT 變更了訊息屬性,Pub/Sub 篩選器就不會套用至新的屬性集。
  • 如果訊息遭到 Pub/Sub 篩選器篩除,系統就不會套用 SMT。

如果 SMT 篩除郵件,請注意這對監控訂閱項目待處理事項的影響。如果將訂閱項目饋送至 Dataflow 管道,請勿使用 SMT 篩除訊息,否則會中斷 Dataflow 的自動調度資源功能。

後續步驟