יצירת נושא באמצעות כלי SMT

במסמך הזה מוסבר איך ליצור נושא Pub/Sub עם Single Message Transforms (SMTs).

נושאי SMT מאפשרים לבצע שינויים קלים בנתוני ההודעות ובמאפיינים שלהן ישירות ב-Pub/Sub. התכונה הזו מאפשרת טיוב נתונים, סינון או המרת פורמט לפני שההודעות מתפרסמות בנושא.

כדי ליצור נושא עם SMT, אפשר להשתמש במסוף, ב-Google Cloud CLI, בספריית הלקוח או ב-Pub/Sub API. Cloud de Confiance

לפני שמתחילים

תפקידים והרשאות נדרשים

כדי לקבל את ההרשאה שנדרשת ליצירת נושא עם SMT, צריך לבקש מהאדמין להקצות לכם את תפקיד ה-IAM‏ Pub/Sub Editor (roles/pubsub.editor) בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

התפקיד המוגדר מראש הזה כולל את ההרשאה pubsub.topics.create, שנדרשת כדי ליצור נושא עם SMT.

יכול להיות שתוכלו לקבל את ההרשאה הזו גם בתפקידים בהתאמה אישית או בתפקידים אחרים שמוגדרים מראש.

יצירת נושא באמצעות כלי SMT

לפני שיוצרים נושא עם כלים לניהול נושאים, כדאי לעיין במסמכי התיעוד בנושא מאפיינים של נושא.

כדי ליצור Pub/Sub עם SMT אחד או יותר, מבצעים את השלבים הבאים. אפשר להפעיל עד 5 נושאי חיפוש לכל נושא.

המסוף

  1. נכנסים לדף Topics של Pub/Sub במסוף Cloud de Confiance .

    לדף Topics

  2. לוחצים על יצירת נושא.

  3. בשדה Topic ID (מזהה הנושא), מזינים מזהה לנושא. מידע נוסף על מתן שמות לנושאים זמין בהנחיות למתן שמות.

  4. בקטע טרנספורמציות, לוחצים על הוספת טרנספורמציה.

  5. בוחרים את סוג הטרנספורמציה. מידע נוסף על סוגי ה-SMT הנתמכים זמין במאמר סוגי SMT.

  6. קובעים את מאפייני ההגדרה של ה-SMT. קבוצת המאפיינים תלויה בסוג ה-SMT. מידע נוסף זמין במאמרי העזרה בנושא סוג ה-SMT הזה.

  7. זה שינוי אופציונלי. כדי לאמת את ה-SMT, לוחצים על אימות. אם ה-SMT תקין, מוצגת ההודעה "Validation passed". אחרת, תוצג הודעת שגיאה.

  8. כדי להוסיף עוד טרנספורמציה, לוחצים על הוספת טרנספורמציה וחוזרים על השלבים הקודמים.

    כדי לסדר את ה-SMT בסדר מסוים, לוחצים על העברה למעלה או על העברה למטה. כדי להסיר SMT, לוחצים על מחיקה.

  9. זה שינוי אופציונלי. כדי לבדוק SMT בהודעה לדוגמה, פועלים לפי השלבים הבאים:

    1. לוחצים על בדיקת טרנספורמציות.

    2. בחלון Test transform, בוחרים את הפונקציה שרוצים לבדוק.

    3. בחלון הזנת הודעה, מזינים הודעה לדוגמה.

    4. כדי להוסיף מאפיין להודעה, לוחצים על הוספת מאפיין ומזינים את המפתח והערך של המאפיין. אפשר להוסיף כמה מאפיינים.

    5. לוחצים על בדיקה. התוצאה של החלת ה-SMT על ההודעה מוצגת בקטע הודעת הפלט.

    6. כדי לסגור את החלון Test transforms, לוחצים על Close.

    אם יוצרים יותר מ-SMT אחד, אפשר לבדוק את כל רצף ההמרות באופן הבא:

    1. בודקים את ה-SMT הראשון ברצף, כמו שמתואר בשלבים הקודמים.
    2. בוחרים את ה-SMT הבא. הודעת הקלט מאוכלסת מראש עם הודעת הפלט מהבדיקה הקודמת.
    3. ממשיכים לבדוק את כללי ה-SMT לפי הסדר, כדי לוודא שכל הרצף פועל כמו שצריך.
  10. כדי ליצור את הנושא, לוחצים על יצירה.

gcloud

  1. במסוף Cloud de Confiance , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Cloud de Confiance המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. יוצרים קובץ YAML או JSON שמגדיר SMT אחד או יותר. הגדרת ה-YAML או ה-JSON תלויה בסוג ה-SMT. מידע נוסף זמין במאמר בנושא סוגים של SMT.

    אם הקובץ כולל יותר מ-SMT אחד, ‏ Pub/Sub מבצע אותם לפי הסדר שבו הם מופיעים.

  3. זה שינוי אופציונלי. כדי לאמת SMT, מריצים את הפקודה gcloud pubsub message-transforms validate:

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    מחליפים את מה שכתוב בשדות הבאים:

    • TRANSFORM_FILE: הנתיב לקובץ YAML או JSON שמגדיר SMT יחיד. אם יוצרים כמה SMT, צריך לאמת כל אחד מהם בנפרד.
  4. זה שינוי אופציונלי. כדי לבדוק SMT אחד או יותר על הודעת Pub/Sub לדוגמה, מריצים את הפקודה gcloud pubsub message-transforms test:

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    מחליפים את מה שכתוב בשדות הבאים:

    • TRANSFORMS_FILE: הנתיב לקובץ YAML או JSON שמגדיר SMT אחד או יותר.
    • MESSAGE: גוף ההודעה לדוגמה.
    • ATTRIBUTES: אופציונלי. רשימה מופרדת בפסיקים של מאפייני הודעה. כל מאפיין הוא צמד מפתח/ערך בפורמט KEY="VALUE".

    הפקודה מפעילה את ה-SMT לפי הסדר, והפלט מכל SMT משמש כקלט ל-SMT הבא. הפקודה מחזירה את התוצאות של כל שלב.

  5. כדי ליצור את הנושא, מריצים את הפקודה gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID \
      --message-transforms-file=TRANSFORMS_FILE
    

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: המזהה או השם של הנושא שרוצים ליצור. הנחיות למתן שם לנושא מופיעות במאמר בנושא שמות משאבים. השם של נושא לא ניתן לשינוי.
    • TRANSFORMS_FILE: הנתיב לקובץ YAML או JSON שמגדיר SMT אחד או יותר.

C#‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C# ‎ במאמר הפעלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using System;

public class CreateTopicWithSingleMessageTransformSample
{
    public Topic CreateTopicWithSingleMessageTransform(string projectId, string topicId)
    {
        PublisherServiceApiClient publisher = PublisherServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);

        MessageTransform removeSSNTransform = new MessageTransform
        {
            JavascriptUdf = new JavaScriptUDF
            {
                FunctionName = "redactSsn",
                Code = "function redactSsn(message, metadata) {"
                + "   const data = JSON.parse(message.data);"
                + "   delete data['ssn'];"
                + "   message.data = JSON.stringify(data);"
                + "   return message;"
                + "}"
            },
        };

        Topic topic = publisher.CreateTopic(new Topic()
        {
            TopicName = topicName,
            MessageTransforms = { removeSSNTransform }
        });
        Console.WriteLine($"Topic {topic.Name} created with SMT.");

        return topic;
    }
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התחלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.JavaScriptUDF;
import com.google.pubsub.v1.MessageTransform;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithSmtExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    createTopicWithSmtExample(projectId, topicId);
  }

  public static void createTopicWithSmtExample(String projectId, String topicId)
      throws IOException {
    TopicName topicName = TopicName.of(projectId, topicId);

    // UDF that removes the 'ssn' field, if present
    String code =
        "function redactSSN(message, metadata) {"
            + "  const data = JSON.parse(message.data);"
            + "  delete data['ssn'];"
            + "  message.data = JSON.stringify(data);"
            + "  return message;"
            + "}";
    String functionName = "redactSSN";

    JavaScriptUDF udf =
        JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
    MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  // Add the UDF message transform
                  .addMessageTransforms(transform)
                  .build());

      System.out.println("Created topic with SMT: " + topic.getName());
    } catch (AlreadyExistsException e) {
      System.out.println(topicName + "already exists.");
    }
  }
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של ה-API בשפת Python של Pub/Sub.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform, Topic

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

code = """function redactSSN(message, metadata) {
            const data = JSON.parse(message.data);
            delete data['ssn'];
            message.data = JSON.stringify(data);
            return message;
            }"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(name=topic_path, message_transforms=transforms)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with SMT")

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// createTopicWithSMT creates a topic with a single message transform function applied.
func createTopicWithSMT(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	code := `function redactSSN(message, metadata) {
			const data = JSON.parse(message.data);
			delete data['ssn'];
			message.data = JSON.stringify(data);
			return message;
		}`
	transform := &pubsubpb.MessageTransform{
		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
			JavascriptUdf: &pubsubpb.JavaScriptUDF{
				FunctionName: "redactSSN",
				Code:         code,
			},
		},
	}

	topic := &pubsubpb.Topic{
		Name:              fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		MessageTransforms: []*pubsubpb.MessageTransform{transform},
	}

	topic, err = client.TopicAdminClient.CreateTopic(ctx, topic)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}

	fmt.Fprintf(w, "Created topic with message transform: %v\n", topic)
	return nil
}

המאמרים הבאים