使用 Dataflow 和 Cloud Storage 串流處理 Pub/Sub 訊息
Dataflow 是一項全代管服務,能夠轉換並充實串流 (即時) 模式和批次模式的資料,讓資料維持同等的穩定與明確性。這個 SDK 提供豐富的時間區間設定和工作階段分析基元,以及各式來源與接收器連接器的生態系統,可簡化管道開發環境。本快速入門導覽課程說明如何使用 Dataflow 執行下列作業:
- 讀取發布至 Pub/Sub 主題的訊息
- 依時間戳記將訊息分組
- 將訊息寫入 Cloud Storage
本快速入門導覽課程將介紹如何使用 Java 和 Python 中的 Dataflow。也支援 SQL。這個快速入門導覽課程也提供 Google Cloud Skills Boost 教學課程,其中提供臨時憑證,協助您開始使用。
如果您不打算進行自訂資料處理,也可以先使用以 UI 為基礎的 Dataflow 範本。
事前準備
-
Install the Google Cloud CLI.
-
設定 gcloud CLI 以使用您的聯合身分。
詳情請參閱「 使用聯合身分登入 gcloud CLI」。
-
如要初始化 gcloud CLI,請執行下列指令:
gcloud init
-
Create or select a Trusted Cloud project.
-
Create a Trusted Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Trusted Cloud project you are creating. -
Select the Trusted Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Trusted Cloud project name.
-
-
Verify that billing is enabled for your Trusted Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Trusted Cloud by S3NS Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns-system.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns-system.iam.gserviceaccount.com --member="principal://iam.googleapis.com/locations/global/workforcePools/POOL_ID/subject/SUBJECT_ID" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service account.PROJECT_ID
: the project ID where you created the service account.POOL_ID
: a workforce identity pool ID.-
SUBJECT_ID
: a subject ID; typically the identifier for a user in a workforce identity pool. For details, see Represent workforce pool users in IAM policies.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
設定 Pub/Sub 專案
-
為 bucket、專案和區域建立變數。 Cloud Storage bucket 名稱在全域範圍內都不可重複。選取靠近您執行本快速入門指令的 Dataflow 地區。
REGION
變數的值必須是有效的區域名稱。如要進一步瞭解地區和位置,請參閱「Dataflow 位置」。BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns-system.iam.gserviceaccount.com
-
建立這個專案擁有的 Cloud Storage bucket:
gcloud storage buckets create gs://$BUCKET_NAME
-
在這個專案中建立 Pub/Sub 主題:
gcloud pubsub topics create $TOPIC_ID
-
在這個專案中建立 Cloud Scheduler 工作。這項工作會以一分鐘為間隔,將訊息發布至 Pub/Sub 主題。
如果專案沒有 App Engine 應用程式,這個步驟會建立一個。
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
啟動工作。
gcloud scheduler jobs run publisher-job --location=$REGION
-
使用下列指令複製快速入門存放區,並前往範例程式碼目錄:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
將 Pub/Sub 中的訊息串流至 Cloud Storage
程式碼範例
這個程式碼範例會使用 Dataflow 執行下列作業:
- 讀取 Pub/Sub 訊息。
- 依發布時間戳記,將訊息劃分為固定大小的時間間隔 (或群組)。
將每個視窗中的訊息寫入 Cloud Storage 的檔案。
Java
Python
啟動管道
如要啟動管道,請執行下列指令:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
上述指令會在本地執行,並啟動在雲端執行的 Dataflow 工作。當指令傳回 JOB_MESSAGE_DETAILED: Workers
have started successfully
時,請使用 Ctrl+C
結束本機程式。
觀察工作和管道進度
您可以在 Dataflow 控制台中查看工作進度。
開啟工作詳細資料檢視畫面,即可查看:
- 工作結構
- 工作記錄
- 階段指標
您可能需要等待幾分鐘,才能在 Cloud Storage 中看到輸出檔案。
或者,您也可以使用下列指令列,檢查已寫入的檔案。
gcloud storage ls gs://${BUCKET_NAME}/samples/
輸出內容應如下所示:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
清除所用資源
如要避免系統向您的 Trusted Cloud 帳戶收取本頁面所用資源的費用,請刪除含有這些資源的 Trusted Cloud 專案。
刪除 Cloud Scheduler 工作。
gcloud scheduler jobs delete publisher-job --location=$REGION
在 Dataflow 控制台中停止工作。取消管道,但不要排空管道。
刪除主題。
gcloud pubsub topics delete $TOPIC_ID
刪除管道建立的檔案。
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
移除 Cloud Storage bucket。
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
刪除服務帳戶:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
後續步驟
如要依自訂時間戳記建立 Pub/Sub 訊息視窗,您可以將時間戳記指定為 Pub/Sub 訊息中的屬性,然後搭配 PubsubIO 的
withTimestampAttribute
使用自訂時間戳記。請參閱 Google 專為串流設計的開放原始碼 Dataflow 範本。
進一步瞭解 Dataflow 如何與 Pub/Sub 整合。
請參閱這篇教學課程,瞭解如何使用 Dataflow Flex 範本從 Pub/Sub 讀取資料,並寫入 BigQuery。
如要進一步瞭解視窗化,請參閱 Apache Beam 行動遊戲管道範例。