配置 BigLake metastore

本文档介绍了如何将 BigLake metastore 与 DataprocTrusted Cloud by S3NS Serverless for Apache Spark 搭配使用,以创建单个共享 metastore,该 metastore 可在 Apache Spark 或 Apache Flink 等开源引擎上运行。

准备工作

  1. 为您的 Trusted Cloud 项目启用结算功能。了解如何检查项目是否已启用结算功能
  2. 启用 BigQuery API 和 Dataproc API。

    启用 API

  3. 可选:了解 BigQuery metastore 的工作原理以及为什么您应该使用它。

所需的角色

如需获得配置 BigLake metastore 所需的权限,请让管理员为您授予以下 IAM 角色:

  • 创建 Dataproc 集群:针对项目中 Compute Engine 默认服务账号的 Dataproc Worker (roles/dataproc.worker)
  • 创建 BigLake metastore 表:
    • 针对项目中 Dataproc 虚拟机服务账号的 Dataproc Worker (roles/dataproc.worker)
    • 针对项目中 Dataproc 虚拟机服务账号的 BigQuery Data Editor (roles/bigquery.dataEditor)
    • 针对项目中 Dataproc 虚拟机服务账号的 Storage Object Admin (roles/storage.objectAdmin)
  • 查询 BigLake metastore 表:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

使用 Dataproc 配置 metastore

您可以使用 Spark 或 Flink 将 BigLake metastore 配置为与 Dataproc 搭配使用:

Spark

  1. 配置新集群。如需创建新的 Dataproc 集群,请运行以下 gcloud dataproc clusters create 命令,其中包含使用 BigLake metastore 所需的设置:

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    替换以下内容:

    • CLUSTER_NAME:Dataproc 集群的名称。
    • PROJECT_ID:您要在其中创建集群的 Trusted Cloud 项目的 ID。
    • LOCATION:您要创建集群的 Compute Engine 区域。
  2. 使用以下方法之一提交 Spark 作业:

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    替换以下内容:

    • PROJECT_ID:包含 Dataproc 集群的 Trusted Cloud 项目的 ID。
    • CLUSTER_NAME:您用于运行 Spark SQL 作业的 Dataproc 集群的名称。
    • REGION:您的集群所在的 Compute Engine 区域
    • LOCATION:BigQuery 资源的位置。
    • CATALOG_NAME:要为 SQL 作业使用的 Spark 目录的名称。
    • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹。此值以 gs:// 开头。
    • SPARK_SQL_COMMAND:您要运行的 Spark SQL 查询。此查询包含用于创建资源的命令。例如,创建命名空间和表。

    spark-sql CLI

    1. 在 Trusted Cloud 控制台中,前往虚拟机实例页面。

      转到“虚拟机实例”

    2. 如需连接到 Dataproc 虚拟机实例,请点击列出 Dataproc 集群主虚拟机实例名称(即集群名称后跟 -m 后缀)的行中的 SSH。输出类似于以下内容:

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. 在终端中,运行以下 BigLake metastore 初始化命令:

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      替换以下内容:

      • CATALOG_NAME:您在 SQL 作业中使用的 Spark 目录的名称。
      • PROJECT_ID:Spark 目录关联的 BigLake metastore 目录的 Trusted Cloud 项目 ID。
      • LOCATION:BigLake metastore 的 Trusted Cloud 位置。
      • WAREHOUSE_DIRECTORY:包含数据仓库的 Cloud Storage 文件夹。此值以 gs:// 开头。

      成功连接到集群后,Spark 终端会显示 spark-sql 提示,您可以使用该提示提交 Spark 作业。

      spark-sql (default)>
      
  1. 创建启用了可选 Flink 组件的 Dataproc 集群,并确保您使用的是 Dataproc 2.2 或更高版本。
  2. 在 Trusted Cloud 控制台中,转到虚拟机实例页面。

    转到虚拟机实例

  3. 在虚拟机实例列表中,点击 SSH 以连接到主 Dataproc 集群虚拟机实例,该实例的名称为集群名称后跟 -m 后缀。

  4. 为 BigLake metastore 配置 Iceberg 自定义目录插件:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. 在 YARN 上启动 Flink 会话:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. 在 Flink 中创建目录:

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    替换以下内容:

    • CATALOG_NAME:Flink 目录标识符,与 BigLake metastore 目录相关联。
    • WAREHOUSE_DIRECTORY:仓库目录(Flink 在其中创建文件的 Cloud Storage 文件夹)的基本路径。此值以 gs:// 开头。
    • PROJECT_ID:Flink 目录关联的 BigLake metastore 目录的项目 ID。
    • LOCATION:BigQuery 资源的位置

您的 Flink 会话现已连接到 BigLake metastore,您可以运行 Flink SQL 命令。

现在,您已连接到 BigLake metastore,可以根据存储在 BigLake metastore 中的元数据创建和查看资源。

例如,尝试在交互式 Flink SQL 会话中运行以下命令,以创建 Iceberg 数据库和表。

  1. 使用自定义 Iceberg 目录:

    USE CATALOG CATALOG_NAME;

    CATALOG_NAME 替换为 Flink 目录标识符。

  2. 创建一个数据库,以在 BigQuery 中创建一个数据集:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    DATABASE_NAME 替换为新数据库的名称。

  3. 使用您创建的数据库:

    USE DATABASE_NAME;
  4. 创建 Iceberg 表。以下命令会创建一个示例销售表:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    ICEBERG_TABLE_NAME 替换为新表的名称。

  5. 查看表元数据:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. 列出数据库中的表:

    SHOW TABLES;

将数据提取到表中

在上一部分中创建 Iceberg 表后,您可以使用 Flink DataGen 作为数据源,将实时数据注入到表中。以下步骤是此工作流的一个示例:

  1. 使用 DataGen 创建临时表:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    替换以下内容:

    • DATABASE_NAME:用于存储临时表的数据库的名称。
    • TEMP_TABLE_NAME:临时表的名称。
    • ICEBERG_TABLE_NAME:您在上一部分中创建的 Iceberg 表的名称。
  2. 将并行性设置为 1:

    SET 'parallelism.default' = '1';
  3. 设置检查点间隔:

    SET 'execution.checkpointing.interval' = '10second';
  4. 设置检查点:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. 启动实时流式作业:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    输出类似于以下内容:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. 如需检查流式作业的状态,请执行以下操作:

    1. 在 Trusted Cloud 控制台中,前往集群页面。

      转到集群

    2. 选择您的集群。

    3. 点击网页界面标签页。

    4. 点击 YARN ResourceManager 链接。

    5. YARN ResourceManager 界面中,找到您的 Flink 会话,然后点击跟踪界面下的 ApplicationMaster 链接。

    6. 状态列中,确认作业状态为正在运行

  7. 在 Flink SQL 客户端中查询流式数据:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. 在 BigQuery 中查询流式数据:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. 在 Flink SQL 客户端中终止流式作业:

    STOP JOB 'JOB_ID';

    JOB_ID 替换为创建流式作业时输出中显示的作业 ID。

使用 Serverless for Apache Spark 配置 metastore

您可以使用 Spark SQL 或 PySpark 将 BigLake metastore 配置为与 Serverless for Apache Spark 搭配使用。

Spark SQL

  1. 创建一个 SQL 文件,其中包含要在 BigLake metastore 中运行的 Spark SQL 命令。例如,以下命令会创建一个命名空间和一个表:

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    替换以下内容:

    • CATALOG_NAME:引用 Spark 表的目录名称。
    • NAMESPACE_NAME:引用 Spark 表的命名空间名称。
    • TABLE_NAME:Spark 表的表名。
    • WAREHOUSE_DIRECTORY:存储数据仓库的 Cloud Storage 文件夹的 URI。
  2. 运行以下 gcloud dataproc batches submit spark-sql 命令,提交 Spark SQL 批处理作业:

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    替换以下内容:

    • SQL_SCRIPT_PATH:批量作业使用的 SQL 文件的路径。
    • PROJECT_ID:要运行批量作业的 Trusted Cloud 项目的 ID。
    • REGION:工作负载运行所在的区域。
    • SUBNET_NAME(可选):REGION 中满足会话子网要求的 VPC 子网的名称。
    • BUCKET_PATH:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。WAREHOUSE_DIRECTORY 位于此存储桶中。存储桶的 gs:// URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如 mybucketname1
    • LOCATION:运行批量作业的位置。

    如需详细了解如何提交 Spark 批处理作业,请参阅运行 Spark 批处理工作负载

PySpark

  1. 创建一个 Python 文件,其中包含要在 BigLake metastore 中运行的 PySpark 命令。

    例如,以下命令设置了一个 Spark 环境,用于与存储在 BigLake metastore 中的 Iceberg 表进行交互。然后,该命令会在该命名空间内创建一个新的命名空间和一个 Iceberg 表。

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    替换以下内容:

    • PROJECT_ID:要运行批量作业的 Trusted Cloud 项目的 ID。
    • LOCATION:BigQuery 资源所在的位置
    • CATALOG_NAME:引用 Spark 表的目录名称。
    • TABLE_NAME:Spark 表的表名。
    • WAREHOUSE_DIRECTORY:存储数据仓库的 Cloud Storage 文件夹的 URI。
    • NAMESPACE_NAME:引用 Spark 表的命名空间名称。
  2. 使用以下 gcloud dataproc batches submit pyspark 命令提交批处理作业:

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    替换以下内容:

    • PYTHON_SCRIPT_PATH:批量作业使用的Python 脚本的路径。
    • PROJECT_ID:要运行批量作业的 Trusted Cloud 项目的 ID。
    • REGION:工作负载运行所在的区域。
    • BUCKET_PATH:用于上传工作负载依赖项的 Cloud Storage 存储桶的位置。存储桶的 gs:// URI 前缀不是必需的。您可以指定存储桶路径或存储桶名称,例如 mybucketname1

    如需详细了解如何提交 PySpark 批处理作业,请参阅 PySpark gcloud 参考文档

后续步骤