Creare un argomento con SMT

Questo documento spiega come creare un argomento Pub/Sub con Single Message Transforms (SMT).

Le SMT degli argomenti consentono modifiche leggere ai dati e agli attributi dei messaggi direttamente in Pub/Sub. Questa funzionalità consente la pulizia dei dati, il filtraggio o la conversione del formato prima che i messaggi vengano pubblicati nell'argomento.

Per creare un argomento con SMT, puoi utilizzare la Cloud de Confiance console, Google Cloud CLI, la libreria client o l'API Pub/Sub.

Prima di iniziare

Ruoli e autorizzazioni richiesti

Per ottenere le autorizzazioni necessarie per creare un argomento con SMT, chiedi all'amministratore di concederti il ruolo IAM Pub/Sub Editor (roles/pubsub.editor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per creare un argomento con SMT. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per creare un argomento con SMT sono necessarie le seguenti autorizzazioni:

  • Concedi l'autorizzazione per creare un argomento sul progetto: pubsub.topics.create

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Puoi configurare il controllo dell'accesso a livello di progetto e a livello di singola risorsa.

Crea un argomento con SMT

Prima di creare un argomento con SMT, consulta la documentazione relativa alle proprietà di un argomento.

Per creare un Pub/Sub con uno o più SMT, segui questi passaggi. Puoi attivare fino a 5 SMT per argomento.

Console

  1. Nella console Cloud de Confiance , vai alla pagina Argomenti di Pub/Sub.

    Vai ad Argomenti

  2. Fai clic su Crea argomento.

  3. Nel campo ID argomento, inserisci un ID per l'argomento. Per ulteriori informazioni sulla denominazione degli argomenti, consulta le linee guida per la denominazione.

  4. In Trasformazioni, fai clic su Aggiungi una trasformazione.

  5. Seleziona il tipo di trasformazione. Per ulteriori informazioni sui tipi di SMT supportati, consulta Tipi di SMT.

  6. Imposta le proprietà di configurazione per SMT. L'insieme di proprietà dipende dal tipo di SMT. Per saperne di più, consulta la documentazione relativa a questo tipo di SMT.

  7. Facoltativo. Per convalidare l'SMT, fai clic su Convalida. Se l'SMT è valido, viene visualizzato il messaggio "Validation passed". In caso contrario, viene visualizzato un messaggio di errore.

  8. Per aggiungere un'altra trasformazione, fai clic su Aggiungi una trasformazione e ripeti i passaggi precedenti.

    Per disporre gli SMT in un ordine specifico, fai clic su Sposta su o Sposta giù. Per rimuovere un SMT, fai clic su Elimina.

  9. Facoltativo. Per testare un SMT su un messaggio di esempio:

    1. Fai clic su Testa trasformazioni.

    2. Nella finestra Testa trasformazione, seleziona la funzione che vuoi testare.

    3. Nella finestra Messaggio di input, inserisci un messaggio di esempio.

    4. Per aggiungere un attributo al messaggio, fai clic su Aggiungi un attributo e inserisci la chiave e il valore dell'attributo. Puoi aggiungere più attributi.

    5. Fai clic su Test. Il risultato dell'applicazione della SMT al messaggio viene visualizzato in Messaggio di output.

    6. Per chiudere la finestra Testa trasformazioni, fai clic su Chiudi.

    Se crei più di una SMT, puoi testare l'intera sequenza di trasformazioni nel seguente modo:

    1. Testa il primo SMT della sequenza, come descritto nei passaggi precedenti.
    2. Seleziona il successivo SMT. Il messaggio di input è precompilato con il messaggio di output del test precedente.
    3. Continua a testare gli SMT in ordine per assicurarti che l'intera sequenza funzioni come previsto.
  10. Per creare l'argomento, fai clic su Crea.

gcloud

  1. Nella console Cloud de Confiance , attiva Cloud Shell.

    Attiva Cloud Shell

    Nella parte inferiore della console Cloud de Confiance viene avviata una sessione di Cloud Shell e viene visualizzato un prompt della riga di comando. Cloud Shell è un ambiente shell con Google Cloud CLI già installata e con valori già impostati per il progetto corrente. L'inizializzazione della sessione può richiedere alcuni secondi.

  2. Crea un file YAML o JSON che definisca uno o più SMT. La definizione YAML o JSON dipende dal tipo di SMT. Per saperne di più, consulta Tipi di trasformazioni SMT.

    Se il file include più di un SMT, Pub/Sub li esegue nell'ordine elencato.

  3. Facoltativo. Per convalidare un SMT, esegui il comando gcloud pubsub message-transforms validate:

    gcloud pubsub message-transforms validate \
      --message-transform-file=TRANSFORM_FILE
    

    Sostituisci quanto segue:

    • TRANSFORM_FILE: il percorso di un file YAML o JSON che definisce un singolo SMT. Se crei più SMT, devi convalidarli singolarmente.
  4. Facoltativo. Per testare uno o più SMT su un messaggio Pub/Sub di esempio, esegui il comando gcloud pubsub message-transforms test:

    gcloud pubsub message-transforms test \
      --message-transforms-file=TRANSFORMS_FILE \
      --message=MESSAGE \
      --attribute=ATTRIBUTES
    

    Sostituisci quanto segue:

    • TRANSFORMS_FILE: il percorso di un file YAML o JSON che definisce uno o più SMT.
    • MESSAGE: il corpo del messaggio di esempio.
    • ATTRIBUTES: (Facoltativo) Un elenco separato da virgole di attributi del messaggio. Ogni attributo è una coppia chiave-valore formattata come KEY="VALUE".

    Il comando esegue le SMT in ordine, utilizzando l'output di ciascuna SMT come input per la successiva. Il comando restituisce i risultati di ogni passaggio.

  5. Per creare l'argomento, esegui il comando gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID \
      --message-transforms-file=TRANSFORMS_FILE
    

    Sostituisci quanto segue:

    • TOPIC_ID: l'ID o il nome dell'argomento che vuoi creare. Per le linee guida su come denominare un argomento, consulta Nomi delle risorse. Il nome di un argomento è immutabile.
    • TRANSFORMS_FILE: il percorso di un file YAML o JSON che definisce uno o più SMT.

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Pub/Sub.


import com.google.api.gax.rpc.AlreadyExistsException;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.pubsub.v1.JavaScriptUDF;
import com.google.pubsub.v1.MessageTransform;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithSmtExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    createTopicWithSmtExample(projectId, topicId);
  }

  public static void createTopicWithSmtExample(String projectId, String topicId)
      throws IOException {
    TopicName topicName = TopicName.of(projectId, topicId);

    // UDF that removes the 'ssn' field, if present
    String code =
        "function redactSSN(message, metadata) {"
            + "  const data = JSON.parse(message.data);"
            + "  delete data['ssn'];"
            + "  message.data = JSON.stringify(data);"
            + "  return message;"
            + "}";
    String functionName = "redactSSN";

    JavaScriptUDF udf =
        JavaScriptUDF.newBuilder().setCode(code).setFunctionName(functionName).build();
    MessageTransform transform = MessageTransform.newBuilder().setJavascriptUdf(udf).build();
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  // Add the UDF message transform
                  .addMessageTransforms(transform)
                  .build());

      System.out.println("Created topic with SMT: " + topic.getName());
    } catch (AlreadyExistsException e) {
      System.out.println(topicName + "already exists.");
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Python di Pub/Sub.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import JavaScriptUDF, MessageTransform, Topic

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

code = """function redactSSN(message, metadata) {
            const data = JSON.parse(message.data);
            delete data['ssn'];
            message.data = JSON.stringify(data);
            return message;
            }"""
udf = JavaScriptUDF(code=code, function_name="redactSSN")
transforms = [MessageTransform(javascript_udf=udf)]

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

request = Topic(name=topic_path, message_transforms=transforms)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with SMT")

Go

L'esempio seguente utilizza la versione principale della libreria client Go Pub/Sub (v2). Se utilizzi ancora la libreria v1, consulta la guida alla migrazione alla v2. Per visualizzare un elenco di esempi di codice della versione 1, consulta gli esempi di codice deprecati.

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// createTopicWithSMT creates a topic with a single message transform function applied.
func createTopicWithSMT(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	code := `function redactSSN(message, metadata) {
			const data = JSON.parse(message.data);
			delete data['ssn'];
			message.data = JSON.stringify(data);
			return message;
		}`
	transform := &pubsubpb.MessageTransform{
		Transform: &pubsubpb.MessageTransform_JavascriptUdf{
			JavascriptUdf: &pubsubpb.JavaScriptUDF{
				FunctionName: "redactSSN",
				Code:         code,
			},
		},
	}

	topic := &pubsubpb.Topic{
		Name:              fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		MessageTransforms: []*pubsubpb.MessageTransform{transform},
	}

	topic, err = client.TopicAdminClient.CreateTopic(ctx, topic)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}

	fmt.Fprintf(w, "Created topic with message transform: %v\n", topic)
	return nil
}

Passaggi successivi