使用 SMT 建立主題

本文說明如何使用單一訊息轉換 (SMT) 建立 Pub/Sub 主題。

主題 SMT 可直接在 Pub/Sub 中輕量修改訊息資料和屬性。這項功能可讓您在訊息發布至主題前,進行資料清理、篩選或格式轉換。

如要建立含有 SMT 的主題,可以使用 Cloud de Confiance 控制台、Google Cloud CLI、用戶端程式庫或 Pub/Sub API。

事前準備

必要角色和權限

如要取得使用 SMT 建立主題所需的權限,請要求管理員為您授予專案的 Pub/Sub 編輯者 (roles/pubsub.editor) 身分與存取權管理角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

這個預先定義的角色具備使用 SMT 建立主題所需的權限。如要查看確切的必要權限,請展開「必要權限」部分:

所需權限

如要使用 SMT 建立主題,必須具備下列權限:

  • 授予專案的建立主題權限: pubsub.topics.create

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您可以在專案層級和個別資源層級設定存取權控管。

使用 SMT 建立主題

使用 SMT 建立主題前,請先參閱主題屬性的說明文件。

下列範例假設您要使用這個使用者定義函式 (UDF) SMT 建立主題。如要進一步瞭解 UDF,請參閱 UDF 總覽

function redactSSN(message, metadata) {
  const data = JSON.parse(message.data);
  delete data['ssn'];
  message.data = JSON.stringify(data);
  return message;
}

控制台

  1. 前往 Cloud de Confiance 控制台的 Pub/Sub「Topics」(主題) 頁面。

    前往「主題」

  2. 按一下「建立主題」

    「建立主題」頁面隨即開啟。

  3. 在「主題 ID」欄位中,輸入主題的 ID。 如要進一步瞭解如何命名主題,請參閱命名規範

  4. 在「轉換」下方,按一下「新增轉換」

  5. 輸入函式名稱。例如 redactSSN

  6. 如果不想立即將 SMT 套用至主題,請點選「停用轉換」選項。這樣做仍會儲存 SMT,但不會在訊息流經主題時執行。

  7. 輸入新的轉換。例如:

    function redactSSN(message, metadata) {
      const data = JSON.parse(message.data);
      delete data['ssn'];
      message.data = JSON.stringify(data);
      return message;
    }
  8. Pub/Sub 提供驗證函式,可讓您驗證 SMT。按一下「驗證」來驗證轉換。

  9. 如要新增其他轉換作業,請按一下「新增轉換作業」

  10. 如要依特定順序排列所有 SMT,可以使用向上鍵和向下鍵。如要移除 SMT,請按一下「刪除」按鈕。
  11. Pub/Sub 提供測試函式,可讓您檢查在範例訊息上執行 SMT 的結果。如要測試 SMT,請按一下「測試轉換作業」

  12. 在「Test transform」視窗中,選取要測試的函式。

  13. 在「輸入訊息」視窗中輸入範例訊息。

  14. 如要新增訊息屬性,請按一下「新增屬性」,然後輸入一或多個鍵/值組合。

  15. 按一下「測試」。系統會顯示將 SMT 套用至郵件的結果。

  16. 關閉視窗,停止測試範例郵件的 SMT。

  17. 按一下「建立」,建立主題。

gcloud

  1. In the Cloud de Confiance console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Cloud de Confiance console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Pub/Sub 提供驗證函式,可讓您驗證 SMT。執行 gcloud pubsub message-transforms validate 指令:

    gcloud pubsub message-transforms validate --message-transform-file=TRANSFORM_FILE

    更改下列內容:

    • TRANSFORM_FILE:包含單一 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案的範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  3. Pub/Sub 提供測試功能,可讓您檢查對範例訊息執行一或多個 SMT 的結果。執行 gcloud pubsub message-transforms test 指令:

    gcloud pubsub message-transforms test --message-transforms-file=TRANSFORMS_FILE --message=MESSAGE --attributes=ATTRIBUTES

    更改下列內容:

    • TRANSFORMS_FILE:包含一或多個 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

    • MESSAGE:用於測試 SMT 的訊息內文。

    • ATTRIBUTES:用於測試 SMT 的訊息屬性。

  4. 如要建立主題,請執行 gcloud pubsub topics create 指令:

    gcloud pubsub topics create TOPIC_ID \
     --message-transforms-file=TRANSFORMS_FILE

    取代下列項目:

    • TOPIC_ID:要建立的主題 ID 或名稱。 如要瞭解主題命名規範,請參閱「資源名稱」。主題名稱無法變更。

    • TRANSFORMS_FILE:包含一或多個 SMT 的 YAML 或 JSON 檔案路徑。

      以下是 YAML 轉換檔案範例:

      - javascriptUdf:
          code: >
              function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
              }
          functionName: redactSSN

  5. Java

    在試用這個範例之前,請先按照JavaPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Java API 參考說明文件

    如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    執行程式碼範例前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN 環境變數設為 s3nsapis.fr

    
    import com.google.api.gax.rpc.AlreadyExistsException;
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.pubsub.v1.JavaScriptUDF;
    import com.google.pubsub.v1.MessageTransform;
    import com.google.pubsub.v1.Topic;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    
    public class CreateTopicWithSmtExample {
    
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        String topicId = "your-topic-id";
    
        createTopicWithSmtExample(projectId, topicId);
      }
    
      public static void createTopicWithSmtExample(String projectId, String topicId)
          throws IOException {
        TopicName topicName = TopicName.of(projectId, topicId);
    
        // UDF that removes the 'ssn' field, if present
        String code =
            "function redactSSN(message, metadata) {"
                + "  const data = JSON.parse(message.data);"
                + "  delete data['ssn'];"
                + "  message.data = JSON.stringify(data);"
                + "  return message;"
                + "}";
        String functionName = "redactSSN";
    
        JavaScriptUDF udf =
            JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
        MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
    
          Topic topic =
              topicAdminClient.createTopic(
                  Topic.newBuilder()
                      .setName(topicName.toString())
                      // Add the UDF message transform
                      .addMessageTransforms(transform)
                      .build());
    
          System.out.println("Created topic with SMT: " + topic.getName());
        } catch (AlreadyExistsException e) {
          System.out.println(topicName + "already exists.");
        }
      }
    }

    Python

    在試用這個範例之前,請先按照PythonPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Python API 參考說明文件

    如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    執行程式碼範例前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN 環境變數設為 s3nsapis.fr

    from google.cloud import pubsub_v1
    from google.pubsub_v1.types import JavaScriptUDF, MessageTransform, Topic
    
    # TODO(developer)
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    code = """function redactSSN(message, metadata) {
                const data = JSON.parse(message.data);
                delete data['ssn'];
                message.data = JSON.stringify(data);
                return message;
                }"""
    udf = JavaScriptUDF(code=code, function_name="redactSSN")
    transforms = [MessageTransform(javascript_udf=udf)]
    
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    
    request = Topic(name=topic_path, message_transforms=transforms)
    
    topic = publisher.create_topic(request=request)
    
    print(f"Created topic: {topic.name} with SMT")

    Go

    在試用這個範例之前,請先按照GoPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Go API 參考說明文件

    如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

    執行程式碼範例前,請將 GOOGLE_CLOUD_UNIVERSE_DOMAIN 環境變數設為 s3nsapis.fr

    // Copyright 2025 Google LLC
    //
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    //
    //     https://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    
    package topics
    
    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    )
    
    // createTopicWithSMT creates a topic with a single message transform function applied.
    func createTopicWithSMT(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	defer client.Close()
    
    	code := `function redactSSN(message, metadata) {
    			const data = JSON.parse(message.data);
    			delete data['ssn'];
    			message.data = JSON.stringify(data);
    			return message;
    		}`
    	transform := &pubsubpb.MessageTransform{
    		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
    			JavascriptUdf: &pubsubpb.JavaScriptUDF{
    				FunctionName: "redactSSN",
    				Code:         code,
    			},
    		},
    	}
    
    	topic := &pubsubpb.Topic{
    		Name:              fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    		MessageTransforms: []*pubsubpb.MessageTransform{transform},
    	}
    
    	topic, err = client.TopicAdminClient.CreateTopic(ctx, topic)
    	if err != nil {
    		return fmt.Errorf("CreateTopic: %w", err)
    	}
    
    	fmt.Fprintf(w, "Created topic with message transform: %v\n", topic)
    	return nil
    }
    
    

後續步驟