Valider une révision de schéma

Ce document vous explique comment valider une révision de schéma pour les sujets Pub/Sub.

Avant de commencer

Rôles et autorisations requis

Pour obtenir les autorisations nécessaires pour valider une révision de schéma et gérer des schémas, demandez à votre administrateur de vous accorder le rôle IAM Éditeur Pub/Sub (roles/pubsub.editor) dans votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour valider une révision de schéma et gérer des schémas. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Vous devez disposer des autorisations suivantes pour valider une révision de schéma et gérer des schémas :

  • Créer un schéma : pubsub.schemas.create
  • Associer un schéma à un sujet : pubsub.schemas.attach
  • Valider une révision de schéma : pubsub.schemas.commit
  • Supprimer un schéma ou une révision de schéma : pubsub.schemas.delete
  • Obtenir un schéma ou des révisions de schéma : pubsub.schemas.get
  • Lister les schémas : pubsub.schemas.list
  • Lister les révisions de schéma : pubsub.schemas.listRevisions
  • Restaurer un schéma : pubsub.schemas.rollback
  • Valider un message : pubsub.schemas.validate
  • Obtenir la stratégie IAM pour un schéma : pubsub.schemas.getIamPolicy
  • Configurer la stratégie IAM pour un schéma : pubsub.schemas.setIamPolicy

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Vous pouvez accorder des rôles et des autorisations à des principaux tels que des utilisateurs, des groupes, des domaines ou des comptes de service. Vous pouvez créer un schéma dans un projet et l'associer à un sujet situé dans un autre projet. Assurez-vous de disposer des autorisations requises pour chaque projet.

Réviser un schéma

Vous pouvez valider une révision de schéma à l'aide de la Cloud de Confiance console, de gcloud CLI, de l'API Pub/Sub, ou des bibliothèques clientes Cloud.

Voici quelques consignes à suivre pour valider une révision de schéma :

  • Vous pouvez réviser un schéma dans certaines limites :

    • Pour les schémas de tampon de protocole, vous pouvez ajouter ou supprimer des champs facultatifs. Vous ne pouvez pas ajouter ni supprimer d'autres champs. Vous ne pouvez pas non plus modifier de champ existant.

    • Pour les schémas Avro, consultez la documentation Avro pour connaître les règles de résolution des schémas. Une nouvelle révision doit suivre les règles comme s'il s'agissait à la fois du schéma du lecteur et du schéma de l'auteur.

    • Un schéma peut comporter au maximum 20 révisions à la fois. Si vous dépassez cette limite, supprimez une révision de schéma avant d'en créer une autre.

  • Chaque révision est associée à un ID de révision unique. L'ID de révision est un UUID à huit caractères généré automatiquement.

  • Lorsque vous mettez à jour la plage de révisions ou la révision d'un schéma utilisé pour la validation des sujets, l'application des modifications peut prendre quelques minutes.

Console

Pour créer une révision de schéma, procédez comme suit :

  1. Dans la Cloud de Confiance console, accédez à la page Schémas Pub/Sub.

    Accéder à la page "Schémas"

  2. Cliquez sur l'ID de schéma d'un schéma existant.

    La page Détails du schéma s'ouvre.

  3. Cliquez sur Créer une révision.

    La page Créer une révision de schéma s'ouvre.

  4. Apportez les modifications nécessaires.

    Par exemple, pour l'exemple de schéma Avro que vous avez créé dans Créer un schéma, vous pouvez ajouter un champ facultatif supplémentaire appelé Price comme suit :

     {
       "type": "record",
       "name": "Avro",
       "fields": [
         {
           "name": "ProductName",
           "type": "string",
           "default": ""
         },
         {
           "name": "SKU",
           "type": "int",
           "default": 0
         },
         {
           "name": "InStock",
           "type": "boolean",
           "default": false
         },
         {
           "name": "Price",
           "type": "double",
           "default": "0.0"
         }
       ]
     }
    
  5. Cliquez sur Valider la définition pour vérifier si la définition du schéma est correcte.

  6. Vous pouvez également valider les messages du schéma.

    1. Cliquez sur Tester le message pour tester un exemple de message.

    2. Dans la fenêtre Tester le message, sélectionnez un type d'encodage de message.

    3. Dans le corps du message, saisissez un message test.

      Voici un exemple de message pour le schéma test. Dans cet exemple, sélectionnez l'encodage de message JSON.

      {"ProductName":"GreenOnions", "SKU":34543, "Price":12, "InStock":true}
      
    4. Cliquez sur Test.

  7. Cliquez sur Valider pour enregistrer le schéma.

gcloud

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition=SCHEMA_DEFINITION

Où :

  • SCHEMA_TYPE correspond à avro ou protocol-buffer.
  • SCHEMA_DEFINITION correspond à un string contenant la définition de le schéma, formatée en fonction du type de schéma choisi.

Vous pouvez également spécifier la définition du schéma dans un fichier :

gcloud pubsub schemas commit SCHEMA_ID \
        --type=SCHEMA_TYPE \
        --definition-file=SCHEMA_DEFINITION_FILE

Où :

  • SCHEMA_TYPE correspond à avro ou protocol-buffer.
  • SCHEMA_DEFINITION_FILE correspond à un string contenant le chemin d'accès au fichier avec la définition du schéma, formatée en fonction du type de schéma choisi.

REST

Pour valider une révision de schéma, envoyez une requête POST comme suit :

POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/schemas/SCHEMA_ID:commit
Authorization: Bearer $(gcloud auth application-default print-access-token)
Content-Type: application/json --data @response-body.json

Spécifiez les champs suivants dans le corps de la requête :

{
  "definition": SCHEMA_DEFINITION
  "type": SCHEMA_TYPE
  "name": SCHEMA_NAME
}

Où :

  • SCHEMA_TYPE correspond à AVRO ou PROTOCOL_BUFFER.
  • SCHEMA_DEFINITION est une chaîne contenant la définition de le schéma, formatée en fonction du type de schéma choisi.
  • SCHEMA_NAME correspond au nom d'un schéma existant.

Le corps de la réponse doit contenir une représentation JSON d'une ressource de schéma. Exemple :

{
  "name": SCHEMA_NAME,
  "type": SCHEMA_TYPE,
  "definition": SCHEMA_DEFINITION
  "revisionId": REVISION_ID
  "revisionCreateTime": REVISION_CREATE_TIME
}

Où :

  • REVISION_ID correspond à l'ID généré par le serveur pour la révision.
  • REVISION_CREATE_TIME correspond au code temporel ISO 8601 auquel la révision a été créée.

Go

L'exemple suivant utilise la version majeure de la bibliothèque cliente Go Pub/Sub (v2). Si vous utilisez toujours la bibliothèque v1, consultez le guide de migration vers la v2. Pour afficher la liste des exemples de code v1, consultez les exemples de code obsolètes.

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub pour Go.

Avro

import (
	"context"
	"fmt"
	"io"
	"os"

	pubsub "cloud.google.com/go/pubsub/v2/apiv1"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// commitAvroSchema commits a new Avro schema revision to an existing schema.
func commitAvroSchema(w io.Writer, projectID, schemaID, avscFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema-id"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read an Avro schema file formatted in JSON as a byte slice.
	avscSource, err := os.ReadFile(avscFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", avscFile)
	}

	schema := &pubsubpb.Schema{
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsubpb.Schema_AVRO,
		Definition: string(avscSource),
	}
	req := &pubsubpb.CommitSchemaRequest{
		Name:   fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Schema: schema,
	}
	s, err := client.CommitSchema(ctx, req)
	if err != nil {
		return fmt.Errorf("error calling CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using an Avro schema: %#v\n", s)
	return nil
}

Proto

import (
	"context"
	"fmt"
	"io"
	"os"

	pubsub "cloud.google.com/go/pubsub/v2/apiv1"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// commitProtoSchema commits a new proto schema revision to an existing schema.
func commitProtoSchema(w io.Writer, projectID, schemaID, protoFile string) error {
	// projectID := "my-project-id"
	// schemaID := "my-schema"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewSchemaClient(ctx)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer client.Close()

	// Read a proto file as a byte slice.
	protoSource, err := os.ReadFile(protoFile)
	if err != nil {
		return fmt.Errorf("error reading from file: %s", protoFile)
	}

	schema := &pubsubpb.Schema{
		// TODO(hongalex): check if name is necessary here
		Name:       fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Type:       pubsubpb.Schema_PROTOCOL_BUFFER,
		Definition: string(protoSource),
	}
	req := &pubsubpb.CommitSchemaRequest{
		Name:   fmt.Sprintf("projects/%s/schemas/%s", projectID, schemaID),
		Schema: schema,
	}
	s, err := client.CommitSchema(ctx, req)
	if err != nil {
		return fmt.Errorf("CommitSchema: %w", err)
	}
	fmt.Fprintf(w, "Committed a schema using a protobuf schema: %#v\n", s)
	return nil
}

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

Avro

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(google::pubsub::v1::Schema::AVRO);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Proto

namespace pubsub = ::google::cloud::pubsub;
[](pubsub::SchemaServiceClient client, std::string const& project_id,
   std::string const& schema_id, std::string const& schema_definition_file) {
  std::string const definition = ReadFile(schema_definition_file);

  google::pubsub::v1::CommitSchemaRequest request;
  std::string const name =
      google::cloud::pubsub::Schema(project_id, schema_id).FullName();
  request.set_name(name);
  request.mutable_schema()->set_name(name);
  request.mutable_schema()->set_type(
      google::pubsub::v1::Schema::PROTOCOL_BUFFER);
  request.mutable_schema()->set_definition(definition);
  auto schema = client.CommitSchema(request);
  if (schema.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The schema revision already exists\n";
    return;
  }
  if (!schema) throw std::move(schema).status();

  std::cout << "Schema revision successfully committed: "
            << schema->DebugString() << "\n";
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

Avro


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitAvroSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String avscFile = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json";

    commitAvroSchemaExample(projectId, schemaId, avscFile);
  }

  public static Schema commitAvroSchemaExample(String projectId, String schemaId, String avscFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read an Avro schema file formatted in JSON as a string.
    String avscSource = new String(Files.readAllBytes(Paths.get(avscFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.AVRO)
                  .setDefinition(avscSource)
                  .build());

      System.out.println("Committed a schema using an Avro schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Proto


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.pubsub.v1.ProjectName;
import com.google.pubsub.v1.Schema;
import com.google.pubsub.v1.SchemaName;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class CommitProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String schemaId = "your-schema-id";
    String protoFile = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers";

    commitProtoSchemaExample(projectId, schemaId, protoFile);
  }

  public static Schema commitProtoSchemaExample(String projectId, String schemaId, String protoFile)
      throws IOException {

    ProjectName projectName = ProjectName.of(projectId);
    SchemaName schemaName = SchemaName.of(projectId, schemaId);

    // Read a proto file as a string.
    String protoSource = new String(Files.readAllBytes(Paths.get(protoFile)));

    try (SchemaServiceClient schemaServiceClient = SchemaServiceClient.create()) {

      Schema schema =
          schemaServiceClient.commitSchema(
              schemaName.toString(),
              Schema.newBuilder()
                  .setName(schemaName.toString())
                  .setType(Schema.Type.PROTOCOL_BUFFER)
                  .setDefinition(protoSource)
                  .build());

      System.out.println("Committed a schema using a protobuf schema:\n" + schema);
      return schema;
    } catch (NotFoundException e) {
      System.out.println(schemaName + "does not exist.");
      return null;
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

Avro

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"

# Read a JSON-formatted Avro schema file as a string.
with open(avsc_file, "rb") as f:
    avsc_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(name=schema_path, type_=Schema.Type.AVRO, definition=avsc_source)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using an Avro schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Proto

from google.api_core.exceptions import NotFound
from google.cloud.pubsub import SchemaServiceClient
from google.pubsub_v1.types import Schema

# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# schema_id = "your-schema-id"
# proto_file = "path/to/a/proto/file/(.proto)/formatted/in/protocol/buffers"

# Read a protobuf schema file as a string.
with open(proto_file, "rb") as f:
    proto_source = f.read().decode("utf-8")

schema_client = SchemaServiceClient()
schema_path = schema_client.schema_path(project_id, schema_id)
schema = Schema(
    name=schema_path, type_=Schema.Type.PROTOCOL_BUFFER, definition=proto_source
)

try:
    result = schema_client.commit_schema(
        request={"schema": schema, "name": schema_path}
    )
    print(f"Committed a schema revision using a protobuf schema file:\n{result}")
    return result
except NotFound:
    print(f"{schema_id} does not exist.")

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId, avscFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
const {PubSub, SchemaTypes} = require('@google-cloud/pubsub');

const fs = require('fs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId, protoFile) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Avro

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const avscFile = 'path/to/an/avro/schema/file/(.avsc)/formatted/in/json';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitAvroSchema(schemaNameOrId: string, avscFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(avscFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.Avro,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Proto

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const schemaNameOrId = 'YOUR_SCHEMA_NAME_OR_ID';
// const protoFile = 'path/to/a/proto/schema/file/(.proto)/formatted/in/protcol/buffers';

// Imports the Google Cloud client library
import {PubSub, SchemaTypes} from '@google-cloud/pubsub';

import * as fs from 'fs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function commitProtoSchema(schemaNameOrId: string, protoFile: string) {
  // Get the fully qualified schema name.
  const schema = pubSubClient.schema(schemaNameOrId);
  const name = await schema.getName();

  // Read the new schema definition from storage.
  const definition: string = fs.readFileSync(protoFile).toString();

  // Use the gapic client to commit the new definition.
  const schemaClient = await pubSubClient.getSchemaClient();
  const [result] = await schemaClient.commitSchema({
    name,
    schema: {
      name,
      type: SchemaTypes.ProtocolBuffer,
      definition,
    },
  });

  console.log(`Schema ${name} committed with revision ${result.revisionId}.`);
}

Une fois que vous avez validé une révision de schéma, vous pouvez afficher les détails de la nouvelle révision sur la page Schémas.

Étape suivante