יצירת נושא ייבוא ב-Cloud Storage

נושא ייבוא של Cloud Storage מאפשר לכם להטמיע נתונים מ-Cloud Storage ב-Pub/Sub באופן רציף. אחר כך תוכלו להזרים את הנתונים לכל אחד מהיעדים ש-Pub/Sub תומך בהם. ‫Pub/Sub מזהה באופן אוטומטי אובייקטים חדשים שנוספו לקטגוריה של Cloud Storage ומבצע להם הטמעה.

‫Cloud Storage הוא שירות לאחסון אובייקטים ב-Cloud de Confiance by S3NS. אובייקט הוא חלק של נתונים שלא ניתן לשינוי, שמורכב מקובץ בכל סוג של פורמט. האובייקטים מאוחסנים בקונטיינרים שנקראים קטגוריות. קטגוריות יכולות להכיל גם תיקיות מנוהלות, שמשמשות למתן גישה מורחבת לקבוצות של אובייקטים עם קידומת משותפת של שם.

מידע נוסף על Cloud Storage זמין במאמרי העזרה של Cloud Storage.

מידע נוסף על ייבוא נושאים זמין במאמר מידע על ייבוא נושאים.

לפני שמתחילים

תפקידים והרשאות נדרשים

כדי לקבל את ההרשאות שנדרשות ליצירה ולניהול של נושא ייבוא של Cloud Storage, צריך לבקש מהאדמין להקצות לכם את התפקיד עריכת Pub/Sub (roles/pubsub.editor) ב-IAM בנושא או בפרויקט. כדי לקרוא הסבר על מתן תפקידים, ראו איך מנהלים את הגישה ברמת הפרויקט, התיקייה והארגון.

זהו תפקיד מוגדר מראש עם ההרשאות שנדרשות ליצירה ולניהול של נושא ייבוא ב-Cloud Storage. כדי לראות בדיוק אילו הרשאות נדרשות, אפשר להרחיב את הקטע ההרשאות הנדרשות:

ההרשאות הנדרשות

כדי ליצור ולנהל נושא ייבוא של Cloud Storage, צריך את ההרשאות הבאות:

  • יוצרים נושא לייבוא: pubsub.topics.create
  • כדי למחוק נושא ייבוא: pubsub.topics.delete
  • קבלת נושא ייבוא: pubsub.topics.get
  • כדי להוסיף נושא לייבוא: pubsub.topics.list
  • פרסום בנושא מיובא: pubsub.topics.publish
  • כדי לעדכן נושא ייבוא: pubsub.topics.update
  • קבלת מדיניות IAM לנושא ייבוא: pubsub.topics.getIamPolicy
  • מגדירים את מדיניות ה-IAM לנושא ייבוא: pubsub.topics.setIamPolicy

יכול להיות שתקבלו את ההרשאות האלה באמצעות תפקידים בהתאמה אישית או תפקידים מוגדרים מראש אחרים.

אפשר להגדיר בקרת גישה ברמת הפרויקט וברמת המשאב הבודד.

מדיניות אחסון ההודעות תואמת למיקום של הקטגוריה

מדיניות אחסון ההודעות של נושא Pub/Sub צריכה לחפוף לאזורים שבהם ממוקמת קטגוריה של Cloud Storage. המדיניות הזו קובעת איפה מותר ל-Pub/Sub לאחסן את נתוני ההודעות שלכם.

  • לקטגוריות עם סוג המיקום 'אזור': המדיניות חייבת לכלול את האזור הספציפי הזה. לדוגמה, אם הקטגוריה נמצאת באזור us-central1, מדיניות אחסון ההודעות חייבת לכלול גם את us-central1.

  • לקטגוריות עם סוג מיקום של שני אזורים או מספר אזורים: המדיניות חייבת לכלול לפחות אזור אחד במיקום של שני אזורים או מספר אזורים. לדוגמה, אם הקטגוריה נמצאת באזור US multi-region, מדיניות אחסון ההודעות יכולה לכלול את האזורים us-central1,‏ us-east1 או כל אזור אחר בתוך US multi-region.

    אם המדיניות לא כוללת את האזור של הקטגוריה, יצירת הנושא תיכשל. לדוגמה, אם ה-bucket שלכם נמצא ב-europe-west1 ומדיניות אחסון ההודעות כוללת רק את asia-east1, תקבלו שגיאה.

    אם מדיניות אחסון ההודעות כוללת רק אזור אחד שחופף למיקום של הקטגוריה, יכול להיות שהיתירות בכמה אזורים תיפגע. הסיבה לכך היא שאם האזור היחיד הזה לא יהיה זמין, יכול להיות שלא תהיה לכם גישה לנתונים. כדי להבטיח יתירות מלאה, מומלץ לכלול לפחות שני אזורים במדיניות אחסון ההודעות, שהם חלק מהמיקום של הקטגוריה במספר אזורים או בשני אזורים.

מידע נוסף על מיקומי קטגוריות זמין במאמרי העזרה.

הפעלת הפרסום

כדי להפעיל את הפרסום, צריך להקצות את התפקיד 'פרסום הודעות ב-Pub/Sub' לחשבון השירות של Pub/Sub, כדי ש-Pub/Sub יוכל לפרסם בנושא הייבוא של Cloud Storage.

הפעלת פרסום בכל נושאי הייבוא של Cloud Storage

בוחרים באפשרות הזו אם בפרויקט שלכם אין נושא ייבוא של Cloud Storage.

  1. נכנסים לדף IAM במסוף Cloud de Confiance .

    כניסה לדף IAM

  2. מסמנים את תיבת הסימון Include S3NS-provided role grants.

  3. מחפשים את חשבון השירות של Pub/Sub בפורמט:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com

  4. לוחצים על הלחצן Edit Principal (עריכת חשבון המשתמש) בחשבון השירות הרלוונטי.

  5. אם צריך, לוחצים על הוספת תפקיד נוסף.

  6. מחפשים ובוחרים את התפקיד 'פרסום הודעות ב-Pub/Sub' (roles/pubsub.publisher).

  7. לוחצים על Save.

הפעלת פרסום בנושא יבוא יחיד של Cloud Storage

כדי להעניק ל-Pub/Sub הרשאה לפרסם בנושא ספציפי של ייבוא ל-Cloud Storage שכבר קיים, פועלים לפי השלבים הבאים:

  1. במסוף Cloud de Confiance , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Cloud de Confiance המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID הוא המזהה או השם של נושא הייבוא של Cloud Storage.

    • PROJECT_NUMBER הוא מספר הפרויקט. במאמר זיהוי פרויקטים מוסבר איך לראות את מספר הפרויקט.

הקצאת תפקידים ב-Cloud Storage לחשבון השירות של Pub/Sub

כדי ליצור נושא לייבוא מ-Cloud Storage, לחשבון השירות של Pub/Sub צריכה להיות הרשאת קריאה מקטגוריית Cloud Storage ספציפית. נדרשות ההרשאות הבאות:

  • storage.objects.list
  • storage.objects.get
  • storage.buckets.get

כדי להקצות את ההרשאות האלה לחשבון השירות של Pub/Sub, בוחרים באחת מהפעולות הבאות:

  • הענקת הרשאות ברמת הקטגוריה. בקטגוריה הספציפית של Cloud Storage, מקצים לחשבון השירות של Pub/Sub את התפקידים 'קריאת אובייקטים באחסון מדור קודם' (roles/storage.legacyObjectReader) ו'קריאה בקטגוריה באחסון מדור קודם' (roles/storage.legacyBucketReader).

  • אם אתם חייבים להקצות תפקידים ברמת הפרויקט, אתם יכולים במקום זאת להקצות את התפקיד 'אדמין לניהול אחסון' (roles/storage.admin) בפרויקט שמכיל את הקטגוריה של Cloud Storage. מקצים את התפקיד הזה לחשבון השירות של Pub/Sub.

הרשאות של קטגוריות

כדי להקצות את התפקידים 'קריאת אובייקטים באחסון מדור קודם' (roles/storage.legacyObjectReader) ו'קריאה בקטגוריה באחסון מדור קודם' (roles/storage.legacyBucketReader) לחשבון השירות של Pub/Sub ברמת הקטגוריה, מבצעים את השלבים הבאים:

  1. פותחים את הדף Cloud Storage במסוף Cloud de Confiance .

    כניסה ל-Cloud Storage

  2. לוחצים על הקטגוריה של Cloud Storage שממנה רוצים לקרוא הודעות ולייבא אותן לנושא הייבוא של Cloud Storage.

    הדף Bucket details נפתח.

  3. בדף Bucket details, לוחצים על הכרטיסייה Permissions.

  4. בכרטיסייה Permissions (הרשאות) > View by Principals (הצגה לפי ישויות), לוחצים על Grant access (מתן גישה).

    ייפתח הדף הענקת גישה.

  5. בקטע Add Principals (הוספת חשבונות משתמשים), מזינים את השם של חשבון השירות של Pub/Sub.

    הפורמט של חשבון השירות הוא service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com. לדוגמה, בפרויקט עם PROJECT_NUMBER=112233445566, חשבון השירות הוא בפורמט service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  6. בתפריט הנפתח Assign roles > Select a role, מזינים Object Reader ובוחרים את התפקיד Storage Legacy Object Reader.

  7. לוחצים על הוספת תפקיד נוסף.

  8. בתפריט הנפתח Select a role, מזינים Bucket Reader ובוחרים את התפקיד Storage Legacy Bucket Reader.

  9. לוחצים על Save.

הרשאות בפרויקט

כדי להקצות את התפקיד Storage Admin ‏(roles/storage.admin) ברמת הפרויקט, מבצעים את השלבים הבאים:

  1. נכנסים לדף IAM במסוף Cloud de Confiance .

    כניסה לדף IAM

  2. בכרטיסייה Permissions (הרשאות) > View by Principals (הצגה לפי ישויות), לוחצים על Grant access (מתן גישה).

    ייפתח הדף הענקת גישה.

  3. בקטע Add Principals (הוספת חשבונות משתמשים), מזינים את השם של חשבון השירות של Pub/Sub.

    הפורמט של חשבון השירות הוא service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com. לדוגמה, בפרויקט עם PROJECT_NUMBER=112233445566, חשבון השירות הוא בפורמט service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. בתפריט הנפתח Assign roles > Select a role, מקלידים Storage Admin ובוחרים את התפקיד Storage Admin.

  5. לוחצים על Save.

מידע נוסף על IAM ב-Cloud Storage זמין במאמר ניהול זהויות והרשאות גישה ב-Cloud Storage.

מאפיינים של נושאים לייבוא ב-Cloud Storage

מידע נוסף על המאפיינים המשותפים לכל הנושאים זמין במאמר מאפיינים של נושא.

שם הקטגוריה

זהו השם של קטגוריה של Cloud Storage שממנה Pub/Sub קורא את הנתונים שמתפרסמים בנושא ייבוא של Cloud Storage.

פורמט קלט

כשיוצרים נושא ייבוא של Cloud Storage, אפשר לציין את הפורמט של האובייקטים שייקלטו כ-Text,‏ Avro או Pub/Sub Avro.

  • טקסט. ההנחה היא שהאובייקטים מכילים נתונים בטקסט פשוט. פורמט הקלט הזה מנסה להטמיע את כל האובייקטים בקטגוריה, כל עוד האובייקט עומד בזמן היצירה המינימלי של האובייקט ומתאים לקריטריונים של תבנית glob.

    תו מפריד. אפשר גם לציין תו מפריד שלפיו האובייקטים יפוצלו להודעות. אם לא מגדירים תו מפריד, ברירת המחדל היא תו מעבר לשורה (\n). התו המפריד חייב להיות תו יחיד.

  • Avro. האובייקטים הם בפורמט בינארי של Apache Avro. כל אובייקט שלא נמצא בפורמט Apache Avro תקין לא ייקלט. אלה המגבלות שקשורות ל-Avro:

    • אין תמיכה בגרסאות Avro 1.1.0 ו-1.2.0.
    • הגודל המקסימלי של בלוק Avro הוא 16MB.
  • Pub/Sub Avro. האובייקטים הם בפורמט הבינארי של Apache Avro עם סכימה שתואמת לסכימה של אובייקט שנכתב ב-Cloud Storage באמצעות מינוי ל-Cloud Storage ב-Pub/Sub עם פורמט הקובץ Avro. ריכזנו כאן כמה הנחיות חשובות לגבי Pub/Sub Avro:

    • שדה הנתונים של רשומת Avro משמש לאכלוס שדה הנתונים של הודעת Pub/Sub שנוצרה.

    • אם האפשרות write_metadata מוגדרת במינוי ל-Cloud Storage, כל הערכים בשדה המאפיינים מאוכלסים כמאפיינים של הודעת Pub/Sub שנוצרה.

    • אם מפתח סידור מצוין בהודעה המקורית שנכתבה ב-Cloud Storage, השדה הזה מאוכלס כמאפיין עם השם original_message_ordering_key בהודעת Pub/Sub שנוצרה.

זמן יצירת האובייקט המינימלי

כשיוצרים נושא לייבוא ב-Cloud Storage, אפשר לציין זמן מינימלי ליצירת אובייקט. רק אובייקטים שנוצרו בתאריך ובשעה שמופיעים בחותמת הזמן הזו או אחריהם ייקלטו. חותמת הזמן הזו צריכה להיות בפורמט כמו YYYY-MM-DDThh:mm:ssZ. כל תאריך, בעבר או בעתיד, בין 0001-01-01T00:00:00Z ל-9999-12-31T23:59:59Z כולל, הוא תקין.

התאמה לדפוס glob

כשיוצרים נושא ייבוא של Cloud Storage, אפשר לציין אופציונלית תבנית glob להתאמה. רק אובייקטים עם שמות שתואמים לתבנית הזו ייכללו בתהליך. לדוגמה, כדי להטמיע את כל האובייקטים עם הסיומת .txt, אפשר לציין את תבנית ה-glob‏ **.txt.

מידע על התחביר הנתמך של תבניות glob זמין במאמרי העזרה של Cloud Storage.

שימוש בנושאי ייבוא של Cloud Storage

אפשר ליצור נושא חדש לייבוא או לערוך נושא קיים.

לתשומת ליבכם

  • יצירת הנושא והמינוי בנפרד, גם אם היא נעשית ברצף מהיר, עלולה לגרום לאובדן נתונים. יש חלון זמן קצר שבו הנושא קיים ללא מינוי. אם נתונים יישלחו לנושא במהלך הזמן הזה, הם יאבדו. אם יוצרים את הנושא קודם, יוצרים את המינוי ואז ממירים את הנושא לנושא ייבוא, אפשר להבטיח שלא יפספסו הודעות במהלך תהליך הייבוא.

יצירת נושא ייבוא ב-Cloud Storage

כדי ליצור נושא ייבוא של Cloud Storage, מבצעים את השלבים הבאים:

המסוף

  1. נכנסים לדף Topics במסוף Cloud de Confiance .

    לדף Topics

  2. לוחצים על יצירת נושא.

    ייפתח דף הפרטים של הנושא.

  3. בשדה Topic ID (מזהה הנושא), מזינים מזהה לנושא הייבוא של Cloud Storage.

    מידע נוסף על מתן שמות לנושאים זמין בהנחיות למתן שמות.

  4. בוחרים באפשרות הוספת מינוי שמוגדר כברירת מחדל.

  5. בוחרים באפשרות הפעלת הטמעה.

  6. בקטע 'מקור ההטמעה', בוחרים באפשרות Google Cloud Storage.

  7. בקטע קטגוריה של Cloud Storage, לוחצים על Browse.

    נפתח הדף Select bucket. בוחרים באחת מהאפשרויות הבאות:

    • בוחרים דלי קיים מכל פרויקט מתאים.

    • לוחצים על סמל היצירה ופועלים לפי ההוראות במסך כדי ליצור מאגר חדש. אחרי שיוצרים את הקטגוריה, בוחרים אותה לנושא הייבוא של Cloud Storage.

  8. כשמציינים את הקטגוריה, Pub/Sub בודק אם לחשבון השירות של Pub/Sub יש את ההרשאות המתאימות בקטגוריה. אם יש בעיות בהרשאות, תוצג הודעה דומה לזו:

    Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

    אם יש בעיות בהרשאות, לוחצים על הגדרת הרשאות. מידע נוסף זמין במאמר הענקת הרשאות Cloud Storage לחשבון השירות של Pub/Sub.

  9. בשדה Object format, בוחרים באפשרות Text,‏ Avro או Pub/Sub Avro.

    אם בוחרים באפשרות טקסט, אפשר לציין תו מפריד שבאמצעותו יפוצלו האובייקטים להודעות.

    מידע נוסף על האפשרויות האלה זמין במאמר בנושא פורמט קלט.

  10. זה שינוי אופציונלי. אפשר לציין זמן מינימלי ליצירת אובייקט לנושא. אם מוגדר ערך, המערכת תבצע המרה רק של אובייקטים שנוצרו אחרי זמן היצירה המינימלי של האובייקט.

    מידע נוסף זמין במאמר בנושא זמן יצירת אובייקט מינימלי.

  11. צריך לציין תבנית Glob. כדי להטמיע את כל האובייקטים בקטגוריה, משתמשים ב-** כדפוס glob. אם המאפיין מוגדר, המערכת מבצעת המרה רק של אובייקטים שתואמים לתבנית שצוינה.

    מידע נוסף מופיע במאמר בנושא התאמה של תבנית glob.

  12. משאירים את שאר הגדרות ברירת המחדל.

  13. לוחצים על יצירת נושא.

gcloud

  1. במסוף Cloud de Confiance , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Cloud de Confiance המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. מריצים את הפקודה gcloud pubsub topics create:

    gcloud pubsub topics create TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME \
        --cloud-storage-ingestion-input-format=INPUT_FORMAT \
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
        --cloud-storage-ingestion-match-glob=MATCH_GLOB
    

    בפקודה, רק TOPIC_ID, הדגל --cloud-storage-ingestion-bucket והדגל --cloud-storage-ingestion-input-format הם חובה. שאר הדגלים הם אופציונליים ואפשר להשמיט אותם.

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID: השם או המזהה של הנושא.
    • BUCKET_NAME: ציון השם של קטגוריה קיימת. לדוגמה, prod_bucket. שם הקטגוריה לא יכול לכלול את מזהה הפרויקט. במאמר יצירת קטגוריות מוסבר איך ליצור קטגוריה.
    • INPUT_FORMAT: מציין את הפורמט של האובייקטים שמועברים. הערך יכול להיות text, avro או pubsub_avro. מידע נוסף על האפשרויות האלה זמין במאמר בנושא פורמט קלט.
    • TEXT_DELIMITER: מציין את התו המפריד שבו יש לפצל אובייקטים של טקסט להודעות Pub/Sub. התו המפריד צריך להיות תו יחיד, וצריך להגדיר אותו רק אם הערך של INPUT_FORMAT הוא text. ברירת המחדל היא תו השורה החדשה (\n).

      כשמשתמשים ב-CLI של gcloud כדי לציין את התו שמפריד בין הערכים, צריך לשים לב במיוחד לטיפול בתווים מיוחדים כמו מעבר שורה \n. כדי לוודא שהתו המפריד יפורש בצורה נכונה, צריך להשתמש בפורמט '\n'. שימוש ב-\n בלי מרכאות או בלי escape יגרום להגדרת התו "n" כתו מפריד.

    • MINIMUM_OBJECT_CREATE_TIME: מציין את הזמן המינימלי שחלף מאז שאובייקט נוצר, כדי שהוא ייכלל בתהליך ההטמעה. הערך צריך להיות בפורמט UTC‏ YYYY-MM-DDThh:mm:ssZ. לדוגמה, 2024-10-14T08:30:30Z.

      כל תאריך, בעבר או בעתיד, מ-0001-01-01T00:00:00Z עד 9999-12-31T23:59:59Z כולל, הוא תקין.

    • MATCH_GLOB: מציין את תבנית ה-glob להתאמה כדי שאובייקט ייקלט. כשמשתמשים ב-CLI של gcloud, אם מחפשים התאמה באמצעות glob עם התווים *, צריך להוסיף לפני התו * את התו * כדי לבטל את המשמעות המיוחדת שלו, כלומר להשתמש בפורמט \*\*.txt, או להוסיף את כל ה-glob של ההתאמה במירכאות "**.txt" או '**.txt'. מידע על התחביר הנתמך של תבניות glob מופיע במאמרי העזרה של Cloud Storage.

C++‎

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של C++‎ במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף זמין במאמרי העזרה של Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string bucket, std::string const& input_format,
   std::string text_delimiter, std::string match_glob,
   std::string const& minimum_object_create_time) {
  google::pubsub::v1::Topic request;
  request.set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                             ->mutable_cloud_storage();
  cloud_storage.set_bucket(std::move(bucket));
  if (input_format == "text") {
    cloud_storage.mutable_text_format()->set_delimiter(
        std::move(text_delimiter));
  } else if (input_format == "avro") {
    cloud_storage.mutable_avro_format();
  } else if (input_format == "pubsub_avro") {
    cloud_storage.mutable_pubsub_avro_format();
  } else {
    std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                 "got value: "
              << input_format << std::endl;
    return;
  }

  if (!match_glob.empty()) {
    cloud_storage.set_match_glob(std::move(match_glob));
  }

  if (!minimum_object_create_time.empty()) {
    google::protobuf::Timestamp timestamp;
    if (!google::protobuf::util::TimeUtil::FromString(
            minimum_object_create_time,
            cloud_storage.mutable_minimum_object_create_time())) {
      std::cout << "Invalid minimum object create time: "
                << minimum_object_create_time << std::endl;
    }
  }

  auto topic = client.CreateTopic(request);
  // Note that kAlreadyExists is a possible error when the library retries.
  if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The topic already exists\n";
    return;
  }
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully created: " << topic->DebugString()
            << "\n";
}

המשך

בדוגמה הבאה נעשה שימוש בגרסה הראשית של ספריית הלקוח Go Pub/Sub ‏ (v2). אם אתם עדיין משתמשים בספרייה v1, כדאי לעיין במדריך להעברה לגרסה v2. כדי לראות רשימה של דוגמאות קוד מגרסה 1, אפשר לעיין ב דוגמאות הקוד שהוצאו משימוש.

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Go במאמר מדריך למתחילים: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// bucket := "my-bucket"
	// matchGlob := "**.txt"
	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"
	// delimiter := ","

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
	if err != nil {
		return err
	}

	topicpb := &pubsubpb.Topic{
		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
			Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{
				CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{
					Bucket: bucket,
					// Alternatively, can be Avro or PubSubAvro formats. See
					InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
						TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{
							Delimiter: &delimiter,
						},
					},
					MatchGlob:               matchGlob,
					MinimumObjectCreateTime: timestamppb.New(minCreateTime),
				},
			},
		},
	}
	t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
	if err != nil {
		return fmt.Errorf("CreateTopic: %w", err)
	}
	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
	return nil
}

Java

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Java במאמר התחלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Java API.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.text.ParseException;

public class CreateTopicWithCloudStorageIngestionExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Cloud Storage ingestion settings.
    // bucket and inputFormat are required arguments.
    String bucket = "your-bucket";
    String inputFormat = "text";
    String textDelimiter = "\n";
    String matchGlob = "**.txt";
    String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";

    createTopicWithCloudStorageIngestionExample(
        projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
  }

  public static void createTopicWithCloudStorageIngestionExample(
      String projectId,
      String topicId,
      String bucket,
      String inputFormat,
      String textDelimiter,
      String matchGlob,
      String minimumObjectCreateTime)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
          IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
      switch (inputFormat) {
        case "text":
          cloudStorageBuilder.setTextFormat(
              IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                  .setDelimiter(textDelimiter)
                  .build());
          break;
        case "avro":
          cloudStorageBuilder.setAvroFormat(
              IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
          break;
        case "pubsub_avro":
          cloudStorageBuilder.setPubsubAvroFormat(
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
          break;
        default:
          throw new IllegalArgumentException(
              "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
      }

      if (matchGlob != null && !matchGlob.isEmpty()) {
        cloudStorageBuilder.setMatchGlob(matchGlob);
      }

      if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
        try {
          cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
        } catch (ParseException e) {
          System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
        }
      }

      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder()
              .setCloudStorage(cloudStorageBuilder.build())
              .build();

      TopicName topicName = TopicName.of(projectId, topicId);

      Topic topic =
          topicAdminClient.createTopic(
              Topic.newBuilder()
                  .setName(topicName.toString())
                  .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                  .build());

      System.out.println(
          "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
    }
  }
}

Node.js

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר הפעלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId,
  bucket,
  inputFormat,
  textDelimiter,
  matchGlob,
  minimumObjectCreateTime,
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Node.ts

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Node.js במאמר הפעלה מהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const bucket = 'YOUR_BUCKET_NAME';
// const inputFormat = 'text';
// const textDelimiter = '\n';
// const matchGlob = '**.txt';
// const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createTopicWithCloudStorageIngestion(
  topicNameOrId: string,
  bucket: string,
  inputFormat: string,
  textDelimiter: string,
  matchGlob: string,
  minimumObjectCreateTime: string,
) {
  const minimumDate = Date.parse(minimumObjectCreateTime);
  const topicMetadata: TopicMetadata = {
    name: topicNameOrId,
    ingestionDataSourceSettings: {
      cloudStorage: {
        bucket,
        minimumObjectCreateTime: {
          seconds: minimumDate / 1000,
          nanos: (minimumDate % 1000) * 1000,
        },
        matchGlob,
      },
    },
  };

  // Make a format appropriately.
  switch (inputFormat) {
    case 'text':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
        delimiter: textDelimiter,
      };
      break;
    case 'avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
      break;
    case 'pubsub_avro':
      topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
        {};
      break;
    default:
      console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
      return;
  }

  // Creates a new topic with Cloud Storage ingestion.
  await pubSubClient.createTopic(topicMetadata);
  console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
}

Python

לפני שמנסים את הדוגמה הזו, צריך לפעול לפי הוראות ההגדרה של Python במאמר תחילת העבודה המהירה: שימוש בספריות לקוח. מידע נוסף מופיע במאמרי העזרה של ה-API בשפת Python של Pub/Sub.

from google.cloud import pubsub_v1
from google.protobuf import timestamp_pb2
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# bucket = "your-bucket"
# input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
# text_delimiter = "\n"
# match_glob = "**.txt"
# minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
    bucket=bucket,
)
if input_format == "text":
    cloud_storage_settings.text_format = (
        IngestionDataSourceSettings.CloudStorage.TextFormat(
            delimiter=text_delimiter
        )
    )
elif input_format == "avro":
    cloud_storage_settings.avro_format = (
        IngestionDataSourceSettings.CloudStorage.AvroFormat()
    )
elif input_format == "pubsub_avro":
    cloud_storage_settings.pubsub_avro_format = (
        IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
    )
else:
    print(
        "Invalid input_format: "
        + input_format
        + "; must be in ('text', 'avro', 'pubsub_avro')"
    )
    return

if match_glob:
    cloud_storage_settings.match_glob = match_glob

if minimum_object_create_time:
    try:
        minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
        minimum_object_create_time_timestamp.FromJsonString(
            minimum_object_create_time
        )
        cloud_storage_settings.minimum_object_create_time = (
            minimum_object_create_time_timestamp
        )
    except ValueError:
        print("Invalid minimum_object_create_time: " + minimum_object_create_time)
        return

request = Topic(
    name=topic_path,
    ingestion_data_source_settings=IngestionDataSourceSettings(
        cloud_storage=cloud_storage_settings,
    ),
)

topic = publisher.create_topic(request=request)

print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

נתקלתם בבעיות? היעזרו במאמר בנושא פתרון בעיות בייבוא של Cloud Storage.

עריכת נושא ייבוא של Cloud Storage

אתם יכולים לערוך נושא ייבוא של Cloud Storage כדי לעדכן את המאפיינים שלו.

לדוגמה, כדי להפעיל מחדש את ההטמעה, אפשר לשנות את הקטגוריה או לעדכן את הזמן המינימלי ליצירת אובייקט.

כדי לערוך נושא ייבוא של Cloud Storage:

המסוף

  1. נכנסים לדף Topics במסוף Cloud de Confiance .

    לדף Topics

  2. לוחצים על הנושא של הייבוא מ-Cloud Storage.

  3. בדף הפרטים של הנושא, לוחצים על עריכה.

  4. מעדכנים את השדות שרוצים לשנות.

  5. לוחצים על עדכון.

gcloud

  1. במסוף Cloud de Confiance , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Cloud de Confiance המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. כדי לא לאבד את ההגדרות של נושא הייבוא, חשוב לכלול את כל ההגדרות בכל פעם שמעדכנים את הנושא. אם משמיטים משהו, Pub/Sub מאפס את ההגדרה לערך ברירת המחדל המקורי שלה.

    מריצים את הפקודה gcloud pubsub topics update עם כל הדגלים שמוזכרים בדוגמה הבאה:

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    מחליפים את מה שכתוב בשדות הבאים:

    • TOPIC_ID הוא המזהה או השם של הנושא. אי אפשר לעדכן את השדה הזה.

    • BUCKET_NAME: ציון השם של קטגוריה קיימת. לדוגמה, prod_bucket. שם הקטגוריה לא יכול לכלול את מזהה הפרויקט. במאמר יצירת קטגוריות מוסבר איך ליצור קטגוריה.

    • INPUT_FORMAT: מציין את הפורמט של האובייקטים שמועברים. הערך יכול להיות text, avro או pubsub_avro. מידע נוסף על האפשרויות האלה מופיע בקטע פורמט קלט.

    • TEXT_DELIMITER: מציין את התו המפריד שבו יש לפצל אובייקטים של טקסט להודעות Pub/Sub. התו המפריד צריך להיות תו יחיד, וצריך להגדיר אותו רק אם הערך של INPUT_FORMAT הוא text. ברירת המחדל היא תו השורה החדשה (\n).

      כשמשתמשים ב-CLI של gcloud כדי לציין את התו שמפריד בין הערכים, צריך לשים לב במיוחד לטיפול בתווים מיוחדים כמו מעבר שורה \n. כדי לוודא שהתו המפריד יפורש בצורה נכונה, צריך להשתמש בפורמט '\n'. שימוש ב-\n בלי מרכאות או בלי escape יגרום להגדרת התו "n" כתו מפריד.

    • MINIMUM_OBJECT_CREATE_TIME: מציין את הזמן המינימלי שחלף מאז שאובייקט נוצר, כדי שהוא ייכלל בתהליך ההטמעה. הערך צריך להיות בפורמט UTC‏ YYYY-MM-DDThh:mm:ssZ. לדוגמה, 2024-10-14T08:30:30Z.

      כל תאריך, בעבר או בעתיד, מ-0001-01-01T00:00:00Z עד 9999-12-31T23:59:59Z כולל, הוא תקין.

    • MATCH_GLOB: מציין את תבנית ה-glob להתאמה כדי שאובייקט ייקלט. כשמשתמשים ב-CLI של gcloud, אם מחפשים התאמה באמצעות glob עם התווים *, צריך להוסיף לפני התו * את התו * כדי לבטל את המשמעות המיוחדת שלו, כלומר להשתמש בפורמט \*\*.txt, או להוסיף את כל ה-glob של ההתאמה במירכאות "**.txt" או '**.txt'. מידע על התחביר הנתמך של תבניות glob מופיע ב מסמכי Cloud Storage.

מכסות ומגבלות בנושא ייבוא ל-Cloud Storage

התפוקה של המוציא לאור בנושאים של ייבוא מוגבלת על ידי מכסת הפרסום של הנושא. מידע נוסף זמין במאמר מכסות ומגבלות ב-Pub/Sub.

המאמרים הבאים