הזרמת הודעות מ-Pub/Sub באמצעות Dataflow ו-Cloud Storage
Dataflow הוא שירות מנוהל מלא לטרנספורמציה ולהעשרה של נתונים במצב סטרימינג (בזמן אמת) ובמצב אצווה, עם אמינות וגמישות שווים. הוא מספק סביבה פשוטה לפיתוח צינורות עיבוד נתונים באמצעות Apache Beam SDK, שכולל קבוצה עשירה של פרימיטיבים של חלונות וניתוח סשנים, וגם מערכת אקולוגית של מחברי מקורות ויעדים. במדריך למתחילים הזה נסביר איך משתמשים ב-Dataflow כדי:
- קריאת הודעות שפורסמו בנושא Pub/Sub
- קיבוץ ההודעות לפי חותמת זמן
- כתיבת ההודעות ל-Cloud Storage
במדריך למתחילים הזה נסביר איך משתמשים ב-Dataflow ב-Java וב-Python. אפשר להשתמש גם ב-SQL. המדריך הזה למתחילים זמין גם כמדריך ב-Google Cloud Skills Boost, שבו אפשר לקבל פרטי כניסה זמניים כדי להתחיל.
אפשר גם להתחיל להשתמש בתבניות של Dataflow שמבוססות על ממשק משתמש, אם לא מתכוונים לבצע עיבוד נתונים בהתאמה אישית.
לפני שמתחילים
-
התקינו את ה-CLI של Google Cloud.
-
הגדירו שה-CLI של gcloud ישתמש בזהות המאוחדת שלכם.
-
כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:
gcloud init -
יוצרים או בוחרים Cloud de Confiance פרויקט.
תפקידים שנדרשים כדי לבחור או ליצור פרויקט
- Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
-
יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (
roles/resourcemanager.projectCreator), שכולל את ההרשאהresourcemanager.projects.create. איך מקצים תפקידים
-
יוצרים Cloud de Confiance פרויקט:
gcloud projects create PROJECT_ID
מחליפים את
PROJECT_IDבשם של פרויקט Cloud de Confiance שיוצרים. -
בוחרים את הפרויקט שיצרתם: Cloud de Confiance
gcloud config set project PROJECT_ID
מחליפים את
PROJECT_IDבשם הפרויקט ב- Cloud de Confiance .
מפעילים את ממשקי ה-API של Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Cloud de Confiance by S3NS Storage JSON API, Pub/Sub, Resource Manager ו-Cloud Scheduler:
תפקידים שנדרשים להפעלת ממשקי API
כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (
roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאהserviceusage.services.enable. איך מקצים תפקידיםgcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
מגדירים את האימות:
-
מוודאים שיש לכם את תפקיד ה-IAM Create Service Accounts (
roles/iam.serviceAccountCreator) ואת תפקיד ה-IAM Project Admin (roles/resourcemanager.projectIamAdmin). איך מקצים תפקידים -
יוצרים את חשבון השירות:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
מחליפים את הערך
SERVICE_ACCOUNT_NAMEבשם שרוצים לתת לחשבון השירות. -
נותנים לחשבון השירות תפקידים. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns.iam.gserviceaccount.com" --role=ROLE
מחליפים את מה שכתוב בשדות הבאים:
SERVICE_ACCOUNT_NAME: השם של חשבון השירותPROJECT_ID: מזהה הפרויקט שבו יצרתם את חשבון השירותROLE: התפקיד שאתם רוצים לתת
-
מקצים את התפקיד הנדרש לחשבון המשתמש שיצרף את חשבון השירות למשאבים אחרים.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns.iam.gserviceaccount.com --member="principal://iam.googleapis.com/locations/global/workforcePools/POOL_ID/subject/SUBJECT_ID" --role=roles/iam.serviceAccountUser
מחליפים את מה שכתוב בשדות הבאים:
-
SERVICE_ACCOUNT_NAME: השם של חשבון השירות. PROJECT_ID: מזהה הפרויקט שבו יצרתם את חשבון השירות.-
POOL_ID: מזהה מאגר הזהויות של כוח העבודה. -
SUBJECT_ID: מזהה נושא. בדרך כלל זה המזהה של משתמש במאגר זהויות של כוח עבודה. לפרטים נוספים, קראו את המאמר ייצוג המשתמשים במאגרי כוח עבודה בכללי מדיניות IAM.
-
-
מוודאים שיש לכם את תפקיד ה-IAM Create Service Accounts (
-
יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:
gcloud auth application-default login
אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.
הגדרת פרויקט Pub/Sub
-
יוצרים משתנים לקטגוריה, לפרויקט ולאזור. שמות של קטגוריות של Cloud Storage חייבים להיות ייחודיים באופן גלובלי. בוחרים אזור של Dataflow שקרוב למקום שבו מריצים את הפקודות במדריך הזה. הערך של המשתנה
REGIONחייב להיות שם אזור תקין. מידע נוסף על אזורים ומיקומים זמין במאמר מיקומים ב-Dataflow.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns.iam.gserviceaccount.com
-
יוצרים קטגוריה של Cloud Storage בבעלות הפרויקט הזה:
gcloud storage buckets create gs://$BUCKET_NAME
-
יצירת נושא Pub/Sub בפרויקט הזה:
gcloud pubsub topics create $TOPIC_ID
-
יוצרים משימה של Cloud Scheduler בפרויקט הזה. העבודה מפרסמת הודעה בנושא Pub/Sub במרווחי זמן של דקה.
אם לא קיימת אפליקציית App Engine בפרויקט, בשלב הזה תיצור המערכת אפליקציה כזו.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
מתחילים את העבודה.
gcloud scheduler jobs run publisher-job --location=$REGION
-
כדי לשכפל את מאגר ההפעלה המהירה ולעבור לספריית קוד לדוגמה, מריצים את הפקודות הבאות:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
הזרמת הודעות מ-Pub/Sub ל-Cloud Storage
דוגמת קוד
בדוגמת הקוד הזו נעשה שימוש ב-Dataflow כדי:
- קריאת הודעות Pub/Sub.
- חלון (או קבוצה) של הודעות במרווחי זמן קבועים לפי חותמות הזמן של הפרסום.
כתיבת ההודעות בכל חלון לקבצים ב-Cloud Storage.
Java
Python
הפעלת הפייפליין
כדי להפעיל את צינור הנתונים, מריצים את הפקודה הבאה:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
הפקודה הקודמת מופעלת באופן מקומי ומפעילה משימת Dataflow שמופעלת בענן. כשהפקודה מחזירה JOB_MESSAGE_DETAILED: Workers
have started successfully, יוצאים מהתוכנית המקומית באמצעות Ctrl+C.
מעקב אחרי ההתקדמות של משימות וצינורות עיבוד נתונים
אפשר לעקוב אחר התקדמות העבודה במסוף Dataflow.
כדי לראות את הפרטים הבאים, פותחים את תצוגת פרטי המשרה:
- מבנה התפקיד
- יומני משרות
- מדדים של שלבים
יכול להיות שתצטרכו להמתין כמה דקות עד שקבצי הפלט יופיעו ב-Cloud Storage.
אפשר גם להשתמש בשורת הפקודה שבהמשך כדי לבדוק אילו קבצים נכתבו.
gcloud storage ls gs://${BUCKET_NAME}/samples/
הפלט אמור להיראות כך:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
הסרת המשאבים
כדי לא לצבור חיובים בחשבון על המשאבים שבהם השתמשתם בדף הזה, אתם צריכים למחוק את הפרויקט יחד עם המשאבים. Cloud de Confiance Cloud de Confiance
מוחקים את המשימה ב-Cloud Scheduler.
gcloud scheduler jobs delete publisher-job --location=$REGION
במסוף Dataflow, מפסיקים את המשימה. ביטול צינור עיבוד הנתונים בלי לרוקן אותו.
למחוק את הנושא.
gcloud pubsub topics delete $TOPIC_ID
מוחקים את הקבצים שנוצרו על ידי צינור העברת הנתונים.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
מסירים את הקטגוריה של Cloud Storage.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
מוחקים את חשבון השירות:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
אם תרצו, תוכלו לבטל את פרטי הכניסה שיצרתם ולמחוק את הקובץ המקומי של פרטי הכניסה.
gcloud auth application-default revoke
-
אם רוצים, מבטלים את פרטי הכניסה של ה-CLI של gcloud.
gcloud auth revoke
המאמרים הבאים
אם רוצים להגדיר חלון להודעות Pub/Sub לפי חותמת זמן מותאמת אישית, אפשר לציין את חותמת הזמן כמאפיין בהודעת Pub/Sub, ואז להשתמש בחותמת הזמן המותאמת אישית עם
withTimestampAttributeשל PubsubIO.כדאי לעיין בתבניות Dataflow בקוד פתוח של Google שמיועדות לסטרימינג.
כדאי לעיין במדריך הזה שקורא מ-Pub/Sub וכותב ל-BigQuery באמצעות תבניות גמישות של Dataflow.
מידע נוסף על חלונות זמנים זמין בדוגמה Apache Beam Mobile Gaming Pipeline.