搭配 Dataproc 使用 BigLake 中繼存放區

本文說明如何搭配使用 BigLake 中繼資料庫與 Compute Engine 上的 Dataproc。這個連線可為您提供單一共用中繼資料庫,可在 Apache Spark 或 Apache Flink 等開放原始碼軟體引擎中運作。

事前準備

  1. 為 Trusted Cloud 專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能
  2. 啟用 BigQuery 和 Dataproc API。

    啟用 API

  3. 選用:瞭解 BigLake 元資料庫的運作方式,以及為何應使用這項服務。

必要的角色

如要取得使用 Spark 或 Flink 和 Dataproc 時所需的權限,並將 BigLake 中繼資料庫做為中繼資料儲存庫,請要求管理員授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

一般工作流程

如要在 Compute Engine 上使用 Dataproc 搭配 BigLake 元資料庫,請按照下列一般步驟操作:

  1. 建立 Dataproc 叢集或設定現有叢集。
  2. 連結至您偏好的開放原始碼軟體引擎,例如 Spark 或 Flink。
  3. 使用 JAR 檔案在叢集上安裝 Apache Iceberg 目錄外掛程式。
  4. 視您使用的開放原始碼軟體引擎而定,視需要建立及管理 BigLake 中繼存放區資源。
  5. 在 BigQuery 中存取及使用 BigLake 中繼資料庫資源。

將 BigLake 中繼資料庫連結至 Spark

下列操作說明將說明如何使用互動式 Spark SQL 將 Dataproc 連結至 BigLake 中繼存放區。

下載 Iceberg 目錄外掛程式

如要將 BigLake 中繼存放區連結至 Dataproc 和 Spark,您必須使用 BigLake 中繼存放區 Iceberg 目錄外掛程式 jar 檔案。

根據預設,這個檔案會包含在 Dataproc 映像檔 2.2 版中。如果 Dataproc 叢集無法直接存取網際網路,您必須下載外掛程式,然後上傳至 Dataproc 叢集可存取的 Cloud Storage 值區。

下載 BigLake Metastore Iceberg 目錄外掛程式

設定 Dataproc 叢集

您必須先設定 Dataproc 叢集,才能連線至 BigLake Metastore。

您可以建立新叢集或使用現有叢集。之後,您可以使用這個叢集執行互動式 Spark SQL,並管理 BigLake 中繼存放區資源。

  • 建立叢集的區域中,子網路必須已啟用 私人 Google 存取權 (PGA)。根據預設,使用 2.2 (預設) 以上版本的映像檔建立的 Dataproc 叢集 VM 僅具備內部 IP 位址。如要讓叢集 VM 與 Google API 通訊,請在建立叢集的區域中,針對 default (或使用者指定的網路名稱,如適用) 網路子網路啟用私人 Google 存取權

  • 如果您要執行本指南中的 Zeppelin 網頁介面範例,請務必使用或建立已啟用 Zeppelin 選用元件的 Dataproc 叢集。

新叢集

如要建立新的 Dataproc 叢集,請執行下列 gcloud dataproc clusters create 指令。這項設定包含使用 BigLake 中繼存放區所需的設定。

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

更改下列內容:

  • CLUSTER_NAME:Dataproc 叢集的名稱。
  • PROJECT_ID:您建立叢集的 Trusted Cloud 專案 ID。
  • LOCATION:建立叢集的 Trusted Cloud 區域。

現有叢集

如要設定現有叢集,請將下列 Iceberg Spark 執行階段新增至叢集。

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

您可以使用下列其中一個選項新增執行階段:

提交 Spark 工作

如要提交 Spark 工作,請使用下列其中一種方法:

gcloud CLI

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

更改下列內容:

  • PROJECT_ID:包含 Dataproc 叢集的 Trusted Cloud 專案 ID。
  • CLUSTER_NAME:您用來執行 Spark SQL 工作 Dataproc 叢集的名稱。
  • REGION:叢集所在的 Compute Engine 地區
  • LOCATION:BigQuery 資源的位置。
  • CATALOG_NAME:您在 SQL 工作中使用的 Spark 目錄名稱。
  • WAREHOUSE_DIRECTORY:包含資料倉儲的 Cloud Storage 資料夾。這個值的開頭為 gs://
  • SPARK_SQL_COMMAND:您要執行的 Spark SQL 查詢。這項查詢包含建立資源的指令。例如建立命名空間和資料表。

互動式 Spark

連線至 Spark 並安裝目錄外掛程式

如要安裝 BigLake 中繼存放區的產品目錄外掛程式,請使用 SSH 連線至 Dataproc 叢集。

  1. 前往 Trusted Cloud 控制台的「VM Instances」(VM 執行個體) 頁面
  2. 如要連線至 Dataproc VM 執行個體,請在虛擬機器執行個體清單中按一下「SSH」SSH。輸出結果會與下列內容相似:

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. 在終端機中執行下列 BigLake 元資料庫初始化指令:

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    更改下列內容:

    • CATALOG_NAME:您在 SQL 工作中使用的 Spark 目錄名稱。
    • PROJECT_ID:Spark 目錄連結的 BigLake 元資料庫目錄的 Trusted Cloud 專案 ID。
    • LOCATION:BigLake 中繼資料庫的 Trusted Cloud 位置。
    • WAREHOUSE_DIRECTORY:包含資料倉儲的 Cloud Storage 資料夾。這個值的開頭為 gs://

    成功連線至叢集後,Spark 終端機會顯示 spark-sql 提示。

    spark-sql (default)>
    

管理 BigLake 元資料庫資源

您現在已連線至 BigLake 元資料庫。您可以查看現有資源,或根據儲存在 BigLake 中繼儲存庫的中繼資料建立新資源。

舉例來說,請嘗試在互動式 Spark SQL 工作階段中執行下列指令,建立 Iceberg 命名空間和資料表。

  • 使用自訂 Iceberg 目錄:

    USE `CATALOG_NAME`;
  • 建立命名空間:

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • 使用已建立的命名空間:

    USE NAMESPACE_NAME;
  • 建立 Iceberg 資料表:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • 插入表格列:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • 新增表格欄:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • 查看資料表中繼資料:

    DESCRIBE EXTENDED TABLE_NAME;
  • 列出命名空間中的資料表:

    SHOW TABLES;

Zeppelin 筆記本

  1. 在 Trusted Cloud 控制台中,前往「Dataproc Clusters」(Dataproc 叢集) 頁面。

    前往 Dataproc 叢集

  2. 按一下要使用的叢集名稱。

    「Cluster Details」(叢集詳細資料) 頁面隨即開啟。

  3. 在導覽選單中,按一下「網頁介面」

  4. 在「元件閘道」下方,點選「Zeppelin」Zeppelin 筆記本頁面隨即開啟。

  5. 在導覽選單中,依序點選「Notebook」和「+ 建立新筆記」

  6. 在對話方塊中輸入記事本名稱。將「Spark」Spark設為預設轉譯器。

  7. 按一下「建立」,系統會建立新的筆記本。

  8. 在筆記本中,按一下設定選單,然後點選「轉譯器」

  9. 在「Search interpreters」欄位中,搜尋「Spark」

  10. 按一下 [編輯]

  11. 在「Spark.jars」Spark.jars欄位中,輸入 Spark jar 的 URI。

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
    
  12. 按一下 [儲存]

  13. 按一下 [確定]

  14. 將下列 PySpark 程式碼複製到 Zeppelin 筆記本中。

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    更改下列內容:

    • CATALOG_NAME:要用於 SQL 工作的 Spark 目錄名稱。
    • PROJECT_ID:包含 Dataproc 叢集的 Trusted Cloud 專案 ID。
    • WAREHOUSE_DIRECTORY:包含資料倉儲的 Cloud Storage 資料夾。這個值的開頭為 gs://
    • NAMESPACE_NAME:參照 Spark 資料表的命名空間名稱。
    • WAREHOUSE_DIRECTORY:資料倉儲儲存位置的 Cloud Storage 資料夾 URI。
    • TABLE_NAME:Spark 資料表的資料表名稱。
  15. 按一下執行圖示或按下 Shift-Enter 即可執行程式碼。工作完成後,狀態訊息會顯示「Spark Job Finished」,輸出內容則會顯示表格內容:

以下操作說明說明如何使用 Flink SQL 用戶端,將 Dataproc 連線至 BigLake 元資料庫。

如要將 BigLake 中繼存放區連結至 Flink,請按照下列步驟操作:

  1. 建立啟用選用 Flink 元件的 Dataproc 叢集,並確認您使用的是 Dataproc 2.2 以上版本。
  2. 前往 Trusted Cloud 控制台的「VM instances」(VM 執行個體) 頁面

    前往 VM 執行個體

  3. 在虛擬機器執行個體清單中,按一下「SSH」SSH,連線至 Dataproc VM 執行個體。

  4. 為 BigLake Metastore 設定 Iceberg 自訂目錄外掛程式:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. 在 YARN 上啟動 Flink 工作階段:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
      -s yarn-session
  6. 在 Flink 中建立目錄:

    CREATE CATALOG CATALOG_NAME WITH (
      'type'='iceberg',
      'warehouse'='WAREHOUSE_DIRECTORY',
      'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
      'gcp_project'='PROJECT_ID',
      'gcp_location'='LOCATION'
    );

    更改下列內容:

    • CATALOG_NAME:Flink 目錄 ID,可連結至 BigLake 元資料庫目錄。
    • WAREHOUSE_DIRECTORY:倉庫目錄 (Flink 建立檔案的 Cloud Storage 資料夾) 的基礎路徑。這個值的開頭為 gs://
    • PROJECT_ID:Flink 目錄連結的 BigLake 中繼資料目錄專案 ID。
    • LOCATION:BigQuery 資源的位置

Flink 工作階段現在已連線至 BigLake 元資料庫,您可以執行 Flink SQL 指令。

連結至 BigLake 中繼存放區後,您就可以根據儲存在 BigLake 中繼存放區的中繼資料,建立及查看資源。

舉例來說,請嘗試在互動式 Flink SQL 工作階段中執行下列指令,建立 Iceberg 資料庫和資料表。

  1. 使用自訂 Iceberg 目錄:

    USE CATALOG CATALOG_NAME;

    CATALOG_NAME 替換為您的 Flink 目錄 ID。

  2. 建立資料庫,在 BigQuery 中建立資料集:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    DATABASE_NAME 替換為新資料庫的名稱。

  3. 使用您建立的資料庫:

    USE DATABASE_NAME;
  4. 建立 Iceberg 資料表。以下是建立範例銷售資料表的程式碼:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    );

    ICEBERG_TABLE_NAME 替換為新資料表的名稱。

  5. 查看資料表中繼資料:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. 列出資料庫中的資料表:

    SHOW TABLES;

將資料擷取至資料表

在上一節中建立 Iceberg 資料表後,您可以使用 Flink DataGen 做為資料來源,將即時資料取入資料表。以下步驟是這個工作流程的範例:

  1. 使用 DataGen 建立臨時表格:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '1000000',
        'fields.price.min' = '0',
        'fields.price.max' = '10000',
        'fields.buyer.first_name.length' = '10',
        'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    更改下列內容:

    • DATABASE_NAME:儲存臨時表格的資料庫名稱。
    • TEMP_TABLE_NAME:臨時資料表的名稱。
    • ICEBERG_TABLE_NAME:您在先前章節建立的 Iceberg 資料表名稱。
  2. 將平行處理數設為 1:

    SET 'parallelism.default' = '1';
  3. 設定檢查點間隔:

    SET 'execution.checkpointing.interval' = '10second';
  4. 設定檢查點:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. 啟動即時串流工作:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    輸出結果會與下列內容相似:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. 如要查看串流工作的狀態,請按照下列步驟操作:

    1. 前往 Trusted Cloud 控制台的「叢集」頁面。

      前往「叢集」

    2. 選取您的叢集。

    3. 按一下「網路介面」分頁標籤。

    4. 按一下「YARN ResourceManager」連結。

    5. YARN ResourceManager 介面中找出 Flink 工作階段,然後按一下「Tracking UI」下方的「ApplicationMaster」連結。

    6. 在「狀態」欄中,確認工作狀態為「執行中」

  7. 在 Flink SQL 用戶端中查詢串流資料:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. 在 BigQuery 中查詢串流資料:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. 在 Flink SQL 用戶端中終止串流工作:

    STOP JOB 'JOB_ID';

    JOB_ID 替換為建立串流工作時輸出內容中顯示的工作 ID。

後續步驟