配置 BigLake metastore
本文档介绍了如何将 BigLake metastore 与 Dataproc 或 Trusted Cloud by S3NS Serverless for Apache Spark 搭配使用,以创建单个共享 metastore,该 metastore 可在 Apache Spark 或 Apache Flink 等开源引擎上运行。
准备工作
- 为您的 Trusted Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能。
启用 BigQuery API 和 Dataproc API。
可选:了解 BigQuery metastore 的工作原理以及为什么您应该使用它。
所需的角色
如需获得配置 BigLake metastore 所需的权限,请让管理员为您授予以下 IAM 角色:
-
创建 Dataproc 集群:针对项目中 Compute Engine 默认服务账号的 Dataproc Worker (
roles/dataproc.worker
) -
创建 BigLake metastore 表:
-
针对项目中 Dataproc 虚拟机服务账号的 Dataproc Worker (
roles/dataproc.worker
) -
针对项目中 Dataproc 虚拟机服务账号的 BigQuery Data Editor (
roles/bigquery.dataEditor
) -
针对项目中 Dataproc 虚拟机服务账号的 Storage Object Admin (
roles/storage.objectAdmin
)
-
针对项目中 Dataproc 虚拟机服务账号的 Dataproc Worker (
-
查询 BigLake metastore 表:
-
项目的 BigQuery Data Viewer (
roles/bigquery.dataViewer
) -
项目的 BigQuery User (
roles/bigquery.user
) -
项目的 Storage Object Viewer (
roles/storage.objectViewer
)
-
项目的 BigQuery Data Viewer (
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
使用 Dataproc 配置 metastore
您可以使用 Spark 或 Flink 将 BigLake metastore 配置为与 Dataproc 搭配使用:
Spark
配置新集群。如需创建新的 Dataproc 集群,请运行以下
gcloud dataproc clusters create
命令,其中包含使用 BigLake metastore 所需的设置:gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
替换以下内容:
CLUSTER_NAME
:Dataproc 集群的名称。PROJECT_ID
:您要在其中创建集群的 Trusted Cloud 项目的 ID。LOCATION
:您要创建集群的 Compute Engine 区域。
使用以下方法之一提交 Spark 作业:
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
替换以下内容:
PROJECT_ID
:包含 Dataproc 集群的 Trusted Cloud 项目的 ID。CLUSTER_NAME
:您用于运行 Spark SQL 作业的 Dataproc 集群的名称。REGION
:您的集群所在的 Compute Engine 区域。LOCATION
:BigQuery 资源的位置。CATALOG_NAME
:要为 SQL 作业使用的 Spark 目录的名称。WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹。此值以gs://
开头。SPARK_SQL_COMMAND
:您要运行的 Spark SQL 查询。此查询包含用于创建资源的命令。例如,创建命名空间和表。
spark-sql CLI
在 Trusted Cloud 控制台中,前往虚拟机实例页面。
如需连接到 Dataproc 虚拟机实例,请点击列出 Dataproc 集群主虚拟机实例名称(即集群名称后跟
-m
后缀)的行中的 SSH。输出类似于以下内容:Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
在终端中,运行以下 BigLake metastore 初始化命令:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
替换以下内容:
CATALOG_NAME
:您在 SQL 作业中使用的 Spark 目录的名称。PROJECT_ID
:Spark 目录关联的 BigLake metastore 目录的 Trusted Cloud 项目 ID。LOCATION
:BigLake metastore 的 Trusted Cloud 位置。WAREHOUSE_DIRECTORY
:包含数据仓库的 Cloud Storage 文件夹。此值以gs://
开头。
成功连接到集群后,Spark 终端会显示
spark-sql
提示,您可以使用该提示提交 Spark 作业。spark-sql (default)>
Flink
- 创建启用了可选 Flink 组件的 Dataproc 集群,并确保您使用的是 Dataproc
2.2
或更高版本。 在 Trusted Cloud 控制台中,转到虚拟机实例页面。
在虚拟机实例列表中,点击 SSH 以连接到主 Dataproc 集群虚拟机实例,该实例的名称为集群名称后跟
-m
后缀。为 BigLake metastore 配置 Iceberg 自定义目录插件:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
在 YARN 上启动 Flink 会话:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
在 Flink 中创建目录:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
替换以下内容:
CATALOG_NAME
:Flink 目录标识符,与 BigLake metastore 目录相关联。WAREHOUSE_DIRECTORY
:仓库目录(Flink 在其中创建文件的 Cloud Storage 文件夹)的基本路径。此值以gs://
开头。PROJECT_ID
:Flink 目录关联的 BigLake metastore 目录的项目 ID。LOCATION
:BigQuery 资源的位置。
您的 Flink 会话现已连接到 BigLake metastore,您可以运行 Flink SQL 命令。
管理 BigLake metastore 资源
现在,您已连接到 BigLake metastore,可以根据存储在 BigLake metastore 中的元数据创建和查看资源。
例如,尝试在交互式 Flink SQL 会话中运行以下命令,以创建 Iceberg 数据库和表。
使用自定义 Iceberg 目录:
USE CATALOG CATALOG_NAME;
将
CATALOG_NAME
替换为 Flink 目录标识符。创建一个数据库,以在 BigQuery 中创建一个数据集:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
将
DATABASE_NAME
替换为新数据库的名称。使用您创建的数据库:
USE DATABASE_NAME;
创建 Iceberg 表。以下命令会创建一个示例销售表:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
将
ICEBERG_TABLE_NAME
替换为新表的名称。查看表元数据:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
列出数据库中的表:
SHOW TABLES;
将数据提取到表中
在上一部分中创建 Iceberg 表后,您可以使用 Flink DataGen 作为数据源,将实时数据注入到表中。以下步骤是此工作流的一个示例:
使用 DataGen 创建临时表:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
替换以下内容:
DATABASE_NAME
:用于存储临时表的数据库的名称。TEMP_TABLE_NAME
:临时表的名称。ICEBERG_TABLE_NAME
:您在上一部分中创建的 Iceberg 表的名称。
将并行性设置为 1:
SET 'parallelism.default' = '1';
设置检查点间隔:
SET 'execution.checkpointing.interval' = '10second';
设置检查点:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
启动实时流式作业:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
输出类似于以下内容:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
如需检查流式作业的状态,请执行以下操作:
在 Trusted Cloud 控制台中,前往集群页面。
选择您的集群。
点击网页界面标签页。
点击 YARN ResourceManager 链接。
在 YARN ResourceManager 界面中,找到您的 Flink 会话,然后点击跟踪界面下的 ApplicationMaster 链接。
在状态列中,确认作业状态为正在运行。
在 Flink SQL 客户端中查询流式数据:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
在 BigQuery 中查询流式数据:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
在 Flink SQL 客户端中终止流式作业:
STOP JOB 'JOB_ID';
将
JOB_ID
替换为创建流式作业时输出中显示的作业 ID。
使用 Serverless for Apache Spark 配置 metastore
您可以使用 Spark SQL 或 PySpark 将 BigLake metastore 配置为与 Serverless for Apache Spark 搭配使用。
Spark SQL
创建一个 SQL 文件,其中包含要在 BigLake metastore 中运行的 Spark SQL 命令。例如,以下命令会创建一个命名空间和一个表:
CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
替换以下内容:
CATALOG_NAME
:引用 Spark 表的目录名称。NAMESPACE_NAME
:引用 Spark 表的命名空间名称。TABLE_NAME
:Spark 表的表名。WAREHOUSE_DIRECTORY
:存储数据仓库的 Cloud Storage 文件夹的 URI。
运行以下
gcloud dataproc batches submit spark-sql
命令,提交 Spark SQL 批处理作业:gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
替换以下内容:
SQL_SCRIPT_PATH
:批量作业使用的 SQL 文件的路径。PROJECT_ID
:要运行批量作业的 Trusted Cloud 项目的 ID。REGION
:工作负载运行所在的区域。SUBNET_NAME
(可选):REGION
中满足会话子网要求的 VPC 子网的名称。BUCKET_PATH
:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。WAREHOUSE_DIRECTORY
位于此存储桶中。存储桶的gs://
URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如mybucketname1
。LOCATION
:运行批量作业的位置。
如需详细了解如何提交 Spark 批处理作业,请参阅运行 Spark 批处理工作负载。
PySpark
创建一个 Python 文件,其中包含要在 BigLake metastore 中运行的 PySpark 命令。
例如,以下命令设置了一个 Spark 环境,用于与存储在 BigLake metastore 中的 Iceberg 表进行交互。然后,该命令会在该命名空间内创建一个新的命名空间和一个 Iceberg 表。
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
替换以下内容:
PROJECT_ID
:要运行批量作业的 Trusted Cloud 项目的 ID。LOCATION
:BigQuery 资源所在的位置。CATALOG_NAME
:引用 Spark 表的目录名称。TABLE_NAME
:Spark 表的表名。WAREHOUSE_DIRECTORY
:存储数据仓库的 Cloud Storage 文件夹的 URI。NAMESPACE_NAME
:引用 Spark 表的命名空间名称。
使用以下
gcloud dataproc batches submit pyspark
命令提交批处理作业:gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
替换以下内容:
PYTHON_SCRIPT_PATH
:批量作业使用的Python 脚本的路径。PROJECT_ID
:要运行批量作业的 Trusted Cloud 项目的 ID。REGION
:工作负载运行所在的区域。BUCKET_PATH
:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。存储桶的gs://
URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如mybucketname1
。
如需详细了解如何提交 PySpark 批处理作业,请参阅 PySpark gcloud 参考文档。