הזרמת הודעות מ-Pub/Sub באמצעות Dataflow ו-Cloud Storage

Dataflow הוא שירות מנוהל מלא לטרנספורמציה ולהעשרה של נתונים במצב סטרימינג (בזמן אמת) ובמצב אצווה, עם אמינות וגמישות שווים. הוא מספק סביבה פשוטה לפיתוח צינורות עיבוד נתונים באמצעות Apache Beam SDK, שכולל קבוצה עשירה של פרימיטיבים של חלונות וניתוח סשנים, וגם מערכת אקולוגית של מחברי מקורות ויעדים. במדריך למתחילים הזה נסביר איך משתמשים ב-Dataflow כדי:

  • קריאת הודעות שפורסמו בנושא Pub/Sub
  • קיבוץ ההודעות לפי חותמת זמן
  • כתיבת ההודעות ל-Cloud Storage

במדריך למתחילים הזה נסביר איך משתמשים ב-Dataflow ב-Java וב-Python. אפשר להשתמש גם ב-SQL. המדריך הזה למתחילים זמין גם כמדריך ב-Google Cloud Skills Boost, שבו אפשר לקבל פרטי כניסה זמניים כדי להתחיל.

אפשר גם להתחיל להשתמש בתבניות של Dataflow שמבוססות על ממשק משתמש, אם לא מתכוונים לבצע עיבוד נתונים בהתאמה אישית.

לפני שמתחילים

  1. התקינו את ה-CLI של Google Cloud.

  2. הגדירו שה-CLI של gcloud ישתמש בזהות המאוחדת שלכם.

    איך נכנסים ל-CLI של gcloud באמצעות הזהות המאוחדת?

  3. כדי לאתחל את ה-CLI של gcloud, הריצו את הפקודה הבאה:

    gcloud init
  4. יוצרים או בוחרים Cloud de Confiance פרויקט.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים
    • יוצרים Cloud de Confiance פרויקט:

      gcloud projects create PROJECT_ID

      מחליפים את PROJECT_ID בשם של פרויקט Cloud de Confiance שיוצרים.

    • בוחרים את הפרויקט שיצרתם: Cloud de Confiance

      gcloud config set project PROJECT_ID

      מחליפים את PROJECT_ID בשם הפרויקט ב- Cloud de Confiance .

  5. מוודאים שהחיוב מופעל בפרויקט Cloud de Confiance .

  6. מפעילים את ממשקי ה-API של Dataflow,‏ Compute Engine,‏ Cloud Logging,‏ Cloud Storage,‏ Cloud de Confiance by S3NS Storage JSON API,‏ Pub/Sub,‏ Resource Manager ו-Cloud Scheduler:

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    gcloud services enable dataflow.googleapis.com  compute.googleapis.com  logging.googleapis.com  storage-component.googleapis.com  storage-api.googleapis.com  pubsub.googleapis.com  cloudresourcemanager.googleapis.com  cloudscheduler.googleapis.com
  7. מגדירים את האימות:

    1. מוודאים שיש לכם את תפקיד ה-IAM ‏Create Service Accounts ‏(roles/iam.serviceAccountCreator) ואת תפקיד ה-IAM ‏Project Admin ‏(roles/resourcemanager.projectIamAdmin). איך מקצים תפקידים
    2. יוצרים את חשבון השירות:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      מחליפים את הערך SERVICE_ACCOUNT_NAME בשם שרוצים לתת לחשבון השירות.

    3. נותנים לחשבון השירות תפקידים. מריצים את הפקודה הבאה לכל אחד מהתפקידים הבאים ב-IAM:‏ roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns.iam.gserviceaccount.com" --role=ROLE

      מחליפים את מה שכתוב בשדות הבאים:

      • SERVICE_ACCOUNT_NAME: השם של חשבון השירות
      • PROJECT_ID: מזהה הפרויקט שבו יצרתם את חשבון השירות
      • ROLE: התפקיד שאתם רוצים לתת
    4. מקצים את התפקיד הנדרש לחשבון המשתמש שיצרף את חשבון השירות למשאבים אחרים.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns.iam.gserviceaccount.com --member="principal://iam.googleapis.com/locations/global/workforcePools/POOL_ID/subject/SUBJECT_ID" --role=roles/iam.serviceAccountUser

      מחליפים את מה שכתוב בשדות הבאים:

      • SERVICE_ACCOUNT_NAME: השם של חשבון השירות.
      • PROJECT_ID: מזהה הפרויקט שבו יצרתם את חשבון השירות.
      • POOL_ID: מזהה מאגר הזהויות של כוח העבודה.
      • SUBJECT_ID: מזהה נושא. בדרך כלל זה המזהה של משתמש במאגר זהויות של כוח עבודה. לפרטים נוספים, קראו את המאמר ייצוג המשתמשים במאגרי כוח עבודה בכללי מדיניות IAM.
  8. יוצרים פרטי כניסה לאימות מקומי עבור חשבון המשתמש:

    gcloud auth application-default login

    אם מוחזרת שגיאת אימות ואתם משתמשים בספק זהויות חיצוני (IdP), ודאו ש נכנסתם ל-CLI של gcloud באמצעות המאגר המאוחד לניהול זהויות.

הגדרת פרויקט Pub/Sub

  1. יוצרים משתנים לקטגוריה, לפרויקט ולאזור. שמות של קטגוריות של Cloud Storage חייבים להיות ייחודיים באופן גלובלי. בוחרים אזור של Dataflow שקרוב למקום שבו מריצים את הפקודות במדריך הזה. הערך של המשתנה REGION חייב להיות שם אזור תקין. מידע נוסף על אזורים ומיקומים זמין במאמר מיקומים ב-Dataflow.

    BUCKET_NAME=BUCKET_NAME
    PROJECT_ID=$(gcloud config get-value project)
    TOPIC_ID=TOPIC_ID
    REGION=DATAFLOW_REGION
    SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.s3ns.iam.gserviceaccount.com
  2. יוצרים קטגוריה של Cloud Storage בבעלות הפרויקט הזה:

    gcloud storage buckets create gs://$BUCKET_NAME
  3. יצירת נושא Pub/Sub בפרויקט הזה:

    gcloud pubsub topics create $TOPIC_ID
  4. יוצרים משימה של Cloud Scheduler בפרויקט הזה. העבודה מפרסמת הודעה בנושא Pub/Sub במרווחי זמן של דקה.

    אם לא קיימת אפליקציית App Engine בפרויקט, בשלב הזה תיצור המערכת אפליקציה כזו.

    gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
        --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION

    מתחילים את העבודה.

    gcloud scheduler jobs run publisher-job --location=$REGION
  5. כדי לשכפל את מאגר ההפעלה המהירה ולעבור לספריית קוד לדוגמה, מריצים את הפקודות הבאות:

    Java

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/pubsub/streaming-analytics

    Python

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    cd python-docs-samples/pubsub/streaming-analytics
    pip install -r requirements.txt  # Install Apache Beam dependencies

הזרמת הודעות מ-Pub/Sub ל-Cloud Storage

דוגמת קוד

בדוגמת הקוד הזו נעשה שימוש ב-Dataflow כדי:

  • קריאת הודעות Pub/Sub.
  • חלון (או קבוצה) של הודעות במרווחי זמן קבועים לפי חותמות הזמן של הפרסום.
  • כתיבת ההודעות בכל חלון לקבצים ב-Cloud Storage.

Java


import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python

import argparse
from datetime import datetime
import logging
import random

from apache_beam import (
    DoFn,
    GroupByKey,
    io,
    ParDo,
    Pipeline,
    PTransform,
    WindowInto,
    WithKeys,
)
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Set window size to 60 seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals"
            >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode())


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            # https://beam.apache.org/releases/pydoc/current/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects/<PROJECT_ID>/topics/<TOPIC_ID>".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

הפעלת הפייפליין

כדי להפעיל את צינור הנתונים, מריצים את הפקודה הבאה:

Java

mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --gcpTempLocation=gs://$BUCKET_NAME/temp \
    --runner=DataflowRunner \
    --windowSize=2 \
    --serviceAccount=$SERVICE_ACCOUNT"

Python

python PubSubToGCS.py \
  --project=$PROJECT_ID \
  --region=$REGION \
  --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \
  --output_path=gs://$BUCKET_NAME/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://$BUCKET_NAME/temp \
  --service_account_email=$SERVICE_ACCOUNT

הפקודה הקודמת מופעלת באופן מקומי ומפעילה משימת Dataflow שמופעלת בענן. כשהפקודה מחזירה JOB_MESSAGE_DETAILED: Workers have started successfully, יוצאים מהתוכנית המקומית באמצעות Ctrl+C.

מעקב אחרי ההתקדמות של משימות וצינורות עיבוד נתונים

אפשר לעקוב אחר התקדמות העבודה במסוף Dataflow.

כניסה למסוף Dataflow

מעקב אחרי התקדמות העבודה

כדי לראות את הפרטים הבאים, פותחים את תצוגת פרטי המשרה:

  • מבנה התפקיד
  • יומני משרות
  • מדדים של שלבים

מעקב אחרי התקדמות העבודה

יכול להיות שתצטרכו להמתין כמה דקות עד שקבצי הפלט יופיעו ב-Cloud Storage.

מעקב אחרי התקדמות העבודה

אפשר גם להשתמש בשורת הפקודה שבהמשך כדי לבדוק אילו קבצים נכתבו.

gcloud storage ls gs://${BUCKET_NAME}/samples/

הפלט אמור להיראות כך:

Java

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python

gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0
gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1

הסרת המשאבים

כדי לא לצבור חיובים בחשבון על המשאבים שבהם השתמשתם בדף הזה, אתם צריכים למחוק את הפרויקט יחד עם המשאבים. Cloud de Confiance Cloud de Confiance

  1. מוחקים את המשימה ב-Cloud Scheduler.

    gcloud scheduler jobs delete publisher-job --location=$REGION
  2. במסוף Dataflow, מפסיקים את המשימה. ביטול צינור עיבוד הנתונים בלי לרוקן אותו.

  3. למחוק את הנושא.

    gcloud pubsub topics delete $TOPIC_ID
  4. מוחקים את הקבצים שנוצרו על ידי צינור העברת הנתונים.

    gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error
    gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
  5. מסירים את הקטגוריה של Cloud Storage.

    gcloud storage rm gs://${BUCKET_NAME} --recursive

  6. מוחקים את חשבון השירות:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. אם תרצו, תוכלו לבטל את פרטי הכניסה שיצרתם ולמחוק את הקובץ המקומי של פרטי הכניסה.

    gcloud auth application-default revoke
  8. אם רוצים, מבטלים את פרטי הכניסה של ה-CLI של gcloud.

    gcloud auth revoke

המאמרים הבאים