Melakukan streaming pesan dari Pub/Sub menggunakan Dataflow dan Cloud Storage
Dataflow adalah layanan terkelola sepenuhnya untuk mentransformasi dan memperkaya data dalam mode streaming (real-time) dan batch dengan keandalan dan kecepatan yang sama. Layanan ini menyediakan lingkungan pengembangan pipeline yang disederhanakan menggunakan Apache Beam SDK, yang memiliki serangkaian primitif analisis sesi dan windowing yang lengkap, serta ekosistem konektor sumber dan sink. Panduan memulai ini menunjukkan cara menggunakan Dataflow untuk:
- Membaca pesan yang dipublikasikan ke topik Pub/Sub
- Mengelompokkan pesan menurut stempel waktu
- Tulis pesan ke Cloud Storage
Panduan memulai ini memperkenalkan cara menggunakan Dataflow di Java dan Python. SQL juga didukung. Panduan memulai ini juga ditawarkan sebagai tutorial Google Cloud Skills Boost yang menawarkan kredensial sementara untuk membantu Anda memulai.
Anda juga dapat memulai dengan menggunakan template Dataflow berbasis UI jika Anda tidak berniat melakukan pemrosesan data kustom.
Sebelum memulai
-
Install the Google Cloud CLI.
-
Konfigurasi gcloud CLI untuk menggunakan identitas gabungan Anda.
Untuk mengetahui informasi selengkapnya, lihat Login ke gcloud CLI dengan identitas gabungan Anda.
-
Untuk melakukan inisialisasi gcloud CLI, jalankan perintah berikut:
gcloud init
-
Create or select a Trusted Cloud project.
-
Create a Trusted Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Trusted Cloud project you are creating. -
Select the Trusted Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Trusted Cloud project name.
-
-
Verify that billing is enabled for your Trusted Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Trusted Cloud by S3NS Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns-system.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns-system.iam.gserviceaccount.com --member="principal://iam.googleapis.com/locations/global/workforcePools/POOL_ID/subject/SUBJECT_ID" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service account.PROJECT_ID
: the project ID where you created the service account.POOL_ID
: a workforce identity pool ID.-
SUBJECT_ID
: a subject ID; typically the identifier for a user in a workforce identity pool. For details, see Represent workforce pool users in IAM policies.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Menyiapkan project Pub/Sub
-
Buat variabel untuk bucket, project, dan region Anda. Nama bucket Cloud Storage harus unik secara global. Pilih region Dataflow yang dekat dengan tempat Anda menjalankan perintah dalam panduan memulai ini. Nilai variabel
REGION
harus berupa nama wilayah yang valid. Untuk mengetahui informasi selengkapnya tentang region dan lokasi, lihat Lokasi Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns-system.iam.gserviceaccount.com
-
Buat bucket Cloud Storage yang dimiliki oleh project ini:
gcloud storage buckets create gs://$BUCKET_NAME
-
Buat topik Pub/Sub di project ini:
gcloud pubsub topics create $TOPIC_ID
-
Buat tugas Cloud Scheduler di project ini. Tugas memublikasikan pesan ke topik Pub/Sub dengan interval satu menit.
Jika aplikasi App Engine tidak ada untuk project, langkah ini akan membuatnya.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
Mulai tugas.
gcloud scheduler jobs run publisher-job --location=$REGION
-
Gunakan perintah berikut untuk meng-clone repositori panduan memulai dan membuka direktori kode contoh:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Streaming pesan dari Pub/Sub ke Cloud Storage
Contoh kode
Kode contoh ini menggunakan Dataflow untuk:
- Membaca pesan Pub/Sub.
- Mengelompokkan pesan ke dalam interval berukuran tetap berdasarkan stempel waktu publikasi.
Tulis pesan di setiap jendela ke file di Cloud Storage.
Java
Python
Mulai pipeline
Untuk memulai pipeline, jalankan perintah berikut:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
Perintah sebelumnya berjalan secara lokal dan meluncurkan tugas Dataflow
yang berjalan di cloud. Saat perintah menampilkan JOB_MESSAGE_DETAILED: Workers
have started successfully
, keluar dari program lokal menggunakan Ctrl+C
.
Mengamati progres tugas dan pipeline
Anda dapat mengamati progres tugas di konsol Dataflow.
Buka tampilan detail tugas untuk melihat:
- Struktur tugas
- Log tugas
- Metrik tahap
Anda mungkin harus menunggu beberapa menit untuk melihat file output di Cloud Storage.
Atau, gunakan command line di bawah untuk memeriksa file mana yang telah ditulis.
gcloud storage ls gs://${BUCKET_NAME}/samples/
Outputnya akan terlihat seperti berikut ini:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
Pembersihan
Agar tidak menimbulkan biaya pada akun Trusted Cloud Anda untuk resource yang digunakan di halaman ini, hapus project Trusted Cloud yang berisi resource tersebut.
Hapus tugas Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
Di konsol Dataflow, hentikan tugas. Membatalkan pipeline tanpa mengurasnya.
Hapus topik.
gcloud pubsub topics delete $TOPIC_ID
Hapus file yang dibuat oleh pipeline.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Hapus bucket Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
Hapus akun layanan:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
Langkah berikutnya
Jika ingin mengelompokkan pesan Pub/Sub berdasarkan stempel waktu kustom, Anda dapat menentukan stempel waktu sebagai atribut dalam pesan Pub/Sub, lalu menggunakan stempel waktu kustom dengan
withTimestampAttribute
PubsubIO.Lihat template Dataflow open source Google yang dirancang untuk streaming.
Baca selengkapnya tentang cara Dataflow terintegrasi dengan Pub/Sub.
Lihat tutorial ini yang membaca dari Pub/Sub dan menulis ke BigQuery menggunakan template Dataflow Flex.
Untuk mengetahui informasi selengkapnya tentang windowing, lihat contoh Pipeline Game Seluler Apache Beam.