将 BigLake metastore 与 Dataproc 搭配使用

本文档介绍了如何将 BigQuery metastore 与 Compute Engine 上的 Dataproc 搭配使用。借助此连接,您可以获得一个能在 Apache Spark 或 Apache Flink 等开源软件引擎上运行的共享 metastore。

准备工作

  1. 为您的 Trusted Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能
  2. 启用 BigQuery API 和 Dataproc API。

    启用 API

  3. 可选:了解 BigQuery metastore 的工作原理以及为什么您应该使用它。

所需的角色

如需获得将 Spark 或 Flink 和 Dataproc 与 BigLake metastore 搭配使用作为元数据存储区所需的权限,请让您的管理员为您授予以下 IAM 角色:

  • 创建 Dataproc 集群:针对项目中 Compute Engine 默认服务账号的 Dataproc Worker (roles/dataproc.worker)
  • 在 Spark 或 Flink 中创建 BigLake metastore 表:
    • 针对项目中 Dataproc 虚拟机服务账号的 Dataproc Worker (roles/dataproc.worker)
    • 针对项目中 Dataproc 虚拟机服务账号的 BigQuery Data Editor (roles/bigquery.dataEditor)
    • 针对项目中 Dataproc 虚拟机服务账号的 Storage Object Admin (roles/storage.objectAdmin)
  • 在 BigQuery 中查询 BigLake metastore 表:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

常规工作流程

如需将 Compute Engine 上的 Dataproc 与 BigQuery metastore 搭配使用,请按照以下常规步骤操作:

  1. 创建 Dataproc 集群或配置现有集群。
  2. 连接到您偏好的开源软件引擎,例如 Spark 或 Flink。
  3. 使用 JAR 文件在集群上安装 Apache Iceberg 目录插件。
  4. 根据您使用的开源软件引擎,按需创建和管理 BigLake metastore 资源。
  5. 在 BigQuery 中,访问和使用 BigLake metastore 资源。

将 BigLake metastore 连接到 Spark

以下说明介绍了如何使用交互式 Spark SQL 将 Dataproc 连接到 BigLake metastore。

下载 Iceberg 目录插件

如需将 BigLake metastore 与 Dataproc 和 Spark 连接,您必须使用 BigLake metastore Iceberg 目录插件 jar 文件。

此文件默认包含在 Dataproc 映像版本 2.2 中。如果您的 Dataproc 集群无法直接访问互联网,您必须下载该插件并将其上传到您的 Dataproc 集群可以访问的 Cloud Storage 存储桶。

下载 BigLake metastore Iceberg 目录插件

配置 Dataproc 集群

在连接到 BigLake metastore 之前,您必须设置 Dataproc 集群。

为此,您可以创建新的集群或使用现有集群。之后,您可以使用此集群运行交互式 Spark SQL 并管理 BigLake metastore 资源。

  • 在创建集群的区域内,子网必须启用专用 Google 访问通道 (PGA)。默认情况下,使用版本 2.2(默认值)或更高版本的映像创建的 Dataproc 集群虚拟机仅使用内部 IP 地址。如需允许集群虚拟机与 Google API 通信,请在创建集群所在区域的 default(或用户指定的网络名称,如适用)网络子网上启用专用 Google 访问通道

  • 如果您想运行本指南中的 Zeppelin 网页界面示例,则必须使用或创建已启用 Zeppelin 可选组件的 Dataproc 集群。

新集群

如需创建新的 Dataproc 集群,请运行以下 gcloud dataproc clusters create 命令。此配置包含使用 BigLake metastore 所需的设置。

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

替换以下内容:

  • CLUSTER_NAME:Dataproc 集群的名称。
  • PROJECT_ID:您要在其中创建集群的 Trusted Cloud 项目的 ID。
  • LOCATION:您在其中创建集群的 Trusted Cloud 区域。

现有集群

如需配置现有集群,请将以下 Iceberg Spark 运行时添加到您的集群。

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

您可以使用以下其中一个选项来添加运行时:

提交 Spark 作业

如需提交 Spark 作业,请使用以下方法之一:

gcloud CLI

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

替换以下内容:

  • PROJECT_ID:包含 Dataproc 集群的 Trusted Cloud 项目的 ID。
  • CLUSTER_NAME:您用于运行 Spark SQL 作业的 Dataproc 集群的名称。
  • REGION:您的集群所在的 Compute Engine 区域
  • LOCATION:BigQuery 资源的位置。
  • CATALOG_NAME:您在 SQL 作业中使用的 Spark 目录的名称。
  • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹。此值以 gs:// 开头。
  • SPARK_SQL_COMMAND:您要运行的 Spark SQL 查询。此查询包含用于创建资源的命令。例如,创建命名空间和表。

互动式 Spark

连接到 Spark 并安装目录插件

如需为 BigLake metastore 安装目录插件,请使用 SSH 连接到 Dataproc 集群。

  1. 在 Trusted Cloud 控制台中,前往虚拟机实例页面。
  2. 如需连接到 Dataproc 虚拟机实例,请点击虚拟机实例列表中的 SSH。输出类似于以下内容:

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. 在终端中,运行以下 BigLake metastore 初始化命令:

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    替换以下内容:

    • CATALOG_NAME:您在 SQL 作业中使用的 Spark 目录的名称。
    • PROJECT_ID:Spark 目录关联的 BigLake metastore 目录的 Trusted Cloud 项目 ID。
    • LOCATION:BigLake metastore 的 Trusted Cloud 位置。
    • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹。此值以 gs:// 开头。

    成功连接到集群后,Spark 终端会显示 spark-sql 提示。

    spark-sql (default)>
    

管理 BigLake metastore 资源

您现在已连接到 BigLake metastore。您可以查看现有资源,也可以根据存储在 BigLake metastore 中的元数据创建新资源。

例如,尝试在交互式 Spark SQL 会话中运行以下命令,以创建 Iceberg 命名空间和表。

  • 使用自定义 Iceberg 目录:

    USE `CATALOG_NAME`;
  • 创建命名空间:

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • 使用创建的命名空间:

    USE NAMESPACE_NAME;
  • 创建 Iceberg 表:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • 插入表格行:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • 添加表格列:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • 查看表元数据:

    DESCRIBE EXTENDED TABLE_NAME;
  • 列出命名空间中的表:

    SHOW TABLES;

Zeppelin 笔记本

  1. 在 Trusted Cloud 控制台中,前往 Dataproc 集群页面。

    转到 Dataproc 集群

  2. 点击您要使用的集群的名称。

    系统会打开集群详情页面。

  3. 在导航菜单中,点击网页界面

  4. 组件网关下,点击 Zeppelin。系统会打开 Zeppelin 笔记本页面。

  5. 在导航菜单中,点击笔记本,然后点击+创建新笔记本

  6. 在对话框中,输入笔记本名称。将 Spark 保留为默认解释器。

  7. 点击创建。系统会创建一个新笔记本。

  8. 在笔记本中,点击设置菜单,然后点击解释器

  9. 搜索解释器字段中,搜索 Spark

  10. 点击修改

  11. Spark.jars 字段中,输入 Spark JAR 的 URI。

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
    
  12. 点击保存

  13. 点击确定

  14. 将以下 PySpark 代码复制到 Zeppelin 笔记本中。

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    替换以下内容:

    • CATALOG_NAME:要为 SQL 作业使用的 Spark 目录的名称。
    • PROJECT_ID:包含 Dataproc 集群的 Trusted Cloud 项目的 ID。
    • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹。此值以 gs:// 开头。
    • NAMESPACE_NAME:引用 Spark 表的命名空间名称。
    • WAREHOUSE_DIRECTORY:存储数据仓库的 Cloud Storage 文件夹的 URI。
    • TABLE_NAME:Spark 表的表名。
  15. 点击运行图标或按 Shift-Enter 以运行代码。作业完成后,状态消息会显示“Spark Job Finished”,输出内容会显示表格内容:

以下说明介绍了如何使用 Flink SQL 客户端将 Dataproc 连接到 BigLake metastore。

如需将 BigLake metastore 连接到 Flink,请执行以下操作:

  1. 创建启用了可选 Flink 组件的 Dataproc 集群,并确保您使用的是 Dataproc 2.2 或更高版本。
  2. 在 Trusted Cloud 控制台中,前往虚拟机实例页面。

    转到虚拟机实例

  3. 在虚拟机实例列表中,点击 SSH 以连接到 Dataproc 虚拟机实例。

  4. 为 BigLake metastore 配置 Iceberg 自定义目录插件:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. 在 YARN 上启动 Flink 会话:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
      -s yarn-session
  6. 在 Flink 中创建目录:

    CREATE CATALOG CATALOG_NAME WITH (
      'type'='iceberg',
      'warehouse'='WAREHOUSE_DIRECTORY',
      'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
      'gcp_project'='PROJECT_ID',
      'gcp_location'='LOCATION'
    );

    替换以下内容:

    • CATALOG_NAME:Flink 目录标识符,与 BigLake metastore 目录相关联。
    • WAREHOUSE_DIRECTORY:仓库目录(Flink 在其中创建文件的 Cloud Storage 文件夹)的基本路径。此值以 gs:// 开头。
    • PROJECT_ID:Flink 目录关联的 BigLake metastore 目录的项目 ID。
    • LOCATION:BigQuery 资源的位置

您的 Flink 会话现已连接到 BigLake metastore,您可以运行 Flink SQL 命令。

现在,您已连接到 BigLake metastore,可以根据存储在 BigLake metastore 中的元数据创建和查看资源。

例如,尝试在交互式 Flink SQL 会话中运行以下命令,以创建 Iceberg 数据库和表。

  1. 使用自定义 Iceberg 目录:

    USE CATALOG CATALOG_NAME;

    CATALOG_NAME 替换为 Flink 目录标识符。

  2. 创建一个数据库,以在 BigQuery 中创建一个数据集:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    DATABASE_NAME 替换为新数据库的名称。

  3. 使用您创建的数据库:

    USE DATABASE_NAME;
  4. 创建 Iceberg 表。以下命令会创建一个示例销售表:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    );

    ICEBERG_TABLE_NAME 替换为新表的名称。

  5. 查看表元数据:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. 列出数据库中的表:

    SHOW TABLES;

将数据注入到表中

在上一部分中创建 Iceberg 表后,您可以使用 Flink DataGen 作为数据源,将实时数据注入到表中。以下步骤是此工作流的一个示例:

  1. 使用 DataGen 创建临时表:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '1000000',
        'fields.price.min' = '0',
        'fields.price.max' = '10000',
        'fields.buyer.first_name.length' = '10',
        'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    替换以下内容:

    • DATABASE_NAME:用于存储临时表的数据库的名称。
    • TEMP_TABLE_NAME:临时表的名称。
    • ICEBERG_TABLE_NAME:您在上一部分中创建的 Iceberg 表的名称。
  2. 将并行性设置为 1:

    SET 'parallelism.default' = '1';
  3. 设置检查点间隔:

    SET 'execution.checkpointing.interval' = '10second';
  4. 设置检查点:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. 启动实时流式作业:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    输出类似于以下内容:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. 如需检查流式作业的状态,请执行以下操作:

    1. 在 Trusted Cloud 控制台中,前往集群页面。

      转到集群

    2. 选择您的集群。

    3. 点击网页界面标签页。

    4. 点击 YARN ResourceManager 链接。

    5. YARN ResourceManager 界面中,找到您的 Flink 会话,然后点击跟踪界面下的 ApplicationMaster 链接。

    6. 状态列中,确认作业状态为正在运行

  7. 在 Flink SQL 客户端中查询流式数据:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. 在 BigQuery 中查询流式数据:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. 在 Flink SQL 客户端中终止流作业:

    STOP JOB 'JOB_ID';

    JOB_ID 替换为创建流式作业时输出中显示的作业 ID。

后续步骤