使用 Dataflow 和 Cloud Storage 串流 Pub/Sub 訊息

Dataflow 是一項全代管服務,能夠轉換並充實串流 (即時) 模式和批次模式的資料,讓資料維持同等的穩定與明確性,並利用 Apache Beam SDK 提供簡化的管道開發環境。這個 SDK 具備多項時間區間設定與工作階段分析基元,以及來源與接收連接器生態系統。本快速入門導覽課程說明如何使用 Dataflow 執行下列作業:

  • 讀取發布至 Pub/Sub 主題的訊息
  • 依時間戳記建立訊息視窗或分組
  • 將訊息寫入 Cloud Storage

本快速入門導覽課程將介紹如何使用 Java 和 Python 中的 Dataflow。也支援 SQL。這個快速入門導覽課程也提供 Google Cloud Skills Boost 教學課程,其中提供臨時憑證,協助您開始使用。

如果您不打算進行自訂資料處理,也可以先使用以 UI 為基礎的 Dataflow 範本

事前準備

  1. 安裝 Google Cloud CLI。

  2. 設定 gcloud CLI,使用您的聯合身分。

    詳情請參閱「使用聯合身分登入 gcloud CLI」。

  3. 執行下列指令,初始化 gcloud CLI:

    gcloud init
  4. 建立或選取 Cloud de Confiance 專案

    選取或建立專案所需的角色

    • 選取專案:選取專案時,不需要具備特定 IAM 角色,只要您已獲授角色,即可選取任何專案。
    • 建立專案:如要建立專案,您需要具備專案建立者角色 (roles/resourcemanager.projectCreator),其中包含 resourcemanager.projects.create 權限。瞭解如何授予角色
    • 建立 Cloud de Confiance 專案:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替換為您要建立的 Cloud de Confiance 專案名稱。

    • 選取您建立的 Cloud de Confiance 專案:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替換為 Cloud de Confiance 專案名稱。

  5. 確認專案已啟用計費功能 Cloud de Confiance

  6. 啟用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Storage JSON API、Pub/Sub、Resource Manager 和 Cloud Scheduler API: Cloud de Confiance by S3NS

    啟用 API 時所需的角色

    如要啟用 API,您需要具備服務使用情形管理員 IAM 角色 (roles/serviceusage.serviceUsageAdmin),其中包含 serviceusage.services.enable 權限。瞭解如何授予角色

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  7. 設定驗證方法:

    1. 確認您具備「建立服務帳戶」身分與存取權管理角色 (roles/iam.serviceAccountCreator) 和「專案 IAM 管理員」角色 (roles/resourcemanager.projectIamAdmin)。瞭解如何授予角色
    2. 建立服務帳戶:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 換成服務帳戶的名稱。

    3. 將角色授予服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns.iam.gserviceaccount.com" --role=ROLE

      請替換下列項目:

      • SERVICE_ACCOUNT_NAME:服務帳戶名稱
      • PROJECT_ID:您建立服務帳戶的專案 ID
      • ROLE:要授予的角色
    4. 將必要角色指派給要將服務帳戶附加至其他資源的主體。

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns.iam.gserviceaccount.com --member="principal://iam.googleapis.com/locations/global/workforcePools/POOL_ID/subject/SUBJECT_ID" --role=roles/iam.serviceAccountUser

      更改下列內容:

      • SERVICE_ACCOUNT_NAME:服務帳戶名稱。
      • PROJECT_ID:您建立服務帳戶的專案 ID。
      • POOL_ID:員工身分集區 ID。
      • SUBJECT_ID:主旨 ID,通常是指員工身分集區中使用者的 ID。詳情請參閱「 在 IAM 政策中代表工作團隊集區使用者」。
  8. 為使用者帳戶建立本機驗證憑證:

    gcloud auth application-default login

    如果系統傳回驗證錯誤,且您使用外部識別資訊提供者 (IdP),請確認您已 使用聯合身分登入 gcloud CLI

設定 Pub/Sub 專案

  1. 為 bucket、專案和區域建立變數。 Cloud Storage bucket 名稱在全域範圍內都不可重複。選取靠近您執行本快速入門指令的 Dataflow區域REGION 變數的值必須是有效的區域名稱。如要進一步瞭解地區和位置,請參閱「Dataflow 位置」。

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns.iam.gserviceaccount.com
  2. 建立這個專案擁有的 Cloud Storage bucket:

    gcloud storage buckets create gs://$BUCKET_NAME
  3. 在本專案中建立 Pub/Sub 主題:

    gcloud pubsub topics create $TOPIC_ID
  4. 在本專案建立 Cloud Scheduler 工作,這項工作會每隔一分鐘將訊息發布至 Pub/Sub 主題。

    如果專案沒有 App Engine 應用程式,這個步驟會建立一個。

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    啟動工作。

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. 使用下列指令複製快速入門存放區,並前往程式碼範例目錄:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

將訊息從 Pub/Sub 串流至 Cloud Storage

程式碼範例

這個程式碼範例會使用 Dataflow 執行下列作業:

  • 讀取 Pub/Sub 訊息。
  • 根據發布時間戳記,建立訊息視窗或將訊息分組至固定長度的時間間隔。
  • 將每個視窗中的訊息寫入 Cloud Storage 的檔案。

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

啟動管道

如要啟動管道,請執行下列指令:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

上述指令會在本地端執行,並啟動在雲端執行的 Dataflow 工作。當指令傳回 JOB_MESSAGE_DETAILED: Workers have started successfully 時,請使用 Ctrl+C 結束本機程式。

觀察工作和管道進度

您可以在 Dataflow 控制台中查看工作進度。

前往 Dataflow 控制台

觀察工作進度

開啟工作詳細資料檢視畫面,即可查看:

  • 工作結構
  • 工作記錄
  • 階段指標

觀察工作進度

您可能需要稍候幾分鐘,才能在 Cloud Storage 看到輸出檔案。

觀察工作進度

或者,使用下列指令列檢查哪些檔案已寫入。

gcloud storage ls gs://${BUCKET_NAME}/samples/

輸出內容應如下所示:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

清除所用資源

為了避免系統向您的 Cloud de Confiance 帳戶收取本頁面所用資源的費用,請刪除含有這些資源的 Cloud de Confiance 專案。

  1. 刪除 Cloud Scheduler 工作。

    gcloud scheduler jobs delete publisher-job --location=$REGION
  2. 在 Dataflow 控制台中停止工作。取消管道,但不要排空管道。

  3. 刪除主題。

    gcloud pubsub topics delete $TOPIC_ID
  4. 刪除管道建立的檔案。

    gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error
    gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
  5. 移除 Cloud Storage bucket。

    gcloud storage rm gs://${BUCKET_NAME} --recursive

  6. 刪除服務帳戶:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. 選用:撤銷您建立的驗證憑證,並刪除本機憑證檔案。

    gcloud auth application-default revoke
  8. 選用:從 gcloud CLI 撤銷憑證。

    gcloud auth revoke

後續步驟