1 対多の Pub/Sub システムを構築する

このチュートリアルでは、同期 RPC ではなく、Pub/Sub を介してメッセージを送信することによって通信する、一連のアプリケーションを設定する手順について説明します。アプリケーションを分離することにより、メッセージングには以下のようなメリットがあります。

  • アプリケーションの堅牢性が強化されます。
  • 開発をシンプルにできる可能性があります。

たとえば、呼び出し元(パブリッシャー)が受信者(サブスクライバー)を起動して稼働状態にする必要がなくなります。パブリッシャーは Pub/Sub にメッセージを送信します。 パブリッシャーが、メッセージを受信する必要があるサブスクライバー アプリケーションの種類と数を把握する必要もありません。そのため 1 つ以上のサブスクライバー アプリケーションが稼働しているときは、サービスは常にサブスクライバー アプリケーションにメッセージを配信できます。

システムの概要

このチュートリアルでは、次の図に示すように、1 対多の通信を使用して 2 つのサブスクライバーに「Hello, World!」メッセージを送信するパブリッシャー アプリケーションを起動します。

トピック、トピックに関連付けられたサブスクリプション、Cloud Pub/Sub との間でメッセージを送受信するパブリッシャー アプリケーションとサブスクライバー アプリケーションの図

2 つのサブスクライバー アプリケーションには同一のコードを使用しますが、起動するタイミングは異なります。このプロセスでは、Pub/Sub で非同期通信を有効にできます。このシステムを構築するには、次の手順を行います。

  1. アプリケーションが認証に使用する IAM サービス アカウントを作成します。
  2. IAM 権限を設定します。
  3. Pub/Sub トピックとサブスクリプションを作成する
  4. 1 つのパブリッシャーと 2 つのサブスクライバーの合計 3 つの独立したアプリケーションを起動させます。

始める前に

  1. Google Cloud CLI をインストールします。

  2. フェデレーション ID(連携 ID)を使用するように gcloud CLI を構成します。

    詳細については、連携 ID を使用して gcloud CLI にログインするをご覧ください。

  3. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  4. Cloud de Confiance プロジェクトを作成または選択します

    プロジェクトの選択または作成に必要なロール

    • プロジェクトを選択する: プロジェクトの選択に特定の IAM ロールは必要ありません。ロールが付与されているプロジェクトであれば、どのプロジェクトでも選択できます。
    • プロジェクトを作成する: プロジェクトを作成するには、resourcemanager.projects.create 権限を含むプロジェクト作成者ロール(roles/resourcemanager.projectCreator)が必要です。ロールを付与する方法を確認する
    • Cloud de Confiance プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Cloud de Confiance プロジェクトの名前に置き換えます。

    • 作成した Cloud de Confiance プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、 Cloud de Confiance プロジェクトの名前に置き換えます。

  5. Cloud de Confiance プロジェクトに対して課金が有効になっていることを確認します

  6. Pub/Sub API を有効にします。

    API を有効にするために必要なロール

    API を有効にするには、serviceusage.services.enable 権限を含む Service Usage 管理者 IAM ロール(roles/serviceusage.serviceUsageAdmin)が必要です。ロールを付与する方法を確認する

    gcloud services enable pubsub.googleapis.com
  7. ユーザー アカウントのローカル認証情報を作成します。

    gcloud auth application-default login

    認証エラーが返され、外部 ID プロバイダ(IdP)を使用している場合は、 連携 ID を使用して gcloud CLI にログインしていることを確認します。

  8. ユーザー アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsub.publisher, roles/pubsub.subscriber

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    次のように置き換えます。

Python をインストールする

このチュートリアルでは、Pub/Sub クライアント ライブラリを使用します。これには Python 3.7 以降が必要です。Python をインストールする手順を完了します。

Pub/Sub プロジェクトを設定する

パブリッシュするアプリケーションとサブスクライブするアプリケーション間のメッセージ フローを管理するには、トピックと 2 つの異なるサブスクリプションを作成します。

Pub/Sub トピックの作成

ID hello_topic を含むトピックを作成します。

gcloud pubsub topics create hello_topic

Pub/Sub サブスクリプションの作成

2 つのサブスクリプションを作成して、トピックに接続します。

これらのサブスクリプションは、pull サブスクリプションの一種である StreamingPull サブスクリプションです。

サブスクリプション 1

ID sub_one を含むサブスクリプションを作成し、hello_topic に添付します。

gcloud pubsub subscriptions create sub_one --topic=hello_topic

サブスクリプション 2

ID sub_two を含むサブスクリプションを作成し、hello_topic に添付します。

gcloud pubsub subscriptions create sub_two --topic=hello_topic

1 対多システムを構築する

パブリッシャーとサブスクライバーのコードをダウンロードする

  1. このチュートリアルで必要な Pub/Sub Python ファイルをダウンロードします。

     git clone https://github.com/googleapis/python-pubsub.git
  2. 次の手順に進む前に、開いているターミナルをすべて閉じます。

3 つのターミナルを設定する

  1. 各チュートリアル アプリケーション(1 つのパブリッシャーと 2 つのサブスクライバー)につき 1 つのターミナルを起動します。便宜上、このチュートリアルではこれらのターミナルを次のように呼びます。

    • publisher ターミナル
    • sub_one ターミナル
    • sub_two ターミナル
  2. パブリッシャーのターミナルで、pyenv-qs という名前の Python 仮想環境を作成して有効にします。

    Bash

    python -m venv pyenv-qs
    source pyenv-qs/bin/activate

    PowerShell

    py -m venv pyenv-qs
    .\pyenv-qs\Scripts\activate

    sub_one ターミナルと sub_two ターミナルで、次のコマンドを実行します。

    Bash

    source pyenv-qs/bin/activate

    PowerShell

    .\pyenv-qs\Scripts\activate

    activate コマンドを実行すると、コマンド プロンプトに (pyenv-qs) $ という値が表示されます。

  3. パブリッシャー ターミナルで、pip を使用して Pub/Sub Python クライアント ライブラリをインストールします。

    python -m pip install --upgrade google-cloud-pubsub
  4. 3 つのターミナルすべてで、現在のプロジェクト ID を使用して環境変数を設定します。この gcloud コマンドは、選択したプロジェクト ID を判別し変数として設定します。

    Bash

    export PROJECT=`gcloud config get-value project`

    PowerShell

    $env:PROJECT=$(gcloud config get-value project)
  5. 3 つすべてのターミナルで、サンプルコードを含むプロジェクトパスに変更します。

    cd python-pubsub/samples/snippets/quickstart/

アプリケーションを起動してメッセージ フローを確認する

サブスクライバー 1 のアプリケーションを起動する

sub_one ターミナルでサブスクライバー 1 を起動します。

Bash

python sub.py $PROJECT sub_one

PowerShell

py sub.py $env:PROJECT sub_one

起動すると、このアプリケーションはサーバーとの双方向ストリーミング接続を確立します。Pub/Sub は、その接続を介してメッセージを配信します。

サブスクライバー 1 アプリケーションは sub_one サブスクリプションでメッセージのリッスンを開始します。

パブリッシャー アプリケーションを起動する

publisher ターミナルでパブリッシャー アプリケーションを起動します。

Bash

python pub.py $PROJECT hello_topic

PowerShell

py pub.py $env:PROJECT hello_topic

パブリッシャー アプリケーションが起動すると、Pub/Sub システムが次の処理を行います。

  • パブリッシャー アプリケーションが「Hello, World!」というメッセージを Pub/Sub に送信します。既存のサブスクリプションにはこれは認識されません。このサーバーはメッセージ ID の割り当ても行います。

  • サブスクライバー 1 アプリケーションが「Hello World」というメッセージを受信して出力し、確認応答を Pub/Sub に送信します。

  • パブリッシャー アプリケーションが確認応答を出力します。この確認応答により、メッセージの処理に成功したことと、このサブスクライバーや他の sub_one サブスクライバーに再送信する必要がないことが Pub/Sub に通知されます。

Pub/Sub によりメッセージが sub_one から削除されます。

パブリッシャー アプリケーションはメッセージをパブリッシュし、メッセージ ID を割り当てます。サブスクライバー 1 アプリケーションは「Hello World」メッセージを受信し、確認応答を送信します。

サブスクライバー 2 アプリケーションを起動する

sub_two ターミナルでサブスクライバー 2 を起動します。

Bash

python sub.py $PROJECT sub_two

PowerShell

py sub.py $env:PROJECT sub_two

このサブスクライバーは、sub_two サブスクリプションに配信されたメッセージを受信します。サブスクライバー 2 では sub.py スクリプトが再利用されます。ただし、サブスクライバー 2 はパブリッシャーがトピックとサブスクリプションにメッセージを送信するまで起動されません。パブリッシャーサブスクライバー 2 を直接呼び出した場合、パブリッシャー アプリケーションはサブスクライバー 2 が起動するまで待機するかタイムアウトする必要があります。Pub/Sub は、サブスクライバー 2 へのメッセージを効率的に保存して管理します。

サブスクライバー 2 がリッスンを開始し、リッスンのため待機していたメッセージを sub_two で受信します。

これで、Pub/Sub を使用して開発を行う準備が整いました。

いかがでしたか

Cloud Pub/Sub のサポートページにその他の参考資料やリンクがありますので、こちらもご利用ください。

クリーンアップ

  1. 実行中のすべてのアプリケーションを停止します。
  2. ローカル環境からサンプルコード ディレクトリを削除します。
  3. トピックを削除します。

    gcloud pubsub topics delete hello_topic
  4. サブスクリプションを削除します。

    gcloud pubsub subscriptions delete sub_one
    gcloud pubsub subscriptions delete sub_two
  5. Cloud de Confiance コンソールの [IAM と管理] セクションでチュートリアル プロジェクトをシャットダウンします。

  6. 作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。

    gcloud auth application-default revoke
  7. (省略可)gcloud CLI から認証情報を取り消します。

    gcloud auth revoke

次のステップ

以下のことができます。

  • GitHub でチュートリアルの pub.py コードと sub.py コードを確認し、Pub/Sub の他のサンプルを閲覧できます。演習として、1 秒間隔で現地時間をパブリッシュするバージョンの pub.py を作成します。

  • メッセージのバッチ処理の方法を学びます。

  • push サブスクリプションを使用して、App Engine エンドポイントCloud Functions をトリガーするメッセージを受信します。

  • 再生を使用して、確認応答済みのメッセージを取得します。デフォルトでは、Pub/Sub はサブスクリプションから確認応答済みのメッセージを削除します。たとえば、このチュートリアルでは sub.py を再実行して、「Hello, World!」というメッセージを再度受信することはできません。再生機能を使用すると、メッセージが確認応答された後、そのメッセージを受信するようにサブスクリプションを設定できます。

  • 他の言語のクライアント ライブラリを使用します。