このドキュメントでは、スキーマを使用してトピックにメッセージをパブリッシュする方法について説明します。
準備
パブリッシュ ワークフローを構成する前に、次のタスクを完了していることを確認してください。
必要なロール
トピックにメッセージをパブリッシュするために必要な権限を取得するには、トピックに対するPub/Sub パブリッシャー (roles/pubsub.publisher)IAM ロールを付与するよう管理者に依頼してください。ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
トピックとサブスクリプションを作成または更新するには、追加の権限が必要です。スキーマを使用してメッセージをパブリッシュする
スキーマに関連付けられているトピックにメッセージをパブリッシュできます。メッセージは、トピックを作成したときに指定したスキーマと形式でエンコードする必要があります。メッセージは、許可されたリビジョン範囲内のスキーマのリビジョンのいずれかと一致する場合、トピックに関連付けられたスキーマと一致しますメッセージは、一致が見つかる、または許可されている最も古いリビジョンに達するまで、許可されている最新のリビジョンから順にリビジョンに対して評価されます。Pub/Sub は、スキーマに関連付けられたトピックに正常にパブリッシュされたメッセージに次の属性を追加します。
googclient_schemaname: 検証に使用されるスキーマの名前。googclient_schemaencoding: メッセージのエンコード(JSON またはバイナリ)。googclient_schemarevisionid: メッセージの解析と検証に使用されるスキーマのリビジョン ID。各リビジョンには、一意のリビジョン ID が関連付けられています。リビジョン ID は、自動生成された 8 文字の UUID です。
メッセージがトピックで許可されているスキーマ リビジョンのいずれとも一致しない場合、Pub/Sub はパブリッシュ リクエストに INVALID_ARGUMENT エラーを返します。
Pub/Sub は、パブリッシュ時にスキーマ リビジョンに対してのみメッセージを評価します。 新しいスキーマ リビジョンをコミットしたり、 トピックに関連付けられたスキーマを変更しても 、メッセージをパブリッシュした後にそのメッセージは再評価されず、 アタッチされたスキーマ メッセージ属性も変更されません。
Google Cloud プロジェクトで関連付けられたスキーマを持つトピックにメッセージをパブリッシュするには、Google Cloud コンソール、gcloud CLI、Pub/Sub API、または Cloud クライアント ライブラリを使用します。
gcloud
コンソールで Cloud Shell をアクティブにします。 Cloud de Confiance
コンソールの下部にある Cloud de Confiance Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です 。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています 。セッションが初期化されるまで数秒かかることがあります。
gcloud pubsub topics publish コマンドを使用してサンプル メッセージをパブリッシュします。
gcloud pubsub topics publish TOPIC_ID \ --message=MESSAGE以下を置き換えます。
TOPIC_ID: すでに作成されているトピックの名前。
MESSAGE: トピックにパブリッシュされたメッセージ。メッセージの例は
{"name": "Alaska", "post_abbr": "AK"}です。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API リファレンス ドキュメントをご覧ください。
Avronamespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
auto constexpr kNewYork =
R"js({ "name": "New York", "post_abbr": "NY" })js";
auto constexpr kPennsylvania =
R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
std::vector<future<void>> done;
auto handler = [](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
};
for (auto const* data : {kNewYork, kPennsylvania}) {
done.push_back(
publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
.then(handler));
}
// Block until all messages are published.
for (auto& d : done) d.get();
}namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
std::vector<std::pair<std::string, std::string>> states{
{"New York", "NY"},
{"Pennsylvania", "PA"},
};
std::vector<future<void>> done;
auto handler = [](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
};
for (auto& data : states) {
google::cloud::pubsub::samples::State state;
state.set_name(data.first);
state.set_post_abbr(data.second);
done.push_back(publisher
.Publish(pubsub::MessageBuilder{}
.SetData(state.SerializeAsString())
.Build())
.then(handler));
}
// Block until all messages are published.
for (auto& d : done) d.get();
}C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API リファレンス ドキュメントをご覧ください。
Avro
using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishAvroMessagesAsyncSample
{
public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
var topic = publishApi.GetTopic(topicName);
int publishedMessageCount = 0;
var publishTasks = messageStates.Select(async state =>
{
try
{
string messageId = null;
switch (topic.SchemaSettings.Encoding)
{
case Encoding.Binary:
using (var ms = new MemoryStream())
{
var encoder = new BinaryEncoder(ms);
var writer = new SpecificDefaultWriter(state.Schema);
writer.Write(state, encoder);
messageId = await publisher.PublishAsync(ms.ToArray());
}
break;
case Encoding.Json:
var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
messageId = await publisher.PublishAsync(jsonMessage);
break;
}
Console.WriteLine($"Published message {messageId}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
// PublisherClient instance should be shutdown after use.
// The TimeSpan specifies for how long to attempt to publish locally queued messages.
await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
return publishedMessageCount;
}
}
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishProtoMessagesAsyncSample
{
public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
var topic = publishApi.GetTopic(topicName);
int publishedMessageCount = 0;
var publishTasks = messageStates.Select(async state =>
{
try
{
string messageId = null;
switch (topic.SchemaSettings.Encoding)
{
case Encoding.Binary:
var binaryMessage = state.ToByteString();
messageId = await publisher.PublishAsync(binaryMessage);
break;
case Encoding.Json:
var jsonMessage = JsonFormatter.Default.Format(state);
messageId = await publisher.PublishAsync(jsonMessage);
break;
}
Console.WriteLine($"Published message {messageId}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {state}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
// PublisherClient instance should be shutdown after use.
// The TimeSpan specifies for how long to attempt to publish locally queued messages.
await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
return publishedMessageCount;
}
}Go
次のサンプルでは、Go Pub/Sub クライアント ライブラリのメジャー バージョン(v2)を使用します。v1 ライブラリをまだ使用している場合は、 v2 への移行ガイドをご覧ください。 v1 コードサンプルのリストについては、 非推奨のコードサンプルをご覧ください。
このサンプルを試す前に、 クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Avroimport (
"context"
"fmt"
"io"
"os"
"cloud.google.com/go/pubsub/v2"
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
"github.com/linkedin/goavro/v2"
)
func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
avroSource, err := os.ReadFile(avscFile)
if err != nil {
return fmt.Errorf("os.ReadFile err: %w", err)
}
codec, err := goavro.NewCodec(string(avroSource))
if err != nil {
return fmt.Errorf("goavro.NewCodec err: %w", err)
}
record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}
// Get the topic encoding type.
req := &pubsubpb.GetTopicRequest{
Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
}
t, err := client.TopicAdminClient.GetTopic(ctx, req)
if err != nil {
return fmt.Errorf("got err in GetTopic: %w", err)
}
encoding := t.SchemaSettings.Encoding
var msg []byte
switch encoding {
case pubsubpb.Encoding_BINARY:
msg, err = codec.BinaryFromNative(nil, record)
if err != nil {
return fmt.Errorf("codec.BinaryFromNative err: %w", err)
}
case pubsubpb.Encoding_JSON:
msg, err = codec.TextualFromNative(nil, record)
if err != nil {
return fmt.Errorf("codec.TextualFromNative err: %w", err)
}
default:
return fmt.Errorf("invalid encoding: %v", encoding)
}
// client.Publisher can be passed a topic ID (e.g. "my-topic") or
// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
// If a topic ID is provided, the project ID from the client is used.
publisher := client.Publisher(topicID)
result := publisher.Publish(ctx, &pubsub.Message{
Data: msg,
})
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("result.Get: %w", err)
}
fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
return nil
}
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub/v2"
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func publishProtoMessages(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
state := &statepb.State{
Name: "Alaska",
PostAbbr: "AK",
}
// Get the topic encoding type.
req := &pubsubpb.GetTopicRequest{
Topic: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
}
t, err := client.TopicAdminClient.GetTopic(ctx, req)
if err != nil {
return fmt.Errorf("got err in GetTopic: %w", err)
}
encoding := t.SchemaSettings.Encoding
var msg []byte
switch encoding {
case pubsubpb.Encoding_BINARY:
msg, err = proto.Marshal(state)
if err != nil {
return fmt.Errorf("proto.Marshal err: %w", err)
}
case pubsubpb.Encoding_JSON:
msg, err = protojson.Marshal(state)
if err != nil {
return fmt.Errorf("protojson.Marshal err: %w", err)
}
default:
return fmt.Errorf("invalid encoding: %v", encoding)
}
// client.Publisher can be passed a topic ID (e.g. "my-topic") or
// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
// If a topic ID is provided, the project ID from the client is used.
publisher := client.Publisher(topicID)
result := publisher.Publish(ctx, &pubsub.Message{
Data: msg,
})
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("result.Get: %w", err)
}
fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
return nil
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API リファレンス ドキュメントをご覧ください。
Avro
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import utilities.State;
public class PublishAvroRecordsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use a topic created with an Avro schema.
String topicId = "your-topic-id";
publishAvroRecordsExample(projectId, topicId);
}
public static void publishAvroRecordsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
Encoding encoding = null;
TopicName topicName = TopicName.of(projectId, topicId);
// Get the topic encoding type.
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
}
// Instantiate an avro-tools-generated class defined in `us-states.avsc`.
State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
Publisher publisher = null;
block:
try {
publisher = Publisher.newBuilder(topicName).build();
// Prepare to serialize the object to the output stream.
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
Encoder encoder = null;
// Prepare an appropriate encoder for publishing to the topic.
switch (encoding) {
case BINARY:
System.out.println("Preparing a BINARY encoder...");
encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /* reuse= */ null);
break;
case JSON:
System.out.println("Preparing a JSON encoder...");
encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
break;
default:
break block;
}
// Encode the object and write it to the output stream.
state.customEncode(encoder);
encoder.flush();
// Publish the encoded object as a Pub/Sub message.
ByteString data = ByteString.copyFrom(byteStream.toByteArray());
PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
System.out.println("Publishing message: " + message);
ApiFuture<String> future = publisher.publish(message);
System.out.println("Published message ID: " + future.get());
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.StateProto.State;
public class PublishProtobufMessagesExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use a topic created with a proto schema.
String topicId = "your-topic-id";
publishProtobufMessagesExample(projectId, topicId);
}
public static void publishProtobufMessagesExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
Encoding encoding = null;
TopicName topicName = TopicName.of(projectId, topicId);
// Get the topic encoding type.
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
}
Publisher publisher = null;
// Instantiate a protoc-generated class defined in `us-states.proto`.
State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
block:
try {
publisher = Publisher.newBuilder(topicName).build();
PubsubMessage.Builder message = PubsubMessage.newBuilder();
// Prepare an appropriately formatted message based on topic encoding.
switch (encoding) {
case BINARY:
message.setData(state.toByteString());
System.out.println("Publishing a BINARY-formatted message:\n" + message);
break;
case JSON:
String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
message.setData(ByteString.copyFromUtf8(jsonString));
System.out.println("Publishing a JSON-formatted message:\n" + message);
break;
default:
break block;
}
// Publish the message.
ApiFuture<String> future = publisher.publish(message.build());
System.out.println("Published message ID: " + future.get());
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}Node.js
このサンプルを試す前に、 クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。 詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。
Avro/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');
// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishAvroRecords(topicNameOrId) {
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);
// Get the topic metadata to learn about its schema encoding.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Make an encoder using the official avro-js library.
const definition = fs
.readFileSync('system-test/fixtures/provinces.avsc')
.toString();
const type = avro.parse(definition);
// Encode the message.
const province = {
name: 'Ontario',
post_abbr: 'ON',
};
let dataBuffer;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = type.toBuffer(province);
break;
case Encodings.Json:
dataBuffer = Buffer.from(type.toString(province));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publish(dataBuffer);
console.log(`Avro record ${messageId} published.`);
}/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');
// And the protobufjs library
const protobuf = require('protobufjs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishProtobufMessages(topicNameOrId) {
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);
// Get the topic metadata to learn about its schema.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Encode the message.
const province = {
name: 'Ontario',
postAbbr: 'ON',
};
// Make an encoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const root = await protobuf.load('system-test/fixtures/provinces.proto');
const Province = root.lookupType('utilities.Province');
const message = Province.create(province);
let dataBuffer;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = Buffer.from(Province.encode(message).finish());
break;
case Encodings.Json:
dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Protobuf message ${messageId} published.`);
}Node.js
このサンプルを試す前に、 クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。 詳細については、Pub/Sub Node.js API リファレンス ドキュメントをご覧ください。
Avro/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';
// And the Apache Avro library
import * as avro from 'avro-js';
import * as fs from 'fs';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
interface ProvinceObject {
name: string;
post_abbr: string;
}
async function publishAvroRecords(topicNameOrId: string) {
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);
// Get the topic metadata to learn about its schema encoding.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Make an encoder using the official avro-js library.
const definition = fs
.readFileSync('system-test/fixtures/provinces.avsc')
.toString();
const type = avro.parse(definition);
// Encode the message.
const province: ProvinceObject = {
name: 'Ontario',
post_abbr: 'ON',
};
let dataBuffer: Buffer | undefined;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = type.toBuffer(province);
break;
case Encodings.Json:
dataBuffer = Buffer.from(type.toString(province));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publish(dataBuffer);
console.log(`Avro record ${messageId} published.`);
}/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
import {PubSub, Encodings} from '@google-cloud/pubsub';
// And the protobufjs library
import * as protobuf from 'protobufjs';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
interface ProvinceObject {
name: string;
postAbbr: string;
}
async function publishProtobufMessages(topicNameOrId: string) {
// Cache topic objects (publishers) and reuse them.
const topic = pubSubClient.topic(topicNameOrId);
// Get the topic metadata to learn about its schema.
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Encode the message.
const province: ProvinceObject = {
name: 'Ontario',
postAbbr: 'ON',
};
// Make an encoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const root = await protobuf.load('system-test/fixtures/provinces.proto');
const Province = root.lookupType('utilities.Province');
const message = Province.create(province);
let dataBuffer: Buffer | undefined;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = Buffer.from(Province.encode(message).finish());
break;
case Encodings.Json:
dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publishMessage({data: dataBuffer});
console.log(`Protobuf message ${messageId} published.`);
}PHP
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API リファレンス ドキュメントをご覧ください。
Avrouse Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroIOBinaryEncoder;
/**
* Publish a message using an AVRO schema.
*
* This sample uses `wikimedia/avro` for AVRO encoding.
*
* @param string $projectId
* @param string $topicId
* @param string $definitionFile
*/
function publish_avro_records($projectId, $topicId, $definitionFile)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$definition = (string) file_get_contents($definitionFile);
$messageData = [
'name' => 'Alaska',
'post_abbr' => 'AK',
];
$topic = $pubsub->topic($topicId);
// get the encoding type.
$topicInfo = $topic->info();
$encoding = '';
if (isset($topicInfo['schemaSettings']['encoding'])) {
$encoding = $topicInfo['schemaSettings']['encoding'];
}
// if encoding is not set, we can't continue.
if ($encoding === '') {
printf('Topic %s does not have schema enabled', $topicId);
return;
}
// If you are using gRPC, encoding may be an integer corresponding to an
// enum value on Google\Cloud\PubSub\V1\Encoding.
if (!is_string($encoding)) {
$encoding = Encoding::name($encoding);
}
$encodedMessageData = '';
if ($encoding == 'BINARY') {
// encode as AVRO binary.
$io = new AvroStringIO();
$schema = AvroSchema::parse($definition);
$writer = new AvroIODatumWriter($schema);
$encoder = new AvroIOBinaryEncoder($io);
$writer->write($messageData, $encoder);
$encodedMessageData = $io->string();
} else {
// encode as JSON.
$encodedMessageData = json_encode($messageData);
}
$topic->publish(['data' => $encodedMessageData]);
printf('Published message with %s encoding', $encoding);
}use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use Utilities\StateProto;
/**
* Publish a message using a protocol buffer schema.
*
* Relies on a proto message of the following form:
* ```
* syntax = "proto3";
*
* package utilities;
*
* message StateProto {
* string name = 1;
* string post_abbr = 2;
* }
* ```
*
* @param string $projectId
* @param string $topicId
* @return void
*/
function publish_proto_messages($projectId, $topicId)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$messageData = new StateProto([
'name' => 'Alaska',
'post_abbr' => 'AK',
]);
$topic = $pubsub->topic($topicId);
// get the encoding type.
$topicInfo = $topic->info();
$encoding = '';
if (isset($topicInfo['schemaSettings']['encoding'])) {
$encoding = $topicInfo['schemaSettings']['encoding'];
}
// if encoding is not set, we can't continue.
if ($encoding === '') {
printf('Topic %s does not have schema enabled', $topicId);
return;
}
// If you are using gRPC, encoding may be an integer corresponding to an
// enum value on Google\Cloud\PubSub\V1\Encoding.
if (!is_string($encoding)) {
$encoding = Encoding::name($encoding);
}
$encodedMessageData = '';
if ($encoding == 'BINARY') {
// encode as protobuf binary.
$encodedMessageData = $messageData->serializeToString();
} else {
// encode as JSON.
$encodedMessageData = $messageData->serializeToJsonString();
}
$topic->publish(['data' => $encodedMessageData]);
printf('Published message with %s encoding', $encoding);
}Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API リファレンス ドキュメントをご覧ください。
Avrofrom avro.io import BinaryEncoder, DatumWriter
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)
# Prepare to write Avro records to the binary output stream.
with open(avsc_file, "rb") as file:
avro_schema = schema.parse(file.read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()
# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}
try:
# Get the topic encoding type.
topic = publisher_client.get_topic(request={"topic": topic_path})
encoding = topic.schema_settings.encoding
# Encode the data according to the message serialization type.
if encoding == Encoding.BINARY:
encoder = BinaryEncoder(bout)
writer.write(record, encoder)
data = bout.getvalue()
print(f"Preparing a binary-encoded message:\n{data.decode()}")
elif encoding == Encoding.JSON:
data_str = json.dumps(record)
print(f"Preparing a JSON-encoded message:\n{data_str}")
data = data_str.encode("utf-8")
else:
print(f"No encoding specified in {topic_path}. Abort.")
exit(0)
future = publisher_client.publish(topic_path, data)
print(f"Published message ID: {future.result()}")
except NotFound:
print(f"{topic_id} not found.")from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding
from utilities import us_states_pb2 # type: ignore
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)
try:
# Get the topic encoding type.
topic = publisher_client.get_topic(request={"topic": topic_path})
encoding = topic.schema_settings.encoding
# Instantiate a protoc-generated class defined in `us-states.proto`.
state = us_states_pb2.StateProto()
state.name = "Alaska"
state.post_abbr = "AK"
# Encode the data according to the message serialization type.
if encoding == Encoding.BINARY:
data = state.SerializeToString()
print(f"Preparing a binary-encoded message:\n{data}")
elif encoding == Encoding.JSON:
json_object = MessageToJson(state)
data = str(json_object).encode("utf-8")
print(f"Preparing a JSON-encoded message:\n{data}")
else:
print(f"No encoding specified in {topic_path}. Abort.")
exit(0)
future = publisher_client.publish(topic_path, data)
print(f"Published message ID: {future.result()}")
except NotFound:
print(f"{topic_id} not found.")Ruby
次のサンプルでは、Ruby Pub/Sub クライアント ライブラリ v3 を使用します。v2 ライブラリをまだ使用している場合は、 v3 への移行ガイドをご覧ください。 Ruby v2 コードサンプルのリストについては、 非推奨のコードサンプルをご覧ください。
このサンプルを試す前に、 クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。 詳細については、Pub/Sub Ruby API リファレンス ドキュメントをご覧ください。
Avro# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
pubsub = Google::Cloud::PubSub.new
topic_admin = pubsub.topic_admin
publisher = pubsub.publisher topic_id
record = { "name" => "Alaska", "post_abbr" => "AK" }
topic = topic_admin.get_topic topic: pubsub.topic_path(topic_id)
encoding = topic.schema_settings.encoding
case encoding
when :BINARY
require "avro"
avro_schema = Avro::Schema.parse File.read(avsc_file)
writer = Avro::IO::DatumWriter.new avro_schema
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new buffer
writer.write record, encoder
publisher.publish buffer
puts "Published binary-encoded AVRO message."
when :JSON
require "json"
publisher.publish record.to_json
puts "Published JSON-encoded AVRO message."
else
raise "No encoding specified in #{topic.name}."
end# topic_id = "your-topic-id"
pubsub = Google::Cloud::PubSub.new
topic_admin = pubsub.topic_admin
publisher = pubsub.publisher topic_id
state = Utilities::StateProto.new name: "Alaska", post_abbr: "AK"
topic = topic_admin.get_topic topic: pubsub.topic_path(topic_id)
encoding = topic.schema_settings.encoding
case encoding
when :BINARY
publisher.publish Utilities::StateProto.encode(state)
puts "Published binary-encoded protobuf message."
when :JSON
publisher.publish Utilities::StateProto.encode_json(state)
puts "Published JSON-encoded protobuf message."
else
raise "No encoding specified in #{topic.name}."
end次のステップ
Pub/Sub がメッセージ データを保存するロケーションを制限するには、Pub/Sub リソースのロケーションの制限をご覧ください。
メッセージの受信の詳細については、サブスクリプション タイプを選択するをご覧ください。