向具有架构的主题发布消息

本文档介绍了如何向具有架构的主题发布消息。

准备工作

在配置发布工作流之前,请确保您已完成以下任务:

所需的角色

如需获得向主题发布消息所需的权限,请让您的管理员为您授予主题的 Pub/Sub Publisher (roles/pubsub.publisher) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

您需要获得其他权限才能创建或更新主题和订阅。

发布具有架构的消息

您可以向与架构相关联的主题发布消息。 您必须将消息编码为创建主题时指定的架构和格式。如果消息与主题关联的架构的任何修订版本(在允许的修订版本范围内)相匹配,则该消息与该架构相匹配。系统会按顺序从最新的允许修订版本开始,直到找到匹配的修订版本或到达最旧的允许修订版本为止,对消息进行修订版本评估。Pub/Sub 会将以下属性添加到成功发布到与架构相关联的主题的消息中:

  • googclient_schemaname:用于验证的架构的名称。

  • googclient_schemaencoding:消息的编码(JSON 或二进制)。

  • googclient_schemarevisionid:用于解析和验证消息的架构的修订版本 ID。每个修订版本都有一个关联的唯一修订版本 ID。修订版本 ID 是自动生成的 8 字符 UUID。

如果消息与主题允许的任何架构修订版本都不匹配,Pub/Sub 会向发布请求返回 INVALID_ARGUMENT 错误。

Pub/Sub 仅在发布时根据架构修订版本评估消息。在发布消息后提交新的架构修订版本更改与主题关联的架构不会重新评估该消息,也不会更改任何附加的架构消息属性。

您可以使用 Google Cloud 控制台、gcloud CLI、Pub/Sub API 或 Cloud 客户端库,将消息发布到 Google Cloud 项目中具有关联架构的主题。

gcloud

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 使用 gcloud pubsub topics publish 命令发布示例消息。

    gcloud pubsub topics publish TOPIC_ID \
        --message=MESSAGE
    

    替换以下内容:

    • TOPIC_ID:您已创建的主题的名称。

    • MESSAGE:已发布到主题的消息。示例消息可以是 {"name": "Alaska", "post_abbr": "AK"}

    C++

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档

    Avro
    namespace pubsub = ::google::cloud::pubsub;
    using ::google::cloud::future;
    using ::google::cloud::StatusOr;
    [](pubsub::Publisher publisher) {
      auto constexpr kNewYork =
          R"js({ "name": "New York", "post_abbr": "NY" })js";
      auto constexpr kPennsylvania =
          R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
      std::vector<future<void>> done;
      auto handler = [](future<StatusOr<std::string>> f) {
        auto id = f.get();
        if (!id) throw std::move(id).status();
      };
      for (auto const* data : {kNewYork, kPennsylvania}) {
        done.push_back(
            publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
                .then(handler));
      }
      // Block until all messages are published.
      for (auto& d : done) d.get();
    }
    Proto
    namespace pubsub = ::google::cloud::pubsub;
    using ::google::cloud::future;
    using ::google::cloud::StatusOr;
    [](pubsub::Publisher publisher) {
      std::vector<std::pair<std::string, std::string>> states{
          {"New York", "NY"},
          {"Pennsylvania", "PA"},
      };
      std::vector<future<void>> done;
      auto handler = [](future<StatusOr<std::string>> f) {
        auto id = f.get();
        if (!id) throw std::move(id).status();
      };
      for (auto& data : states) {
        google::cloud::pubsub::samples::State state;
        state.set_name(data.first);
        state.set_post_abbr(data.second);
        done.push_back(publisher
                           .Publish(pubsub::MessageBuilder{}
                                        .SetData(state.SerializeAsString())
                                        .Build())
                           .then(handler));
      }
      // Block until all messages are published.
      for (auto& d : done) d.get();
    }

    C#

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 C# 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C# API 参考文档

    Avro
    
    using Avro.IO;
    using Avro.Specific;
    using Google.Cloud.PubSub.V1;
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    public class PublishAvroMessagesAsyncSample
    {
        public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
        {
            TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
            PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
    
            PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
            var topic = publishApi.GetTopic(topicName);
    
            int publishedMessageCount = 0;
            var publishTasks = messageStates.Select(async state =>
            {
    
                try
                {
                    string messageId = null;
                    switch (topic.SchemaSettings.Encoding)
                    {
                        case Encoding.Binary:
                            using (var ms = new MemoryStream())
                            {
                                var encoder = new BinaryEncoder(ms);
                                var writer = new SpecificDefaultWriter(state.Schema);
                                writer.Write(state, encoder);
                                messageId = await publisher.PublishAsync(ms.ToArray());
                            }
                            break;
                        case Encoding.Json:
                            var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
                            messageId = await publisher.PublishAsync(jsonMessage);
                            break;
                    }
                    Console.WriteLine($"Published message {messageId}");
                    Interlocked.Increment(ref publishedMessageCount);
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
                }
            });
            await Task.WhenAll(publishTasks);
            return publishedMessageCount;
        }
    }
    Proto
    
    using Google.Cloud.PubSub.V1;
    using Google.Protobuf;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    public class PublishProtoMessagesAsyncSample
    {
        public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
        {
            TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
            PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
    
            PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
            var topic = publishApi.GetTopic(topicName);
    
            int publishedMessageCount = 0;
            var publishTasks = messageStates.Select(async state =>
            {
                try
                {
                    string messageId = null;
                    switch (topic.SchemaSettings.Encoding)
                    {
                        case Encoding.Binary:
                            var binaryMessage = state.ToByteString();
                            messageId = await publisher.PublishAsync(binaryMessage);
                            break;
                        case Encoding.Json:
                            var jsonMessage = JsonFormatter.Default.Format(state);
                            messageId = await publisher.PublishAsync(jsonMessage);
                            break;
                    }
                    Console.WriteLine($"Published message {messageId}");
                    Interlocked.Increment(ref publishedMessageCount);
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
                }
            });
            await Task.WhenAll(publishTasks);
            return publishedMessageCount;
        }
    }

    Go

    以下示例使用 Go Pub/Sub 客户端库的主要版本 (v2)。如果您仍在使用 v1 库,请参阅迁移到 v2 的指南。如需查看 v1 代码示例的列表,请参阅 已弃用的代码示例

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档

    Avro
    import (
    	"context"
    	"fmt"
    	"io"
    	"os"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    	"github.com/linkedin/goavro/v2"
    )
    
    func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    
    	avroSource, err := os.ReadFile(avscFile)
    	if err != nil {
    		return fmt.Errorf("os.ReadFile err: %w", err)
    	}
    	codec, err := goavro.NewCodec(string(avroSource))
    	if err != nil {
    		return fmt.Errorf("goavro.NewCodec err: %w", err)
    	}
    	record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}
    
    	// Get the topic encoding type.
    	req := &pubsubpb.GetTopicRequest{
    		Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    	}
    	t, err := client.TopicAdminClient.GetTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("got err in GetTopic: %w", err)
    	}
    	encoding := t.SchemaSettings.Encoding
    
    	var msg []byte
    	switch encoding {
    	case pubsubpb.Encoding_BINARY:
    		msg, err = codec.BinaryFromNative(nil, record)
    		if err != nil {
    			return fmt.Errorf("codec.BinaryFromNative err: %w", err)
    		}
    	case pubsubpb.Encoding_JSON:
    		msg, err = codec.TextualFromNative(nil, record)
    		if err != nil {
    			return fmt.Errorf("codec.TextualFromNative err: %w", err)
    		}
    	default:
    		return fmt.Errorf("invalid encoding: %v", encoding)
    	}
    
    	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
    	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
    	// If a topic ID is provided, the project ID from the client is used.
    	publisher := client.Publisher(topicID)
    	result := publisher.Publish(ctx, &pubsub.Message{
    		Data: msg,
    	})
    	_, err = result.Get(ctx)
    	if err != nil {
    		return fmt.Errorf("result.Get: %w", err)
    	}
    	fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
    	return nil
    }
    
    Proto
    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/pubsub/v2"
    	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
    	statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
    	"google.golang.org/protobuf/encoding/protojson"
    	"google.golang.org/protobuf/proto"
    )
    
    func publishProtoMessages(w io.Writer, projectID, topicID string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %w", err)
    	}
    
    	state := &statepb.State{
    		Name:     "Alaska",
    		PostAbbr: "AK",
    	}
    
    	// Get the topic encoding type.
    	req := &pubsubpb.GetTopicRequest{
    		Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
    	}
    	t, err := client.TopicAdminClient.GetTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("got err in GetTopic: %w", err)
    	}
    	encoding := t.SchemaSettings.Encoding
    
    	var msg []byte
    	switch encoding {
    	case pubsubpb.Encoding_BINARY:
    		msg, err = proto.Marshal(state)
    		if err != nil {
    			return fmt.Errorf("proto.Marshal err: %w", err)
    		}
    	case pubsubpb.Encoding_JSON:
    		msg, err = protojson.Marshal(state)
    		if err != nil {
    			return fmt.Errorf("protojson.Marshal err: %w", err)
    		}
    	default:
    		return fmt.Errorf("invalid encoding: %v", encoding)
    	}
    
    	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
    	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
    	// If a topic ID is provided, the project ID from the client is used.
    	publisher := client.Publisher(topicID)
    	result := publisher.Publish(ctx, &pubsub.Message{
    		Data: msg,
    	})
    	_, err = result.Get(ctx)
    	if err != nil {
    		return fmt.Errorf("result.Get: %w", err)
    	}
    	fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

    Avro
    
    import com.google.api.core.ApiFuture;
    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.protobuf.ByteString;
    import com.google.pubsub.v1.Encoding;
    import com.google.pubsub.v1.PubsubMessage;
    import com.google.pubsub.v1.TopicName;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import org.apache.avro.io.Encoder;
    import org.apache.avro.io.EncoderFactory;
    import utilities.State;
    
    public class PublishAvroRecordsExample {
    
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        // Use a topic created with an Avro schema.
        String topicId = "your-topic-id";
    
        publishAvroRecordsExample(projectId, topicId);
      }
    
      public static void publishAvroRecordsExample(String projectId, String topicId)
          throws IOException, ExecutionException, InterruptedException {
    
        Encoding encoding = null;
    
        TopicName topicName = TopicName.of(projectId, topicId);
    
        // Get the topic encoding type.
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
        }
    
        // Instantiate an avro-tools-generated class defined in `us-states.avsc`.
        State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
    
        Publisher publisher = null;
    
        block:
        try {
          publisher = Publisher.newBuilder(topicName).build();
    
          // Prepare to serialize the object to the output stream.
          ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    
          Encoder encoder = null;
    
          // Prepare an appropriate encoder for publishing to the topic.
          switch (encoding) {
            case BINARY:
              System.out.println("Preparing a BINARY encoder...");
              encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /* reuse= */ null);
              break;
    
            case JSON:
              System.out.println("Preparing a JSON encoder...");
              encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
              break;
    
            default:
              break block;
          }
    
          // Encode the object and write it to the output stream.
          state.customEncode(encoder);
          encoder.flush();
    
          // Publish the encoded object as a Pub/Sub message.
          ByteString data = ByteString.copyFrom(byteStream.toByteArray());
          PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
          System.out.println("Publishing message: " + message);
    
          ApiFuture<String> future = publisher.publish(message);
          System.out.println("Published message ID: " + future.get());
    
        } finally {
          if (publisher != null) {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
          }
        }
      }
    }
    Proto
    
    import com.google.api.core.ApiFuture;
    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.cloud.pubsub.v1.TopicAdminClient;
    import com.google.protobuf.ByteString;
    import com.google.protobuf.util.JsonFormat;
    import com.google.pubsub.v1.Encoding;
    import com.google.pubsub.v1.PubsubMessage;
    import com.google.pubsub.v1.TopicName;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import utilities.StateProto.State;
    
    public class PublishProtobufMessagesExample {
    
      public static void main(String... args) throws Exception {
        // TODO(developer): Replace these variables before running the sample.
        String projectId = "your-project-id";
        // Use a topic created with a proto schema.
        String topicId = "your-topic-id";
    
        publishProtobufMessagesExample(projectId, topicId);
      }
    
      public static void publishProtobufMessagesExample(String projectId, String topicId)
          throws IOException, ExecutionException, InterruptedException {
    
        Encoding encoding = null;
    
        TopicName topicName = TopicName.of(projectId, topicId);
    
        // Get the topic encoding type.
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
          encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
        }
    
        Publisher publisher = null;
    
        // Instantiate a protoc-generated class defined in `us-states.proto`.
        State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
    
        block:
        try {
          publisher = Publisher.newBuilder(topicName).build();
    
          PubsubMessage.Builder message = PubsubMessage.newBuilder();
    
          // Prepare an appropriately formatted message based on topic encoding.
          switch (encoding) {
            case BINARY:
              message.setData(state.toByteString());
              System.out.println("Publishing a BINARY-formatted message:\n" + message);
              break;
    
            case JSON:
              String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
              message.setData(ByteString.copyFromUtf8(jsonString));
              System.out.println("Publishing a JSON-formatted message:\n" + message);
              break;
    
            default:
              break block;
          }
    
          // Publish the message.
          ApiFuture<String> future = publisher.publish(message.build());
          System.out.println("Published message ID: " + future.get());
    
        } finally {
          if (publisher != null) {
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
          }
        }
      }
    }

    Node.js

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

    Avro
    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub, Encodings} = require('@google-cloud/pubsub');
    
    // And the Apache Avro library
    const avro = require('avro-js');
    const fs = require('fs');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function publishAvroRecords(topicNameOrId) {
      // Cache topic objects (publishers) and reuse them.
      const topic = pubSubClient.topic(topicNameOrId);
    
      // Get the topic metadata to learn about its schema encoding.
      const [topicMetadata] = await topic.getMetadata();
      const topicSchemaMetadata = topicMetadata.schemaSettings;
    
      if (!topicSchemaMetadata) {
        console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
        return;
      }
      const schemaEncoding = topicSchemaMetadata.encoding;
    
      // Make an encoder using the official avro-js library.
      const definition = fs
        .readFileSync('system-test/fixtures/provinces.avsc')
        .toString();
      const type = avro.parse(definition);
    
      // Encode the message.
      const province = {
        name: 'Ontario',
        post_abbr: 'ON',
      };
      let dataBuffer;
      switch (schemaEncoding) {
        case Encodings.Binary:
          dataBuffer = type.toBuffer(province);
          break;
        case Encodings.Json:
          dataBuffer = Buffer.from(type.toString(province));
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaEncoding}`);
          break;
      }
      if (!dataBuffer) {
        console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
        return;
      }
    
      const messageId = await topic.publish(dataBuffer);
      console.log(`Avro record ${messageId} published.`);
    }
    Protocol Buffer
    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    const {PubSub, Encodings} = require('@google-cloud/pubsub');
    
    // And the protobufjs library
    const protobuf = require('protobufjs');
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    async function publishProtobufMessages(topicNameOrId) {
      // Cache topic objects (publishers) and reuse them.
      const topic = pubSubClient.topic(topicNameOrId);
    
      // Get the topic metadata to learn about its schema.
      const [topicMetadata] = await topic.getMetadata();
      const topicSchemaMetadata = topicMetadata.schemaSettings;
    
      if (!topicSchemaMetadata) {
        console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
        return;
      }
      const schemaEncoding = topicSchemaMetadata.encoding;
    
      // Encode the message.
      const province = {
        name: 'Ontario',
        postAbbr: 'ON',
      };
    
      // Make an encoder using the protobufjs library.
      //
      // Since we're providing the test message for a specific schema here, we'll
      // also code in the path to a sample proto definition.
      const root = await protobuf.load('system-test/fixtures/provinces.proto');
      const Province = root.lookupType('utilities.Province');
      const message = Province.create(province);
    
      let dataBuffer;
      switch (schemaEncoding) {
        case Encodings.Binary:
          dataBuffer = Buffer.from(Province.encode(message).finish());
          break;
        case Encodings.Json:
          dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaEncoding}`);
          break;
      }
      if (!dataBuffer) {
        console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
        return;
      }
    
      const messageId = await topic.publishMessage({data: dataBuffer});
      console.log(`Protobuf message ${messageId} published.`);
    }

    Node.js

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Node.js 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

    Avro
    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub, Encodings} from '@google-cloud/pubsub';
    
    // And the Apache Avro library
    import * as avro from 'avro-js';
    import * as fs from 'fs';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    interface ProvinceObject {
      name: string;
      post_abbr: string;
    }
    
    async function publishAvroRecords(topicNameOrId: string) {
      // Cache topic objects (publishers) and reuse them.
      const topic = pubSubClient.topic(topicNameOrId);
    
      // Get the topic metadata to learn about its schema encoding.
      const [topicMetadata] = await topic.getMetadata();
      const topicSchemaMetadata = topicMetadata.schemaSettings;
    
      if (!topicSchemaMetadata) {
        console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
        return;
      }
      const schemaEncoding = topicSchemaMetadata.encoding;
    
      // Make an encoder using the official avro-js library.
      const definition = fs
        .readFileSync('system-test/fixtures/provinces.avsc')
        .toString();
      const type = avro.parse(definition);
    
      // Encode the message.
      const province: ProvinceObject = {
        name: 'Ontario',
        post_abbr: 'ON',
      };
      let dataBuffer: Buffer | undefined;
      switch (schemaEncoding) {
        case Encodings.Binary:
          dataBuffer = type.toBuffer(province);
          break;
        case Encodings.Json:
          dataBuffer = Buffer.from(type.toString(province));
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaEncoding}`);
          break;
      }
      if (!dataBuffer) {
        console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
        return;
      }
    
      const messageId = await topic.publish(dataBuffer);
      console.log(`Avro record ${messageId} published.`);
    }
    Protocol Buffer
    /**
     * TODO(developer): Uncomment this variable before running the sample.
     */
    // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
    
    // Imports the Google Cloud client library
    import {PubSub, Encodings} from '@google-cloud/pubsub';
    
    // And the protobufjs library
    import * as protobuf from 'protobufjs';
    
    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();
    
    interface ProvinceObject {
      name: string;
      postAbbr: string;
    }
    
    async function publishProtobufMessages(topicNameOrId: string) {
      // Cache topic objects (publishers) and reuse them.
      const topic = pubSubClient.topic(topicNameOrId);
    
      // Get the topic metadata to learn about its schema.
      const [topicMetadata] = await topic.getMetadata();
      const topicSchemaMetadata = topicMetadata.schemaSettings;
    
      if (!topicSchemaMetadata) {
        console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
        return;
      }
      const schemaEncoding = topicSchemaMetadata.encoding;
    
      // Encode the message.
      const province: ProvinceObject = {
        name: 'Ontario',
        postAbbr: 'ON',
      };
    
      // Make an encoder using the protobufjs library.
      //
      // Since we're providing the test message for a specific schema here, we'll
      // also code in the path to a sample proto definition.
      const root = await protobuf.load('system-test/fixtures/provinces.proto');
      const Province = root.lookupType('utilities.Province');
      const message = Province.create(province);
    
      let dataBuffer: Buffer | undefined;
      switch (schemaEncoding) {
        case Encodings.Binary:
          dataBuffer = Buffer.from(Province.encode(message).finish());
          break;
        case Encodings.Json:
          dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaEncoding}`);
          break;
      }
      if (!dataBuffer) {
        console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
        return;
      }
    
      const messageId = await topic.publishMessage({data: dataBuffer});
      console.log(`Protobuf message ${messageId} published.`);
    }

    PHP

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 PHP 设置说明进行操作。如需了解详情,请参阅 Pub/Sub PHP API 参考文档

    Avro
    use Google\Cloud\PubSub\PubSubClient;
    use Google\Cloud\PubSub\V1\Encoding;
    
    use AvroStringIO;
    use AvroSchema;
    use AvroIODatumWriter;
    use AvroIOBinaryEncoder;
    
    /**
     * Publish a message using an AVRO schema.
     *
     * This sample uses `wikimedia/avro` for AVRO encoding.
     *
     * @param string $projectId
     * @param string $topicId
     * @param string $definitionFile
     */
    function publish_avro_records($projectId, $topicId, $definitionFile)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
    
        $definition = (string) file_get_contents($definitionFile);
    
        $messageData = [
            'name' => 'Alaska',
            'post_abbr' => 'AK',
        ];
    
        $topic = $pubsub->topic($topicId);
    
        // get the encoding type.
        $topicInfo = $topic->info();
        $encoding = '';
        if (isset($topicInfo['schemaSettings']['encoding'])) {
            $encoding = $topicInfo['schemaSettings']['encoding'];
        }
    
        // if encoding is not set, we can't continue.
        if ($encoding === '') {
            printf('Topic %s does not have schema enabled', $topicId);
            return;
        }
    
        // If you are using gRPC, encoding may be an integer corresponding to an
        // enum value on Google\Cloud\PubSub\V1\Encoding.
        if (!is_string($encoding)) {
            $encoding = Encoding::name($encoding);
        }
    
        $encodedMessageData = '';
        if ($encoding == 'BINARY') {
            // encode as AVRO binary.
            $io = new AvroStringIO();
            $schema = AvroSchema::parse($definition);
            $writer = new AvroIODatumWriter($schema);
            $encoder = new AvroIOBinaryEncoder($io);
            $writer->write($messageData, $encoder);
    
            $encodedMessageData = $io->string();
        } else {
            // encode as JSON.
            $encodedMessageData = json_encode($messageData);
        }
    
        $topic->publish(['data' => $encodedMessageData]);
    
        printf('Published message with %s encoding', $encoding);
    }
    Protocol Buffer
    use Google\Cloud\PubSub\PubSubClient;
    use Google\Cloud\PubSub\V1\Encoding;
    
    use Utilities\StateProto;
    
    /**
     * Publish a message using a protocol buffer schema.
     *
     * Relies on a proto message of the following form:
     * ```
     * syntax = "proto3";
     *
     * package utilities;
     *
     * message StateProto {
     *   string name = 1;
     *   string post_abbr = 2;
     * }
     * ```
     *
     * @param string $projectId
     * @param string $topicId
     * @return void
     */
    function publish_proto_messages($projectId, $topicId)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
    
        $messageData = new StateProto([
            'name' => 'Alaska',
            'post_abbr' => 'AK',
        ]);
    
        $topic = $pubsub->topic($topicId);
    
        // get the encoding type.
        $topicInfo = $topic->info();
        $encoding = '';
        if (isset($topicInfo['schemaSettings']['encoding'])) {
            $encoding = $topicInfo['schemaSettings']['encoding'];
        }
    
        // if encoding is not set, we can't continue.
        if ($encoding === '') {
            printf('Topic %s does not have schema enabled', $topicId);
            return;
        }
    
        // If you are using gRPC, encoding may be an integer corresponding to an
        // enum value on Google\Cloud\PubSub\V1\Encoding.
        if (!is_string($encoding)) {
            $encoding = Encoding::name($encoding);
        }
    
        $encodedMessageData = '';
        if ($encoding == 'BINARY') {
            // encode as protobuf binary.
            $encodedMessageData = $messageData->serializeToString();
        } else {
            // encode as JSON.
            $encodedMessageData = $messageData->serializeToJsonString();
        }
    
        $topic->publish(['data' => $encodedMessageData]);
    
        printf('Published message with %s encoding', $encoding);
    }

    Python

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

    Avro
    from avro.io import BinaryEncoder, DatumWriter
    import avro.schema as schema
    import io
    import json
    from google.api_core.exceptions import NotFound
    from google.cloud.pubsub import PublisherClient
    from google.pubsub_v1.types import Encoding
    
    # TODO(developer): Replace these variables before running the sample.
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    # avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
    
    publisher_client = PublisherClient()
    topic_path = publisher_client.topic_path(project_id, topic_id)
    
    # Prepare to write Avro records to the binary output stream.
    with open(avsc_file, "rb") as file:
        avro_schema = schema.parse(file.read())
    writer = DatumWriter(avro_schema)
    bout = io.BytesIO()
    
    # Prepare some data using a Python dictionary that matches the Avro schema
    record = {"name": "Alaska", "post_abbr": "AK"}
    
    try:
        # Get the topic encoding type.
        topic = publisher_client.get_topic(request={"topic": topic_path})
        encoding = topic.schema_settings.encoding
    
        # Encode the data according to the message serialization type.
        if encoding == Encoding.BINARY:
            encoder = BinaryEncoder(bout)
            writer.write(record, encoder)
            data = bout.getvalue()
            print(f"Preparing a binary-encoded message:\n{data.decode()}")
        elif encoding == Encoding.JSON:
            data_str = json.dumps(record)
            print(f"Preparing a JSON-encoded message:\n{data_str}")
            data = data_str.encode("utf-8")
        else:
            print(f"No encoding specified in {topic_path}. Abort.")
            exit(0)
    
        future = publisher_client.publish(topic_path, data)
        print(f"Published message ID: {future.result()}")
    
    except NotFound:
        print(f"{topic_id} not found.")
    Protocol Buffer
    from google.api_core.exceptions import NotFound
    from google.cloud.pubsub import PublisherClient
    from google.protobuf.json_format import MessageToJson
    from google.pubsub_v1.types import Encoding
    
    from utilities import us_states_pb2  # type: ignore
    
    # TODO(developer): Replace these variables before running the sample.
    # project_id = "your-project-id"
    # topic_id = "your-topic-id"
    
    publisher_client = PublisherClient()
    topic_path = publisher_client.topic_path(project_id, topic_id)
    
    try:
        # Get the topic encoding type.
        topic = publisher_client.get_topic(request={"topic": topic_path})
        encoding = topic.schema_settings.encoding
    
        # Instantiate a protoc-generated class defined in `us-states.proto`.
        state = us_states_pb2.StateProto()
        state.name = "Alaska"
        state.post_abbr = "AK"
    
        # Encode the data according to the message serialization type.
        if encoding == Encoding.BINARY:
            data = state.SerializeToString()
            print(f"Preparing a binary-encoded message:\n{data}")
        elif encoding == Encoding.JSON:
            json_object = MessageToJson(state)
            data = str(json_object).encode("utf-8")
            print(f"Preparing a JSON-encoded message:\n{data}")
        else:
            print(f"No encoding specified in {topic_path}. Abort.")
            exit(0)
    
        future = publisher_client.publish(topic_path, data)
        print(f"Published message ID: {future.result()}")
    
    except NotFound:
        print(f"{topic_id} not found.")

    Ruby

    以下示例使用 Ruby Pub/Sub 客户端库 v3。如果您仍在使用 v2 库,请参阅 迁移到 v3 的指南。如需查看 Ruby v2 代码示例的列表,请参阅 已弃用的代码示例

    在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档

    Avro
    # topic_id = "your-topic-id"
    # avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
    
    pubsub = Google::Cloud::PubSub.new
    topic_admin = pubsub.topic_admin
    publisher = pubsub.publisher topic_id
    record = { "name" => "Alaska", "post_abbr" => "AK" }
    
    topic = topic_admin.get_topic topic: pubsub.topic_path(topic_id)
    encoding = topic.schema_settings.encoding
    
    case encoding
    when :BINARY
      require "avro"
      avro_schema = Avro::Schema.parse File.read(avsc_file)
      writer = Avro::IO::DatumWriter.new avro_schema
      buffer = StringIO.new
      encoder = Avro::IO::BinaryEncoder.new buffer
      writer.write record, encoder
      publisher.publish buffer
      puts "Published binary-encoded AVRO message."
    when :JSON
      require "json"
      publisher.publish record.to_json
      puts "Published JSON-encoded AVRO message."
    else
      raise "No encoding specified in #{topic.name}."
    end
    Protocol Buffer
    # topic_id = "your-topic-id"
    
    pubsub = Google::Cloud::PubSub.new
    topic_admin = pubsub.topic_admin
    publisher = pubsub.publisher topic_id
    state = Utilities::StateProto.new name: "Alaska", post_abbr: "AK"
    
    topic = topic_admin.get_topic topic: pubsub.topic_path(topic_id)
    encoding = topic.schema_settings.encoding
    
    case encoding
    when :BINARY
      publisher.publish Utilities::StateProto.encode(state)
      puts "Published binary-encoded protobuf message."
    when :JSON
      publisher.publish Utilities::StateProto.encode_json(state)
      puts "Published JSON-encoded protobuf message."
    else
      raise "No encoding specified in #{topic.name}."
    end

后续步骤