搭配 Spark 預存程序使用 BigLake 元資料庫
本文說明如何搭配使用 Apache Spark 預存程序和 BigLake 元資料庫。
事前準備
- 為 Trusted Cloud 專案啟用計費功能。瞭解如何檢查專案是否已啟用計費功能。
啟用 BigQuery 和 Dataflow API。
選用:進一步瞭解下列主題:
- 瞭解 BigLake 元資料庫的運作方式,以及為何要使用這項功能。
- 瞭解 BigQuery Spark 預存程序的運作方式,並在開始執行工作前完成相關作業。
必要的角色
如要使用 Spark 預存程序,請查看預存程序的必要角色,並授予必要角色。
如要取得使用 Spark 和儲存程序的權限,並將 BigLake 中繼資料庫做為中繼資料儲存庫,請要求管理員授予您下列 IAM 角色:
-
在 Spark 中建立 BigLake 中繼資料表:
-
專案中 Spark Connection 服務帳戶的 BigQuery 資料編輯者 (
roles/bigquery.dataEditor
) -
專案中 Spark Connection 服務帳戶的 Storage 物件管理員 (
roles/storage.objectAdmin
)
-
專案中 Spark Connection 服務帳戶的 BigQuery 資料編輯者 (
-
在 BigQuery 中查詢 BigLake 元資料表:
-
專案中的 BigQuery 資料檢視者 (
roles/bigquery.dataViewer
) -
專案中的 BigQuery 使用者 (
roles/bigquery.user
) -
專案的 Storage 物件檢視器 (
roles/storage.objectViewer
)
-
專案中的 BigQuery 資料檢視者 (
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。
建立及執行預存程序
以下範例說明如何使用 BigLake 元資料庫建立及執行儲存程序。
前往「BigQuery」頁面
在查詢編輯器中,為
CREATE PROCEDURE
陳述式新增以下範例程式碼。CREATE OR REPLACE PROCEDURE `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`() WITH CONNECTION `PROJECT_ID.REGION.SPARK_CONNECTION_ID` OPTIONS (engine='SPARK', runtime_version='1.1', properties=[("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY"), ("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION"), ("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID"), ("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog"), ("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"), ("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1")], jar_uris=["gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar"]) LANGUAGE python AS R""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("BigLake Metastore Iceberg") \ .getOrCreate() spark.sql("USE CATALOG_NAME;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY'") spark.sql("DESCRIBE TABLE_NAME;") spark.sql("INSERT INTO TABLE_NAME VALUES (1, \"first row\");") spark.sql("SELECT * from TABLE_NAME;") spark.sql("ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);") spark.sql("DESCRIBE TABLE_NAME;") """; CALL `PROJECT_ID.BQ_DATASET_ID.PROCEDURE_NAME`();
更改下列內容:
PROJECT_ID
: Trusted Cloud 專案的 ID。BQ_DATASET_ID
:BigQuery 中包含程序的資料集 ID。PROCEDURE_NAME
:您要建立或取代的程序名稱。REGION
:Spark 連線的位置。LOCATION
:BigQuery 資源的位置。SPARK_CONNECTION_ID
:Spark 連線的 ID。CATALOG_NAME
:您使用的目錄名稱。WAREHOUSE_DIRECTORY
:包含資料倉儲的 Cloud Storage 資料夾 URI。NAMESPACE_NAME
:您使用的命名空間。