Publish messages to a topic with a schema

This document shows you how to publish messages to a topic that has a schema.

Before you begin

Before you configure the publish workflow, complete the following tasks:

Required roles

To get the permissions that you need to publish messages to a topic, ask your administrator to grant you the Pub/Sub Publisher (roles/pubsub.publisher) IAM role on the topic. For more information about granting roles, see Manage access to projects, folders, and organizations.

You might also be able to get the required permissions through custom roles or other predefined roles.

You need additional permissions to create or update topics and subscriptions.

Publish messages with a schema

You can publish messages to a topic that is associated with a schema. You must encode the messages in the schema and format that you specified when you created the topic. A message matches the schema associated with the topic if it matches any of the schema revisions within the allowed revision range. Pub/Sub evaluates a message against the revisions in order, starting with the newest allowed revision, until it finds a match or reaches the oldest allowed revision.

Pub/Sub adds the following attributes to a message that is successfully published to a topic associated with a schema:

  • googclient_schemaname: The name of the schema used for validation.

  • googclient_schemaencoding: The encoding of the message, either JSON or BINARY.

  • googclient_schemarevisionid: The revision ID of the schema used to parse and validate the message. Each revision has a unique revision ID. The revision ID is an auto-generated eight-character UUID.
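A consumer can read these attributes to learn which schema, encoding, and revision a message was validated against. The following Go sketch shows one way to extract them from a message's attribute map. The `schemaInfo` type, the `schemaInfoFromAttributes` helper, and the sample attribute values are hypothetical and exist only for illustration; only the three attribute keys come from the list above.

```go
package main

import (
	"errors"
	"fmt"
)

// Attribute keys that Pub/Sub adds to messages published to a topic
// that is associated with a schema.
const (
	attrSchemaName       = "googclient_schemaname"
	attrSchemaEncoding   = "googclient_schemaencoding"
	attrSchemaRevisionID = "googclient_schemarevisionid"
)

// schemaInfo is a hypothetical helper type for this sketch.
type schemaInfo struct {
	Name       string
	Encoding   string // "JSON" or "BINARY"
	RevisionID string
}

// schemaInfoFromAttributes reads the schema attributes from a message's
// attribute map. It returns an error if any attribute is missing or if
// the encoding is neither JSON nor BINARY.
func schemaInfoFromAttributes(attrs map[string]string) (schemaInfo, error) {
	info := schemaInfo{
		Name:       attrs[attrSchemaName],
		Encoding:   attrs[attrSchemaEncoding],
		RevisionID: attrs[attrSchemaRevisionID],
	}
	if info.Name == "" || info.Encoding == "" || info.RevisionID == "" {
		return schemaInfo{}, errors.New("message is missing one or more schema attributes")
	}
	if info.Encoding != "JSON" && info.Encoding != "BINARY" {
		return schemaInfo{}, fmt.Errorf("unexpected schema encoding %q", info.Encoding)
	}
	return info, nil
}

func main() {
	// The attribute values below are made up for illustration.
	attrs := map[string]string{
		attrSchemaName:       "projects/my-project/schemas/my-schema",
		attrSchemaEncoding:   "JSON",
		attrSchemaRevisionID: "0a1b2c3d",
	}
	info, err := schemaInfoFromAttributes(attrs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("schema=%s encoding=%s revision=%s\n", info.Name, info.Encoding, info.RevisionID)
}
```

With a client library, you would pass the received message's attribute map to a helper like this before choosing a decoder for the message data.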

If a message doesn't match any of the schema revisions that the topic allows, Pub/Sub returns an INVALID_ARGUMENT error for the publish request.

Pub/Sub evaluates messages against schema revisions only at publish time. If you commit a new schema revision or change the schema associated with a topic after you publish a message, that message is not re-evaluated and none of its attached schema attributes are changed.

You can publish messages to a topic with an associated schema in a Google Cloud project by using the Google Cloud console, the gcloud CLI, the Pub/Sub API, or the Cloud Client Libraries.

gcloud

  1. In the Trusted Cloud console, activate Cloud Shell.

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Publish a sample message by using the gcloud pubsub topics publish command.

    gcloud pubsub topics publish TOPIC_ID \
        --message=MESSAGE
    

    Replace the following:

    • TOPIC_ID: The name of the topic that you already created.

    • MESSAGE: The message to publish to the topic. For example, {"name": "Alaska", "post_abbr": "AK"}.
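If you build the MESSAGE value in code rather than typing it by hand, a typed struct helps keep the field names aligned with the schema. The following Go sketch assumes a topic with JSON encoding and a schema that has exactly two plain string fields, name and post_abbr, as the example message suggests. The `state` type and `encodeState` helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// state mirrors the two string fields of the assumed schema.
type state struct {
	Name     string `json:"name"`
	PostAbbr string `json:"post_abbr"`
}

// encodeState serializes a state as a compact JSON object.
func encodeState(s state) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", fmt.Errorf("json.Marshal: %w", err)
	}
	return string(b), nil
}

func main() {
	msg, err := encodeState(state{Name: "Alaska", PostAbbr: "AK"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(msg) // prints {"name":"Alaska","post_abbr":"AK"}
}
```

You can pass the printed string as the value of the --message flag. Wrap it in single quotes in the shell so that the double quotes reach gcloud unchanged.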

    C++

    Before trying this sample, follow the C++ setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub C++ API reference documentation.

    Avro
    namespace pubsub = ::google::cloud::pubsub;
    using ::google::cloud::future;
    using ::google::cloud::StatusOr;
    [](pubsub::Publisher publisher) {
      auto constexpr kNewYork =
          R"js({ "name": "New York", "post_abbr": "NY" })js";
      auto constexpr kPennsylvania =
          R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
      std::vector<future<void>> done;
      auto handler = [](future<StatusOr<std::string>> f) {
        auto id = f.get();
        if (!id) throw std::move(id).status();
      };
      for (auto const* data : {kNewYork, kPennsylvania}) {
        done.push_back(
            publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
                .then(handler));
      }
      // Block until all messages are published.
      for (auto& d : done) d.get();
    }
    Proto
    namespace pubsub = ::google::cloud::pubsub;
    using ::google::cloud::future;
    using ::google::cloud::StatusOr;
    [](pubsub::Publisher publisher) {
      std::vector<std::pair<std::string, std::string>> states{
          {"New York", "NY"},
          {"Pennsylvania", "PA"},
      };
      std::vector<future<void>> done;
      auto handler = [](future<StatusOr<std::string>> f) {
        auto id = f.get();
        if (!id) throw std::move(id).status();
      };
      for (auto& data : states) {
        google::cloud::pubsub::samples::State state;
        state.set_name(data.first);
        state.set_post_abbr(data.second);
        done.push_back(publisher
                           .Publish(pubsub::MessageBuilder{}
                                        .SetData(state.SerializeAsString())
                                        .Build())
                           .then(handler));
      }
      // Block until all messages are published.
      for (auto& d : done) d.get();
    }

    C#

    Before trying this sample, follow the C# setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub C# API reference documentation.

    Avro
    
    using Avro.IO;
    using Avro.Specific;
    using Google.Cloud.PubSub.V1;
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    public class PublishAvroMessagesAsyncSample
    {
        public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
        {
            TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
            PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
    
            PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
            var topic = publishApi.GetTopic(topicName);
    
            int publishedMessageCount = 0;
            var publishTasks = messageStates.Select(async state =>
            {
    
                try
                {
                    string messageId = null;
                    switch (topic.SchemaSettings.Encoding)
                    {
                        case Encoding.Binary:
                            using (var ms = new MemoryStream())
                            {
                                var encoder = new BinaryEncoder(ms);
                                var writer = new SpecificDefaultWriter(state.Schema);
                                writer.Write(state, encoder);
                                messageId = await publisher.PublishAsync(ms.ToArray());
                            }
                            break;
                        case Encoding.Json:
                            var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
                            messageId = await publisher.PublishAsync(jsonMessage);
                            break;
                    }
                    Console.WriteLine($"Published message {messageId}");
                    Interlocked.Increment(ref publishedMessageCount);
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
                }
            });
            await Task.WhenAll(publishTasks);
            return publishedMessageCount;
        }
    }
    Proto
    
    using Google.Cloud.PubSub.V1;
    using Google.Protobuf;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    public class PublishProtoMessagesAsyncSample
    {
        public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
        {
            TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
            PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
    
            PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
            var topic = publishApi.GetTopic(topicName);
    
            int publishedMessageCount = 0;
            var publishTasks = messageStates.Select(async state =>
            {
                try
                {
                    string messageId = null;
                    switch (topic.SchemaSettings.Encoding)
                    {
                        case Encoding.Binary:
                            var binaryMessage = state.ToByteString();
                            messageId = await publisher.PublishAsync(binaryMessage);
                            break;
                        case Encoding.Json:
                            var jsonMessage = JsonFormatter.Default.Format(state);
                            messageId = await publisher.PublishAsync(jsonMessage);
                            break;
                    }
                    Console.WriteLine($"Published message {messageId}");
                    Interlocked.Increment(ref publishedMessageCount);
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
                }
            });
            await Task.WhenAll(publishTasks);
            return publishedMessageCount;
        }
    }

    Go

    The following sample uses the major version of the Go Pub/Sub client library (v2). If you are still using the v1 library, see the migration guide to v2. For a list of v1 code samples, see the deprecated code samples.

    Before trying this sample, follow the Go setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Go API reference documentation.

    Avro
    import (
    	"context"
    	"fmt"
    	"io"
    	"os"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    	"github.com/linkedin/goavro/v2"
    )
    
    func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	// Release the client's resources when the function returns.
    	defer client.Close()
    
    	avroSource, err := os.ReadFile(avscFile)
    	if err != nil {
    		return fmt.Errorf("os.ReadFile err: %w", err)
    	}
    	codec, err := goavro.NewCodec(string(avroSource))
    	if err != nil {
    		return fmt.Errorf("goavro.NewCodec err: %w", err)
    	}
    	record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}
    
    	// Get the topic encoding type.
    	req := &pubsubpb.GetTopicRequest{
    		Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    	}
    	t, err := client.TopicAdminClient.GetTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("got err in GetTopic: %w", err)
    	}
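    	// The generated getters are nil-safe, so a topic without schema
    	// settings falls through to the default case below instead of panicking.
    	encoding := t.GetSchemaSettings().GetEncoding()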
    
    	var msg []byte
    	switch encoding {
    	case pubsubpb.Encoding_BINARY:
    		msg, err = codec.BinaryFromNative(nil, record)
    		if err != nil {
    			return fmt.Errorf("codec.BinaryFromNative err: %w", err)
    		}
    	case pubsubpb.Encoding_JSON:
    		msg, err = codec.TextualFromNative(nil, record)
    		if err != nil {
    			return fmt.Errorf("codec.TextualFromNative err: %w", err)
    		}
    	default:
    		return fmt.Errorf("invalid encoding: %v", encoding)
    	}
    
    	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
    	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
    	// If a topic ID is provided, the project ID from the client is used.
    	publisher := client.Publisher(topicID)
    	result := publisher.Publish(ctx, &pubsub.Message{
    		Data: msg,
    	})
    	_, err = result.Get(ctx)
    	if err != nil {
    		return fmt.Errorf("result.Get: %w", err)
    	}
    	fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
    	return nil
    }
    
    Proto
    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    	statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
    	"google.golang.org/protobuf/encoding/protojson"
    	"google.golang.org/protobuf/proto"
    )
    
    func publishProtoMessages(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    	// Release the client's resources when the function returns.
    	defer client.Close()
    
    	state := &statepb.State{
    		Name:     "Alaska",
    		PostAbbr: "AK",
    	}
    
    	// Get the topic encoding type.
    	req := &pubsubpb.GetTopicRequest{
    		Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    	}
    	t, err := client.TopicAdminClient.GetTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("got err in GetTopic: %w", err)
    	}
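    	// The generated getters are nil-safe, so a topic without schema
    	// settings falls through to the default case below instead of panicking.
    	encoding := t.GetSchemaSettings().GetEncoding()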
    
    	var msg []byte
    	switch encoding {
    	case pubsubpb.Encoding_BINARY:
    		msg, err = proto.Marshal(state)
    		if err != nil {
    			return fmt.Errorf("proto.Marshal err: %w", err)
    		}
    	case pubsubpb.Encoding_JSON:
    		msg, err = protojson.Marshal(state)
    		if err != nil {
    			return fmt.Errorf("protojson.Marshal err: %w", err)
    		}
    	default:
    		return fmt.Errorf("invalid encoding: %v", encoding)
    	}
    
    	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
    	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
    	// If a topic ID is provided, the project ID from the client is used.
    	publisher := client.Publisher(topicID)
    	result := publisher.Publish(ctx, &pubsub.Message{
    		Data: msg,
    	})
    	_, err = result.Get(ctx)
    	if err != nil {
    		return fmt.Errorf("result.Get: %w", err)
    	}
    	fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
    	return nil
    }
    

    Java

    Before trying this sample, follow the Java setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Java API reference documentation.

    Avro
    
    import com.google.api.core.ApiFuture;
    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.protobuf.ByteString;
    import com.google.pubsub.v1.Encoding;
    import com.google.pubsub.v1.PubsubMessage;
    import com.google.pubsub.v1.TopicName;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import org.apache.avro.io.Encoder;
    import org.apache.avro.io.EncoderFactory;
    import utilities.State;
    
    public class PublishAvroRecordsExample {
    
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        // Use a topic created with an Avro schema.
        String topicId = "your-topic-id";
    
        publishAvroRecordsExample(projectId, topicId);
      }
    
      public static void publishAvroRecordsExample(String projectId, String topicId)
          throws IOException, ExecutionException, InterruptedException {
    
        Encoding encoding = null;
    
        TopicName topicName = TopicName.of(projectId, topicId);
    
        // Get the topic encoding type.
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
        }
    
        // Instantiate an avro-tools-generated class defined in `us-states.avsc`.
        State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
    
        Publisher publisher = null;
    
        block:
        try {
          publisher = Publisher.newBuilder(topicName).build();
    
          // Prepare to serialize the object to the output stream.
          ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    
          Encoder encoder = null;
    
          // Prepare an appropriate encoder for publishing to the topic.
          switch (encoding) {
            case BINARY:
              System.out.println("Preparing a BINARY encoder...");
              encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /* reuse= */ null);
              break;
    
            case JSON:
              System.out.println("Preparing a JSON encoder...");
              encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
              break;
    
            default:
              break block;
          }
    
          // Encode the object and write it to the output stream.
          state.customEncode(encoder);
          encoder.flush();
    
          // Publish the encoded object as a Pub/Sub message.
          ByteString data = ByteString.copyFrom(byteStream.toByteArray());
          PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
          System.out.println("Publishing message: " + message);
    
          ApiFuture<String> future = publisher.publish(message);
          System.out.println("Published message ID: " + future.get());
    
        } finally {
          if (publisher != null) {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
          }
        }
      }
    }
    Proto
    
    import com.google.api.core.ApiFuture;
    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.protobuf.ByteString;
    import com.google.protobuf.util.JsonFormat;
    import com.google.pubsub.v1.Encoding;
    import com.google.pubsub.v1.PubsubMessage;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import utilities.StateProto.State;
    
    public class PublishProtobufMessagesExample {
    
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        // Use a topic created with a proto schema.
        String topicId = "your-topic-id";
    
        publishProtobufMessagesExample(projectId, topicId);
      }
    
      public static void publishProtobufMessagesExample(String projectId, String topicId)
          throws IOException, ExecutionException, InterruptedException {
    
        Encoding encoding = null;
    
        TopicName topicName = TopicName.of(projectId, topicId);
    
        // Get the topic encoding type.
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
        }
    
        Publisher publisher = null;
    
        // Instantiate a protoc-generated class defined in `us-states.proto`.
        State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
    
        block:
        try {
          publisher = Publisher.newBuilder(topicName).build();
    
          PubsubMessage.Builder message = PubsubMessage.newBuilder();
    
          // Prepare an appropriately formatted message based on topic encoding.
          switch (encoding) {
            case BINARY:
              message.setData(state.toByteString());
              System.out.println("Publishing a BINARY-formatted message:\n" + message);
              break;
    
            case JSON:
              String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
              message.setData(ByteString.copyFromUtf8(jsonString));
              System.out.println("Publishing a JSON-formatted message:\n" + message);
              break;
    
            default:
              break block;
          }
    
          // Publish the message.
          ApiFuture<String> future = publisher.publish(message.build());
          System.out.println("Published message ID: " + future.get());
    
        } finally {
          if (publisher != null) {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
          }
        }
      }
    }

    Node.js

    Before trying this sample, follow the Node.js setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

    Avro
    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub, Encodings} = require('@google-cloud/pubsub');
    
    // And the Apache Avro library
    const avro = require('avro-js');
    const fs = require('fs');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function publishAvroRecords(topicNameOrId) {
      // Cache topic objects (publishers) and reuse them.
      const topic = pubSubClient.topic(topicNameOrId);
    
      // Get the topic metadata to learn about its schema encoding.
      const [topicMetadata] = await topic.getMetadata();
      const topicSchemaMetadata = topicMetadata.schemaSettings;
    
      if (!topicSchemaMetadata) {
        console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
        return;
      }
      const schemaEncoding = topicSchemaMetadata.encoding;
    
      // Make an encoder using the official avro-js library.
      const definition = fs
        .readFileSync('system-test/fixtures/provinces.avsc')
        .toString();
      const type = avro.parse(definition);
    
      // Encode the message.
      const province = {
        name: 'Ontario',
        post_abbr: 'ON',
      };
      let dataBuffer;
      switch (schemaEncoding) {
        case Encodings.Binary:
          dataBuffer = type.toBuffer(province);
          break;
        case Encodings.Json:
          dataBuffer = Buffer.from(type.toString(province));
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaEncoding}`);
          break;
      }
      if (!dataBuffer) {
        console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
        return;
      }
    
      const messageId = await topic.publishMessage({data: dataBuffer});
      console.log(`Avro record ${messageId} published.`);
    }
    Protocol Buffer
    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub, Encodings} = require('@google-cloud/pubsub');
    
    // And the protobufjs library
    const protobuf = require('protobufjs');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function publishProtobufMessages(topicNameOrId) {
      // Cache topic objects (publishers) and reuse them.
      const topic = pubSubClient.topic(topicNameOrId);
    
      // Get the topic metadata to learn about its schema.
      const [topicMetadata] = await topic.getMetadata();
      const topicSchemaMetadata = topicMetadata.schemaSettings;
    
      if (!topicSchemaMetadata) {
        console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
        return;
      }
      const schemaEncoding = topicSchemaMetadata.encoding;
    
      // Encode the message.
      const province = {
        name: 'Ontario',
        postAbbr: 'ON',
      };
    
      // Make an encoder using the protobufjs library.
      //
      // Since we're providing the test message for a specific schema here, we'll
      // also code in the path to a sample proto definition.
      const root = await protobuf.load('system-test/fixtures/provinces.proto');
      const Province = root.lookupType('utilities.Province');
      const message = Province.create(province);
    
      let dataBuffer;
      switch (schemaEncoding) {
        case Encodings.Binary:
          dataBuffer = Buffer.from(Province.encode(message).finish());
          break;
        case Encodings.Json:
          dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaEncoding}`);
          break;
      }
      if (!dataBuffer) {
        console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
        return;
      }
    
      const messageId = await topic.publishMessage({data: dataBuffer});
      console.log(`Protobuf message ${messageId} published.`);
    }

    Node.js (TypeScript)

    Before trying this sample, follow the Node.js setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.

    Avro
    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub, Encodings} from '@google-cloud/pubsub';
    
    // And the Apache Avro library
    import * as avro from 'avro-js';
    import * as fs from 'fs';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    interface ProvinceObject {
      name: string;
      post_abbr: string;
    }
    
    async function publishAvroRecords(topicNameOrId: string) {
      // Cache topic objects (publishers) and reuse them.
      const topic = pubSubClient.topic(topicNameOrId);
    
      // Get the topic metadata to learn about its schema encoding.
      const [topicMetadata] = await topic.getMetadata();
      const topicSchemaMetadata = topicMetadata.schemaSettings;
    
      if (!topicSchemaMetadata) {
        console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
        return;
      }
      const schemaEncoding = topicSchemaMetadata.encoding;
    
      // Make an encoder using the official avro-js library.
      const definition = fs
        .readFileSync('system-test/fixtures/provinces.avsc')
        .toString();
      const type = avro.parse(definition);
    
      // Encode the message.
      const province: ProvinceObject = {
        name: 'Ontario',
        post_abbr: 'ON',
      };
      let dataBuffer: Buffer | undefined;
      switch (schemaEncoding) {
        case Encodings.Binary:
          dataBuffer = type.toBuffer(province);
          break;
        case Encodings.Json:
          dataBuffer = Buffer.from(type.toString(province));
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaEncoding}`);
          break;
      }
      if (!dataBuffer) {
        console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
        return;
      }
    
      const messageId = await topic.publishMessage({data: dataBuffer});
      console.log(`Avro record ${messageId} published.`);
    }
    Protocol Buffer
    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub, Encodings} from '@google-cloud/pubsub';
    
    // And the protobufjs library
    import * as protobuf from 'protobufjs';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    interface ProvinceObject {
      name: string;
      postAbbr: string;
    }
    
    async function publishProtobufMessages(topicNameOrId: string) {
      // Cache topic objects (publishers) and reuse them.
      const topic = pubSubClient.topic(topicNameOrId);
    
      // Get the topic metadata to learn about its schema.
      const [topicMetadata] = await topic.getMetadata();
      const topicSchemaMetadata = topicMetadata.schemaSettings;
    
      if (!topicSchemaMetadata) {
        console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
        return;
      }
      const schemaEncoding = topicSchemaMetadata.encoding;
    
      // Encode the message.
      const province: ProvinceObject = {
        name: 'Ontario',
        postAbbr: 'ON',
      };
    
      // Make an encoder using the protobufjs library.
      //
      // Since we're providing the test message for a specific schema here, we'll
      // also code in the path to a sample proto definition.
      const root = await protobuf.load('system-test/fixtures/provinces.proto');
      const Province = root.lookupType('utilities.Province');
      const message = Province.create(province);
    
      let dataBuffer: Buffer | undefined;
      switch (schemaEncoding) {
        case Encodings.Binary:
          dataBuffer = Buffer.from(Province.encode(message).finish());
          break;
        case Encodings.Json:
          dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaEncoding}`);
          break;
      }
      if (!dataBuffer) {
        console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
        return;
      }
    
      const messageId = await topic.publishMessage({data: dataBuffer});
      console.log(`Protobuf message ${messageId} published.`);
    }

    PHP

    Before trying this sample, follow the PHP setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub PHP API reference documentation.

    Avro
    use Google\Cloud\PubSub\PubSubClient;
    use Google\Cloud\PubSub\V1\Encoding;
    
    use AvroStringIO;
    use AvroSchema;
    use AvroIODatumWriter;
    use AvroIOBinaryEncoder;
    
    /**
     * Publish a message using an AVRO schema.
     *
     * This sample uses `wikimedia/avro` for AVRO encoding.
     *
     * @param string $projectId
     * @param string $topicId
     * @param string $definitionFile
     */
    function publish_avro_records($projectId, $topicId, $definitionFile)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
    
        $definition = (string) file_get_contents($definitionFile);
    
        $messageData = [
            'name' => 'Alaska',
            'post_abbr' => 'AK',
        ];
    
        $topic = $pubsub->topic($topicId);
    
        // get the encoding type.
        $topicInfo = $topic->info();
        $encoding = '';
        if (isset($topicInfo['schemaSettings']['encoding'])) {
            $encoding = $topicInfo['schemaSettings']['encoding'];
        }
    
        // if encoding is not set, we can't continue.
        if ($encoding === '') {
            printf('Topic %s does not have schema enabled', $topicId);
            return;
        }
    
        // If you are using gRPC, encoding may be an integer corresponding to an
        // enum value on Google\Cloud\PubSub\V1\Encoding.
        if (!is_string($encoding)) {
            $encoding = Encoding::name($encoding);
        }
    
        $encodedMessageData = '';
        if ($encoding == 'BINARY') {
            // encode as AVRO binary.
            $io = new AvroStringIO();
            $schema = AvroSchema::parse($definition);
            $writer = new AvroIODatumWriter($schema);
            $encoder = new AvroIOBinaryEncoder($io);
            $writer->write($messageData, $encoder);
    
            $encodedMessageData = $io->string();
        } else {
            // encode as JSON.
            $encodedMessageData = json_encode($messageData);
        }
    
        $topic->publish(['data' => $encodedMessageData]);
    
        printf('Published message with %s encoding', $encoding);
    }
    Protocol Buffer
    use Google\Cloud\PubSub\PubSubClient;
    use Google\Cloud\PubSub\V1\Encoding;
    
    use Utilities\StateProto;
    
    /**
     * Publish a message using a protocol buffer schema.
     *
     * Relies on a proto message of the following form:
     * ```
     * syntax = "proto3";
     *
     * package utilities;
     *
     * message StateProto {
     *   string name = 1;
     *   string post_abbr = 2;
     * }
     * ```
     *
     * @param string $projectId
     * @param string $topicId
     * @return void
     */
    function publish_proto_messages($projectId, $topicId)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
    
        $messageData = new StateProto([
            'name' => 'Alaska',
            'post_abbr' => 'AK',
        ]);
    
        $topic = $pubsub->topic($topicId);
    
        // get the encoding type.
        $topicInfo = $topic->info();
        $encoding = '';
        if (isset($topicInfo['schemaSettings']['encoding'])) {
            $encoding = $topicInfo['schemaSettings']['encoding'];
        }
    
        // if encoding is not set, we can't continue.
        if ($encoding === '') {
            printf('Topic %s does not have schema enabled', $topicId);
            return;
        }
    
        // If you are using gRPC, encoding may be an integer corresponding to an
        // enum value on Google\Cloud\PubSub\V1\Encoding.
        if (!is_string($encoding)) {
            $encoding = Encoding::name($encoding);
        }
    
        $encodedMessageData = '';
        if ($encoding == 'BINARY') {
            // encode as protobuf binary.
            $encodedMessageData = $messageData->serializeToString();
        } else {
            // encode as JSON.
            $encodedMessageData = $messageData->serializeToJsonString();
        }
    
        $topic->publish(['data' => $encodedMessageData]);
    
        printf('Published message with %s encoding', $encoding);
    }

    Python

    Before trying this sample, follow the Python setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Python API reference documentation.

    Avro
    from avro.io import BinaryEncoder, DatumWriter
    import avro.schema as schema
    import io
    import json
    from google.api_core.exceptions import NotFound
    from google.cloud.pubsub import PublisherClient
    from google.pubsub_v1.types import Encoding
    
    # TODO(developer): Replace these variables before running the sample.
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
    
    publisher_client = PublisherClient()
    topic_path = publisher_client.topic_path(project_id, topic_id)
    
    # Prepare to write Avro records to the binary output stream.
    with open(avsc_file, "rb") as file:
        avro_schema = schema.parse(file.read())
    writer = DatumWriter(avro_schema)
    bout = io.BytesIO()
    
    # Prepare some data using a Python dictionary that matches the Avro schema
    record = {"name": "Alaska", "post_abbr": "AK"}
    
    try:
        # Get the topic encoding type.
        topic = publisher_client.get_topic(request={"topic": topic_path})
        encoding = topic.schema_settings.encoding
    
        # Encode the data according to the message serialization type.
        if encoding == Encoding.BINARY:
            encoder = BinaryEncoder(bout)
            writer.write(record, encoder)
            data = bout.getvalue()
            # Print the repr: Avro binary output is not guaranteed to be valid UTF-8.
            print(f"Preparing a binary-encoded message:\n{data!r}")
        elif encoding == Encoding.JSON:
            data_str = json.dumps(record)
            print(f"Preparing a JSON-encoded message:\n{data_str}")
            data = data_str.encode("utf-8")
        else:
            print(f"No encoding specified in {topic_path}. Abort.")
            exit(0)
    
        future = publisher_client.publish(topic_path, data)
        print(f"Published message ID: {future.result()}")
    
    except NotFound:
        print(f"{topic_id} not found.")
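To make the BINARY branch of the Avro sample less opaque, the following is a minimal sketch of what the Avro binary encoding produces for this record. It is not how the `avro` library is implemented. It assumes the `.avsc` file declares a record with exactly two string fields, `name` and `post_abbr`, in that order (the schema file itself is not shown here). The helper names `zigzag_varint`, `avro_string`, and `encode_state` are hypothetical. The sketch relies only on the Avro rules that a record is the concatenation of its fields and that a string is a zigzag varint length followed by UTF-8 bytes.

```python
def zigzag_varint(n: int) -> bytes:
    """Encode a signed 64-bit integer the way Avro encodes a long."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes map to small codes
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def avro_string(value: str) -> bytes:
    """Encode an Avro string: byte length as a long, then the UTF-8 bytes."""
    raw = value.encode("utf-8")
    return zigzag_varint(len(raw)) + raw


def encode_state(name: str, post_abbr: str) -> bytes:
    """Encode the assumed two-field record by concatenating its fields in order."""
    return avro_string(name) + avro_string(post_abbr)


if __name__ == "__main__":
    print(encode_state("Alaska", "AK"))  # b'\x0cAlaska\x04AK'
```

The binary form carries no field names, which is why the publisher and the topic must agree on the schema, and why the JSON encoding of the same record is larger.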
    Protocol Buffer
    from google.api_core.exceptions import NotFound
    from google.cloud.pubsub import PublisherClient
    from google.protobuf.json_format import MessageToJson
    from google.pubsub_v1.types import Encoding
    
    from utilities import us_states_pb2  # type: ignore
    
    # TODO(developer): Replace these variables before running the sample.
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    publisher_client = PublisherClient()
    topic_path = publisher_client.topic_path(project_id, topic_id)
    
    try:
        # Get the topic encoding type.
        topic = publisher_client.get_topic(request={"topic": topic_path})
        encoding = topic.schema_settings.encoding
    
        # Instantiate a protoc-generated class defined in `us-states.proto`.
        state = us_states_pb2.StateProto()
        state.name = "Alaska"
        state.post_abbr = "AK"
    
        # Encode the data according to the message serialization type.
        if encoding == Encoding.BINARY:
            data = state.SerializeToString()
            print(f"Preparing a binary-encoded message:\n{data}")
        elif encoding == Encoding.JSON:
            json_object = MessageToJson(state)
            # MessageToJson already returns a str, so no extra conversion is needed.
            data = json_object.encode("utf-8")
            print(f"Preparing a JSON-encoded message:\n{data}")
        else:
            print(f"No encoding specified in {topic_path}. Abort.")
            exit(0)
    
        future = publisher_client.publish(topic_path, data)
        print(f"Published message ID: {future.result()}")
    
    except NotFound:
        print(f"{topic_id} not found.")
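For comparison, here is a rough sketch of the bytes that `SerializeToString()` yields for the `StateProto` message. It assumes the proto definition quoted in the PHP sample (`name = 1`, `post_abbr = 2`, both `string`). It hand-encodes the protobuf wire format for illustration only; the helper names `varint`, `string_field`, and `encode_state_proto` are hypothetical, and real code should keep using the generated class.

```python
def varint(n: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def string_field(field_number: int, value: str) -> bytes:
    """Encode one string field: tag, byte length, then the UTF-8 bytes."""
    raw = value.encode("utf-8")
    tag = (field_number << 3) | 2  # wire type 2 means length-delimited
    return varint(tag) + varint(len(raw)) + raw


def encode_state_proto(name: str, post_abbr: str) -> bytes:
    """Encode the assumed StateProto; proto3 omits empty string fields."""
    fields = ((1, name), (2, post_abbr))
    return b"".join(string_field(num, val) for num, val in fields if val)


if __name__ == "__main__":
    print(encode_state_proto("Alaska", "AK"))  # b'\n\x06Alaska\x12\x02AK'
```

Unlike the Avro binary form, each field here is prefixed with its field number, so a reader can skip fields it does not know.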

    Ruby

    The following sample uses the Ruby Pub/Sub client library v3. If you are still using the v2 library, see the migration guide to v3. For a list of Ruby v2 code samples, see Deprecated code samples.

    Before trying this sample, follow the Ruby setup instructions in Quickstart: Using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.

    Avro
    require "google/cloud/pubsub"

    # topic_id = "your-topic-id"
    # avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
    
    pubsub = Google::Cloud::PubSub.new
    topic_admin = pubsub.topic_admin
    publisher = pubsub.publisher topic_id
    record = { "name" => "Alaska", "post_abbr" => "AK" }
    
    topic = topic_admin.get_topic topic: pubsub.topic_path(topic_id)
    encoding = topic.schema_settings.encoding
    
    case encoding
    when :BINARY
      require "avro"
      avro_schema = Avro::Schema.parse File.read(avsc_file)
      writer = Avro::IO::DatumWriter.new avro_schema
      buffer = StringIO.new
      encoder = Avro::IO::BinaryEncoder.new buffer
      writer.write record, encoder
      publisher.publish buffer
      puts "Published binary-encoded AVRO message."
    when :JSON
      require "json"
      publisher.publish record.to_json
      puts "Published JSON-encoded AVRO message."
    else
      raise "No encoding specified in #{topic.name}."
    end
    Protocol Buffer
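    require "google/cloud/pubsub"
    # Also require the protoc-generated file that defines Utilities::StateProto.

    # topic_id = "your-topic-id"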
    
    pubsub = Google::Cloud::PubSub.new
    topic_admin = pubsub.topic_admin
    publisher = pubsub.publisher topic_id
    state = Utilities::StateProto.new name: "Alaska", post_abbr: "AK"
    
    topic = topic_admin.get_topic topic: pubsub.topic_path(topic_id)
    encoding = topic.schema_settings.encoding
    
    case encoding
    when :BINARY
      publisher.publish Utilities::StateProto.encode(state)
      puts "Published binary-encoded protobuf message."
    when :JSON
      publisher.publish Utilities::StateProto.encode_json(state)
      puts "Published JSON-encoded protobuf message."
    else
      raise "No encoding specified in #{topic.name}."
    end

Next steps