設定 BigLake metastore

本文說明如何使用 DataprocTrusted Cloud by S3NS Serverless for Apache Spark 設定 BigLake metastore,建立可供 Apache Spark 或 Apache Flink 等開放原始碼引擎使用的單一共用 metastore。

事前準備

  1. 為 Trusted Cloud 專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能
  2. 啟用 BigQuery 和 Dataproc API。

    啟用 API

  3. 選用:瞭解 BigLake 中繼資料存放區的運作方式,以及使用該存放區的原因。

必要的角色

如要取得設定 BigLake Metastore 所需的權限,請要求管理員授予下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

使用 Dataproc 設定中繼存放區

您可以使用 Spark 或 Flink,透過 Dataproc 設定 BigLake metastore:

Spark

  1. 設定新叢集。如要建立新的 Dataproc 叢集,請執行下列 gcloud dataproc clusters create 指令,其中包含使用 BigLake Metastore 的必要設定:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    更改下列內容:

    • CLUSTER_NAME:Dataproc 叢集的名稱。
    • PROJECT_ID:您要建立叢集的 Trusted Cloud 專案 ID。
    • LOCATION:您要建立叢集的 Compute Engine 區域。
  2. 使用下列任一方法提交 Spark 工作:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    更改下列內容:

    • PROJECT_ID:包含 Dataproc 叢集的 Trusted Cloud 專案 ID。
    • CLUSTER_NAME:用於執行 Spark SQL 工作的 Dataproc 叢集名稱。
    • REGION:叢集所在的 Compute Engine 區域
    • LOCATION:BigQuery 資源的位置。
    • CATALOG_NAME:要用於 SQL 作業的 Spark 目錄名稱。
    • WAREHOUSE_DIRECTORY:包含資料倉儲的 Cloud Storage 資料夾。這個值開頭為 gs://
    • SPARK_SQL_COMMAND:要執行的 Spark SQL 查詢。這項查詢包含建立資源的指令。舉例來說,如要建立命名空間和資料表。

    spark-sql CLI

    1. 前往 Trusted Cloud 控制台的「VM Instances」(VM 執行個體) 頁面

      前往「VM instances」(VM 執行個體) 頁面

    2. 如要連線至 Dataproc VM 執行個體,請在列出 Dataproc 叢集主要 VM 執行個體名稱的資料列中,按一下「SSH」SSH。主要 VM 執行個體名稱是叢集名稱,後面加上 -m 尾碼。輸出結果會與下列內容相似:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. 在終端機中執行下列 BigLake 中繼存放區初始化指令:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      更改下列內容:

      • CATALOG_NAME:您在 SQL 工作中使用的 Spark 目錄名稱。
      • PROJECT_ID:BigLake metastore 目錄的 Trusted Cloud 專案 ID,Spark 目錄會連結至該目錄。
      • LOCATION:BigLake Metastore 的位置。 Trusted Cloud
      • WAREHOUSE_DIRECTORY:包含資料倉儲的 Cloud Storage 資料夾。這個值開頭為 gs://

      成功連線至叢集後,Spark 終端機會顯示 spark-sql 提示,您可以使用該提示提交 Spark 工作。

      spark-sql (default)>
      
  1. 建立已啟用選用 Flink 元件的 Dataproc 叢集,並確認您使用的是 Dataproc 2.2 以上版本。
  2. 前往 Trusted Cloud 控制台的「VM instances」(VM 執行個體) 頁面

    前往 VM 執行個體

  3. 在虛擬機器執行個體清單中,按一下「SSH」SSH,連線至主要的 Dataproc 叢集 VM 執行個體,該執行個體會列為叢集名稱,後方加上 -m 後置字串。

  4. 為 BigLake Metastore 設定 Iceberg 自訂目錄外掛程式:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. 在 YARN 上啟動 Flink 工作階段:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. 在 Flink 中建立目錄:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    更改下列內容:

    • CATALOG_NAME:Flink 目錄 ID,連結至 BigLake Metastore 目錄。
    • WAREHOUSE_DIRECTORY:倉庫目錄的基本路徑 (Flink 建立檔案的 Cloud Storage 資料夾)。這個值開頭為 gs://
    • PROJECT_ID:Flink 目錄連結的 BigLake metastore 目錄專案 ID。
    • LOCATION:BigQuery 資源的位置

Flink 工作階段現已連線至 BigLake 中繼資料存放區,您可以執行 Flink SQL 指令。

連線至 BigLake Metastore 後,您就能根據 BigLake Metastore 中儲存的中繼資料,建立及查看資源。

舉例來說,請嘗試在互動式 Flink SQL 工作階段中執行下列指令,建立 Iceberg 資料庫和資料表。

  1. 使用自訂 Iceberg 目錄:

    USE CATALOG CATALOG_NAME;

    CATALOG_NAME 替換為 Flink 目錄 ID。

  2. 建立資料庫,這會在 BigQuery 中建立資料集:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    DATABASE_NAME 換成新資料庫的名稱。

  3. 使用您建立的資料庫:

    USE DATABASE_NAME;
  4. 建立 Iceberg 資料表。以下範例會建立銷售資料表:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    ICEBERG_TABLE_NAME 替換為新資料表的名稱。

  5. 查看資料表中繼資料:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. 列出資料庫中的資料表:

    SHOW TABLES;

將資料擷取至資料表

在上一節中建立 Iceberg 資料表後,您可以使用 Flink DataGen 做為資料來源,將即時資料擷取到資料表中。以下步驟是這個工作流程的範例:

  1. 使用 DataGen 建立暫時表格:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    更改下列內容:

    • DATABASE_NAME:用於儲存暫時性資料表的資料庫名稱。
    • TEMP_TABLE_NAME:臨時資料表的名稱。
    • ICEBERG_TABLE_NAME:您在上一節中建立的 Iceberg 資料表名稱。
  2. 將平行處理設為 1:

    SET 'parallelism.default' = '1';
  3. 設定檢查點間隔:

    SET 'execution.checkpointing.interval' = '10second';
  4. 設定查核點:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. 啟動即時串流工作:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    輸出結果會與下列內容相似:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. 如要查看串流工作狀態,請按照下列步驟操作:

    1. 前往 Trusted Cloud 控制台的「Clusters」(叢集) 頁面。

      前往「Clusters」(叢集)

    2. 選取您的叢集。

    3. 按一下「網頁介面」分頁標籤。

    4. 按一下「YARN ResourceManager」連結。

    5. YARN ResourceManager 介面中,找到 Flink 工作階段,然後按一下「Tracking UI」下方的「ApplicationMaster」連結。

    6. 在「狀態」欄中,確認工作狀態為「執行中」

  7. 在 Flink SQL 用戶端中查詢串流資料:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. 在 BigQuery 中查詢串流資料:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. 在 Flink SQL 用戶端終止串流工作:

    STOP JOB 'JOB_ID';

    JOB_ID 替換為您建立串流工作時,輸出內容中顯示的工作 ID。

使用 Serverless for Apache Spark 設定中繼存放區

您可以使用 Spark SQL 或 PySpark,透過 Serverless for Apache Spark 設定 BigLake metastore。

Spark SQL

  1. 建立 SQL 檔案,其中包含要在 BigLake metastore 中執行的 Spark SQL 指令。舉例來說,以下指令會建立命名空間和資料表:

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    更改下列內容:

    • CATALOG_NAME:參照 Spark 資料表的目錄名稱。
    • NAMESPACE_NAME:參照 Spark 資料表的命名空間名稱。
    • TABLE_NAME:Spark 資料表的資料表名稱。
    • WAREHOUSE_DIRECTORY:儲存資料倉儲的 Cloud Storage 資料夾 URI。
  2. 執行下列 gcloud dataproc batches submit spark-sql 指令,提交 Spark SQL 批次工作:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    更改下列內容:

    • SQL_SCRIPT_PATH:批次工作使用的 SQL 檔案路徑。
    • PROJECT_ID:要執行批次作業的 Trusted Cloud 專案 ID。
    • REGION:執行工作負載的區域。
    • SUBNET_NAME (選用):REGION 中符合工作階段子網路需求的虛擬私有雲子網路名稱。
    • BUCKET_PATH:用來上傳工作負載依附元件的 Cloud Storage 值區位置。WAREHOUSE_DIRECTORY位於這個值區中。 不需要提供 bucket 的 gs:// URI 前置字串。您可以指定 bucket 路徑或 bucket 名稱,例如 mybucketname1
    • LOCATION:執行批次作業的位置。

    如要進一步瞭解如何提交 Spark 批次工作,請參閱「執行 Spark 批次工作負載」。

PySpark

  1. 建立 Python 檔案,其中包含要在 BigLake 中繼資料存放區執行的 PySpark 指令。

    舉例來說,下列指令會設定 Spark 環境,與儲存在 BigLake 中繼存放區的 Iceberg 資料表互動。接著,指令會在該命名空間中建立新的命名空間和 Iceberg 資料表。

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    更改下列內容:

    • PROJECT_ID:要執行批次作業的 Trusted Cloud 專案 ID。
    • LOCATION:BigQuery 資源所在的位置
    • CATALOG_NAME:參照 Spark 資料表的目錄名稱。
    • TABLE_NAME:Spark 資料表的資料表名稱。
    • WAREHOUSE_DIRECTORY:儲存資料倉儲的 Cloud Storage 資料夾 URI。
    • NAMESPACE_NAME:參照 Spark 資料表的命名空間名稱。
  2. 使用下列 gcloud dataproc batches submit pyspark 指令提交批次工作:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    更改下列內容:

    • PYTHON_SCRIPT_PATH:批次作業使用的 Python 指令碼路徑。
    • PROJECT_ID:要執行批次作業的 Trusted Cloud 專案 ID。
    • REGION:執行工作負載的區域。
    • BUCKET_PATH:用來上傳工作負載依附元件的 Cloud Storage 值區位置。不需要提供 bucket 的 gs:// URI 前置字串。您可以指定 bucket 路徑或 bucket 名稱,例如 mybucketname1

    如要進一步瞭解如何提交 PySpark 批次工作,請參閱 PySpark gcloud 參考資料

後續步驟