設定 BigLake metastore
本文說明如何使用 Dataproc 或 Trusted Cloud by S3NS Serverless for Apache Spark 設定 BigLake metastore,建立可供 Apache Spark 或 Apache Flink 等開放原始碼引擎使用的單一共用 metastore。
事前準備
- 為 Trusted Cloud 專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能。
啟用 BigQuery 和 Dataproc API。
選用:瞭解 BigLake 中繼資料存放區的運作方式,以及使用該存放區的原因。
必要的角色
如要取得設定 BigLake Metastore 所需的權限,請要求管理員授予下列 IAM 角色:
-
建立 Dataproc 叢集:
專案中 Compute Engine 預設服務帳戶的 Dataproc 工作站 (
roles/dataproc.worker
) -
建立 BigLake metastore 資料表:
-
Dataproc 工作站 (
roles/dataproc.worker
) 專案中 Dataproc VM 服務帳戶的存取權 -
專案中 Dataproc VM 服務帳戶的 BigQuery 資料編輯者 (
roles/bigquery.dataEditor
) -
專案中 Dataproc VM 服務帳戶的儲存空間物件管理員 (
roles/storage.objectAdmin
)
-
Dataproc 工作站 (
-
查詢 BigLake Metastore 資料表:
-
專案的 BigQuery 資料檢視者 (
roles/bigquery.dataViewer
) -
專案的「BigQuery 使用者」 (
roles/bigquery.user
) 角色 -
專案的Storage 物件檢視者 (
roles/storage.objectViewer
)
-
專案的 BigQuery 資料檢視者 (
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
使用 Dataproc 設定中繼存放區
您可以使用 Spark 或 Flink,透過 Dataproc 設定 BigLake metastore:
Spark
設定新叢集。如要建立新的 Dataproc 叢集,請執行下列
gcloud dataproc clusters create
指令,其中包含使用 BigLake Metastore 的必要設定:gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
更改下列內容:
CLUSTER_NAME
:Dataproc 叢集的名稱。PROJECT_ID
:您要建立叢集的 Trusted Cloud 專案 ID。LOCATION
:您要建立叢集的 Compute Engine 區域。
使用下列任一方法提交 Spark 工作:
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
更改下列內容:
PROJECT_ID
:包含 Dataproc 叢集的 Trusted Cloud 專案 ID。CLUSTER_NAME
:用於執行 Spark SQL 工作的 Dataproc 叢集名稱。REGION
:叢集所在的 Compute Engine 區域。LOCATION
:BigQuery 資源的位置。CATALOG_NAME
:要用於 SQL 作業的 Spark 目錄名稱。WAREHOUSE_DIRECTORY
:包含資料倉儲的 Cloud Storage 資料夾。這個值開頭為gs://
。SPARK_SQL_COMMAND
:要執行的 Spark SQL 查詢。這項查詢包含建立資源的指令。舉例來說,如要建立命名空間和資料表。
spark-sql CLI
前往 Trusted Cloud 控制台的「VM Instances」(VM 執行個體) 頁面。
如要連線至 Dataproc VM 執行個體,請在列出 Dataproc 叢集主要 VM 執行個體名稱的資料列中,按一下「SSH」SSH。主要 VM 執行個體名稱是叢集名稱,後面加上
-m
尾碼。輸出結果會與下列內容相似:Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
在終端機中執行下列 BigLake 中繼存放區初始化指令:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
更改下列內容:
CATALOG_NAME
:您在 SQL 工作中使用的 Spark 目錄名稱。PROJECT_ID
:BigLake metastore 目錄的 Trusted Cloud 專案 ID,Spark 目錄會連結至該目錄。LOCATION
:BigLake Metastore 的位置。 Trusted CloudWAREHOUSE_DIRECTORY
:包含資料倉儲的 Cloud Storage 資料夾。這個值開頭為gs://
。
成功連線至叢集後,Spark 終端機會顯示
spark-sql
提示,您可以使用該提示提交 Spark 工作。spark-sql (default)>
Flink
- 建立已啟用選用 Flink 元件的 Dataproc 叢集,並確認您使用的是 Dataproc
2.2
以上版本。 前往 Trusted Cloud 控制台的「VM instances」(VM 執行個體) 頁面。
在虛擬機器執行個體清單中,按一下「SSH」SSH,連線至主要的 Dataproc 叢集 VM 執行個體,該執行個體會列為叢集名稱,後方加上
-m
後置字串。為 BigLake Metastore 設定 Iceberg 自訂目錄外掛程式:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
在 YARN 上啟動 Flink 工作階段:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
在 Flink 中建立目錄:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
更改下列內容:
CATALOG_NAME
:Flink 目錄 ID,連結至 BigLake Metastore 目錄。WAREHOUSE_DIRECTORY
:倉庫目錄的基本路徑 (Flink 建立檔案的 Cloud Storage 資料夾)。這個值開頭為gs://
。PROJECT_ID
:Flink 目錄連結的 BigLake metastore 目錄專案 ID。LOCATION
:BigQuery 資源的位置。
Flink 工作階段現已連線至 BigLake 中繼資料存放區,您可以執行 Flink SQL 指令。
管理 BigLake metastore 資源
連線至 BigLake Metastore 後,您就能根據 BigLake Metastore 中儲存的中繼資料,建立及查看資源。
舉例來說,請嘗試在互動式 Flink SQL 工作階段中執行下列指令,建立 Iceberg 資料庫和資料表。
使用自訂 Iceberg 目錄:
USE CATALOG CATALOG_NAME;
將
CATALOG_NAME
替換為 Flink 目錄 ID。建立資料庫,這會在 BigQuery 中建立資料集:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
將
DATABASE_NAME
換成新資料庫的名稱。使用您建立的資料庫:
USE DATABASE_NAME;
建立 Iceberg 資料表。以下範例會建立銷售資料表:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
將
ICEBERG_TABLE_NAME
替換為新資料表的名稱。查看資料表中繼資料:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
列出資料庫中的資料表:
SHOW TABLES;
將資料擷取至資料表
在上一節中建立 Iceberg 資料表後,您可以使用 Flink DataGen 做為資料來源,將即時資料擷取到資料表中。以下步驟是這個工作流程的範例:
使用 DataGen 建立暫時表格:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
更改下列內容:
DATABASE_NAME
:用於儲存暫時性資料表的資料庫名稱。TEMP_TABLE_NAME
:臨時資料表的名稱。ICEBERG_TABLE_NAME
:您在上一節中建立的 Iceberg 資料表名稱。
將平行處理設為 1:
SET 'parallelism.default' = '1';
設定檢查點間隔:
SET 'execution.checkpointing.interval' = '10second';
設定查核點:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
啟動即時串流工作:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
輸出結果會與下列內容相似:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
如要查看串流工作狀態,請按照下列步驟操作:
前往 Trusted Cloud 控制台的「Clusters」(叢集) 頁面。
選取您的叢集。
按一下「網頁介面」分頁標籤。
按一下「YARN ResourceManager」連結。
在 YARN ResourceManager 介面中,找到 Flink 工作階段,然後按一下「Tracking UI」下方的「ApplicationMaster」連結。
在「狀態」欄中,確認工作狀態為「執行中」。
在 Flink SQL 用戶端中查詢串流資料:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
在 BigQuery 中查詢串流資料:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
在 Flink SQL 用戶端終止串流工作:
STOP JOB 'JOB_ID';
將
JOB_ID
替換為您建立串流工作時,輸出內容中顯示的工作 ID。
使用 Serverless for Apache Spark 設定中繼存放區
您可以使用 Spark SQL 或 PySpark,透過 Serverless for Apache Spark 設定 BigLake metastore。
Spark SQL
建立 SQL 檔案,其中包含要在 BigLake metastore 中執行的 Spark SQL 指令。舉例來說,以下指令會建立命名空間和資料表:
CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
更改下列內容:
CATALOG_NAME
:參照 Spark 資料表的目錄名稱。NAMESPACE_NAME
:參照 Spark 資料表的命名空間名稱。TABLE_NAME
:Spark 資料表的資料表名稱。WAREHOUSE_DIRECTORY
:儲存資料倉儲的 Cloud Storage 資料夾 URI。
執行下列
gcloud dataproc batches submit spark-sql
指令,提交 Spark SQL 批次工作:gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
更改下列內容:
SQL_SCRIPT_PATH
:批次工作使用的 SQL 檔案路徑。PROJECT_ID
:要執行批次作業的 Trusted Cloud 專案 ID。REGION
:執行工作負載的區域。SUBNET_NAME
(選用):REGION
中符合工作階段子網路需求的虛擬私有雲子網路名稱。BUCKET_PATH
:用來上傳工作負載依附元件的 Cloud Storage 值區位置。WAREHOUSE_DIRECTORY
位於這個值區中。 不需要提供 bucket 的gs://
URI 前置字串。您可以指定 bucket 路徑或 bucket 名稱,例如mybucketname1
。LOCATION
:執行批次作業的位置。
如要進一步瞭解如何提交 Spark 批次工作,請參閱「執行 Spark 批次工作負載」。
PySpark
建立 Python 檔案,其中包含要在 BigLake 中繼資料存放區執行的 PySpark 指令。
舉例來說,下列指令會設定 Spark 環境,與儲存在 BigLake 中繼存放區的 Iceberg 資料表互動。接著,指令會在該命名空間中建立新的命名空間和 Iceberg 資料表。
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
更改下列內容:
PROJECT_ID
:要執行批次作業的 Trusted Cloud 專案 ID。LOCATION
:BigQuery 資源所在的位置。CATALOG_NAME
:參照 Spark 資料表的目錄名稱。TABLE_NAME
:Spark 資料表的資料表名稱。WAREHOUSE_DIRECTORY
:儲存資料倉儲的 Cloud Storage 資料夾 URI。NAMESPACE_NAME
:參照 Spark 資料表的命名空間名稱。
使用下列
gcloud dataproc batches submit pyspark
指令提交批次工作:gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
更改下列內容:
PYTHON_SCRIPT_PATH
:批次作業使用的 Python 指令碼路徑。PROJECT_ID
:要執行批次作業的 Trusted Cloud 專案 ID。REGION
:執行工作負載的區域。BUCKET_PATH
:用來上傳工作負載依附元件的 Cloud Storage 值區位置。不需要提供 bucket 的gs://
URI 前置字串。您可以指定 bucket 路徑或 bucket 名稱,例如mybucketname1
。
如要進一步瞭解如何提交 PySpark 批次工作,請參閱 PySpark gcloud 參考資料。