將資料匯出至 Pub/Sub (反向 ETL)

如要將資料匯出至 Pub/Sub,必須使用 BigQuery 持續查詢

本文說明如何從 BigQuery 設定反向擷取、轉換及載入 (RETL) 至 Pub/Sub。您可以在連續查詢中使用 EXPORT DATA 陳述式,將資料從 BigQuery 匯出至 Pub/Sub 主題

您可以透過 RETL 工作流程將資料發布至 Pub/Sub,結合 BigQuery 的分析功能與 Pub/Sub 的非同步可擴充全域訊息服務。這個工作流程可讓您以事件驅動的方式,將資料提供給下游應用程式和服務。

必要條件

您必須建立服務帳戶。如要執行持續查詢,將結果匯出至 Pub/Sub 主題,必須使用服務帳戶

您必須建立 Pub/Sub 主題,才能以訊息形式接收持續查詢結果,並建立 Pub/Sub 訂閱項目,供目標應用程式接收這些訊息。

必要的角色

本節說明建立持續查詢的使用者帳戶,以及執行持續查詢的服務帳戶,分別需要哪些角色和權限。

使用者帳戶權限

如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create IAM 權限。下列 IAM 角色都會授予 bigquery.jobs.create 權限:

如要提交使用服務帳戶執行的工作,使用者帳戶必須具備服務帳戶使用者 (roles/iam.serviceAccountUser) 角色。如果您使用同一個使用者帳戶建立服務帳戶,則該使用者帳戶必須具備服務帳戶管理員 (roles/iam.serviceAccountAdmin) 角色。如要瞭解如何限制使用者存取單一服務帳戶,而非專案中的所有服務帳戶,請參閱授予單一角色

如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備服務使用管理員 (roles/serviceusage.serviceUsageAdmin) 角色。

服務帳戶權限

如要從 BigQuery 資料表匯出資料,服務帳戶必須具備 bigquery.tables.export IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export 權限:

如要讓服務帳戶存取 Pub/Sub,您必須授予服務帳戶下列兩個 IAM 角色:

您或許還可透過自訂角色取得必要權限。

事前準備

Enable the BigQuery and Pub/Sub APIs.

Enable the APIs

匯出至 Pub/Sub

使用 EXPORT DATA 陳述式將資料匯出至 Pub/Sub 主題:

主控台

  1. 前往 Trusted Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在查詢編輯器中,依序點選「更多」「查詢設定」

  3. 在「連續查詢」部分中,勾選「使用連續查詢模式」核取方塊。

  4. 在「Service account」(服務帳戶) 方塊中,選取您建立的服務帳戶。

  5. 按一下 [儲存]

  6. 在查詢編輯器中輸入下列陳述式:

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID'
    ) AS
    (
    QUERY
    );

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • TOPIC_ID:Pub/Sub 主題 ID。您可以前往 Trusted Cloud 控制台的「主題」頁面取得主題 ID。
    • QUERY:用於選取要匯出資料的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您必須在連續查詢的 FROM 子句中使用 APPENDS 函式,指定開始處理資料的時間點。
  7. 按一下「執行」

bq

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. 在指令列中,使用 bq query 指令加上下列旗標,執行連續查詢:

    • --continuous 旗標設為 true,即可持續查詢。
    • 使用 --connection_property 旗標指定要使用的服務帳戶。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以在 Trusted Cloud 控制台的「服務帳戶」頁面取得服務帳戶電子郵件地址。
    • QUERY:用於選取要匯出資料的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您必須在連續查詢的 FROM 子句中使用 APPENDS 函式,指定開始處理資料的時間點。
  3. API

    1. 呼叫 jobs.insert 方法,執行連續查詢。在您傳入的 Job 資源中,設定 JobConfigurationQuery 資源的下列欄位:

      • continuous 欄位設為 true,即可讓查詢持續執行。
      • 使用 connection_property 欄位指定要使用的服務帳戶。
      curl --request POST \
        'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs'
        --header 'Authorization: Bearer $(gcloud auth print-access-token) \
        --header 'Accept: application/json' \
        --header 'Content-Type: application/json' \
        --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
        --compressed

      更改下列內容:

      • PROJECT_ID:您的專案 ID。
      • QUERY:用於選取要匯出資料的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您必須在連續查詢的 FROM 子句中使用 APPENDS 函式,指定開始處理資料的時間點。
      • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以在 Trusted Cloud 控制台的「服務帳戶」頁面取得服務帳戶電子郵件地址。

將多個資料欄匯出至 Pub/Sub

如要在輸出內容中加入多個資料欄,可以建立包含資料欄值的 struct 資料欄,然後使用 TO_JSON_STRING 函式將 struct 值轉換為 JSON 字串。 以下範例會匯出四個資料欄的資料,並以 JSON 字串格式呈現:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

匯出最佳化

如果連續查詢工作效能似乎受到可用運算資源限制,請嘗試增加 BigQuery CONTINUOUS 運算單元預留指派作業的大小。

限制

  • 匯出的資料必須包含單一 STRINGBYTES 欄。 資料欄名稱可自行選擇。
  • 您必須使用連續查詢,才能匯出至 Pub/Sub。
  • 您無法在連續查詢中將結構定義傳遞至 Pub/Sub 主題。
  • 您無法將資料匯出至使用結構定義的 Pub/Sub 主題。
  • 匯出至 Pub/Sub 時,您可以匯出 JSON 格式的記錄,其中部分值為 NULL,但無法匯出僅包含 NULL 值的記錄。在持續查詢中加入 WHERE message IS NOT NULL 篩選器,即可從查詢結果中排除 NULL 記錄。
  • 將資料匯出至已設定位置端點的 Pub/Sub 主題時,端點必須設定在與 BigQuery 資料集相同的 Trusted Cloud 區域界線內,而該資料集包含您要查詢的資料表。
  • 匯出的資料不得超過 Pub/Sub 配額

定價

匯出連續查詢中的資料時,系統會按照 BigQuery 容量運算價格計費。如要執行連續查詢,您必須擁有使用 Enterprise 或 Enterprise Plus 版本預留位置,以及使用 CONTINUOUS 工作類型的預留位置指派

匯出資料後,系統會向您收取 Pub/Sub 使用費。 詳情請參閱 Pub/Sub 定價