BigLake metastore を構成する

このドキュメントでは、Dataproc または Trusted Cloud by S3NS Serverless for Apache Spark を使用して BigLake metastore を構成し、Apache Spark や Apache Flink などのオープンソース エンジン間で機能する単一の共有 metastore を作成する方法について説明します。

始める前に

  1. Trusted Cloud プロジェクトに対する課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
  2. BigQuery API と Dataproc API を有効にします。

    API を有効にする

  3. 省略可: BigLake metastore の仕組みと使用すべき理由を理解します。

必要なロール

BigLake メタストアの構成に必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

  • Dataproc クラスタを作成する。プロジェクトの Compute Engine デフォルトのサービス アカウントに対する Dataproc ワーカーroles/dataproc.worker
  • BigLake metastore テーブルを作成する。
  • BigLake metastore テーブルに対してクエリを実行する。

ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

Dataproc でメタストアを構成する

Spark または Flink を使用して、Dataproc で BigLake metastore を構成できます。

Spark

  1. 新しいクラスタを構成します。新しい Dataproc クラスタを作成するには、次の gcloud dataproc clusters create コマンドを実行します。このコマンドには、BigLake metastore の使用に必要な設定が含まれています。

    gcloud dataproc clusters create CLUSTER_NAME \
        --project=PROJECT_ID \
        --region=LOCATION \
        --single-node

    次のように置き換えます。

    • CLUSTER_NAME: Dataproc クラスタの名前。
    • PROJECT_ID: クラスタを作成する Trusted Cloud プロジェクトの ID。
    • LOCATION: クラスタを作成する Compute Engine リージョン。
  2. 次のいずれかの方法で Spark ジョブを送信します。

    Google Cloud CLI

    gcloud dataproc jobs submit spark-sql \
        --project=PROJECT_ID \
        --cluster=CLUSTER_NAME \
        --region=REGION \
        --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
        --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
        --execute="SPARK_SQL_COMMAND"

    次のように置き換えます。

    • PROJECT_ID: Dataproc クラスタを含む Trusted Cloud プロジェクトの ID。
    • CLUSTER_NAME: Spark SQL ジョブの実行に使用している Dataproc クラスタの名前。
    • REGION: クラスタが配置されている Compute Engine リージョン
    • LOCATION: BigQuery リソースのロケーション。
    • CATALOG_NAME: SQL ジョブで使用する Spark カタログの名前。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値は gs:// で始まります。
    • SPARK_SQL_COMMAND: 実行する Spark SQL クエリ。このクエリには、リソースを作成するコマンドを含めます。たとえば、名前空間とテーブルを作成します。

    spark-sql CLI

    1. Trusted Cloud コンソールで [VM インスタンス] ページに移動します。

      [VM インスタンス] に移動

    2. Dataproc VM インスタンスに接続するには、Dataproc クラスタのメイン VM インスタンス名(クラスタ名の後に -m 接尾辞が付いたもの)が一覧表示されている行で [SSH] をクリックします。出力は次のようになります。

      Connected, host fingerprint: ssh-rsa ...
      Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
      ...
      example-cluster@cluster-1-m:~$
      
    3. ターミナルで、次の BigLake metastore の初期化コマンドを実行します。

      spark-sql \
          --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
          --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
          --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
          --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

      次のように置き換えます。

      • CATALOG_NAME: SQL ジョブで使用している Spark カタログの名前。
      • PROJECT_ID: Spark カタログがリンクされる BigLake metastore カタログの Trusted Cloud プロジェクト ID。
      • LOCATION: BigLake metastore の Trusted Cloud ロケーション。
      • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値は gs:// で始まります。

      クラスタに正常に接続すると、Spark ターミナルに「spark-sql」というプロンプトが表示されます。このプロンプトを使用して Spark ジョブを送信できます。

      spark-sql (default)>
      
  1. オプションの Flink コンポーネントを有効にして Dataproc クラスタを作成し、Dataproc 2.2 以降を使用していることを確認します。
  2. Trusted Cloud コンソールで、[VM インスタンス] ページに移動します。

    [VM インスタンス] に移動

  3. 仮想マシン インスタンスのリストで、[SSH] をクリックして、メインの Dataproc クラスタ VM インスタンスに接続します。このインスタンスは、クラスタ名の後に -m 接尾辞が付いた名前でリストされます。

  4. BigLake metastore 用に Iceberg カスタム カタログ プラグインを構成します。

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. YARN で Flink セッションを開始します。

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
    -s yarn-session
  6. Flink でカタログを作成します。

    CREATE CATALOG CATALOG_NAME WITH (
    'type'='iceberg',
    'warehouse'='WAREHOUSE_DIRECTORY',
    'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
    'gcp_project'='PROJECT_ID',
    'gcp_location'='LOCATION'
    );

    次のように置き換えます。

    • CATALOG_NAME: BigLake metastore カタログにリンクされている Flink カタログ識別子。
    • WAREHOUSE_DIRECTORY: ウェアハウス ディレクトリのベースパス(Flink がファイルを作成する Cloud Storage フォルダ)。この値は gs:// で始まります。
    • PROJECT_ID: Flink カタログがリンクされる BigLake metastore カタログのプロジェクト ID。
    • LOCATION: BigQuery リソースのロケーション

これで、Flink セッションが BigLake metastore に接続され、Flink SQL コマンドを実行できるようになりました。

BigLake metastore に接続したので、BigLake metastore に保存されているメタデータに基づいてリソースを作成して表示できます。

たとえば、インタラクティブ Flink SQL セッションで次のコマンドを実行して、Iceberg のデータベースとテーブルを作成してみます。

  1. カスタムの Iceberg カタログを使用する:

    USE CATALOG CATALOG_NAME;

    CATALOG_NAME は、Flink カタログ ID に置き換えます。

  2. データベースを作成する。これにより、BigQuery にデータセットが作成されます。

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    DATABASE_NAME は、新しいデータベースの名前に置き換えます。

  3. 作成したデータベースを使用する。

    USE DATABASE_NAME;
  4. Iceberg テーブルを作成する。次の例では、販売テーブルを作成します。

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
      order_number BIGINT,
      price        DECIMAL(32,2),
      buyer        ROW<first_name STRING, last_name STRING>,
      order_time   TIMESTAMP(3)
    );

    ICEBERG_TABLE_NAME は、新しいテーブルの名前に置き換えます。

  5. テーブルのメタデータを表示する:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. データベース内のテーブルを一覧表示する。

    SHOW TABLES;

テーブルにデータを取り込む

前のセクションで Iceberg テーブルを作成したら、Flink DataGen をデータソースとして使用して、リアルタイム データをテーブルに取り込むことができます。このワークフローの例を次に示します。

  1. DataGen を使用して一時テーブルを作成します。

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.order_number.kind' = 'sequence',
      'fields.order_number.start' = '1',
      'fields.order_number.end' = '1000000',
      'fields.price.min' = '0',
      'fields.price.max' = '10000',
      'fields.buyer.first_name.length' = '10',
      'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    次のように置き換えます。

    • DATABASE_NAME: 一時テーブルを保存するデータベースの名前。
    • TEMP_TABLE_NAME: 一時テーブルの名前。
    • ICEBERG_TABLE_NAME: 前のセクションで作成した Iceberg テーブルの名前。
  2. 並列処理を 1 に設定します。

    SET 'parallelism.default' = '1';
  3. チェックポイント間隔を設定します。

    SET 'execution.checkpointing.interval' = '10second';
  4. チェックポイントを設定します。

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. リアルタイム ストリーミング ジョブを開始します。

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    出力は次のようになります。

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. ストリーミング ジョブのステータスを確認する手順は次のとおりです。

    1. Trusted Cloud コンソールで、[クラスタ] ページに移動します。

      [クラスタ] に移動

    2. クラスタを選択します。

    3. [ウェブ インターフェース] タブをクリックします。

    4. [YARN ResourceManager] リンクをクリックします。

    5. [YARN ResourceManager] インターフェースで、Flink セッションを見つけて、[Tracking UI] の [ApplicationMaster] リンクをクリックします。

    6. [ステータス] 列で、ジョブのステータスが [実行中] であることを確認します。

  7. Flink SQL クライアントでストリーミング データにクエリを実行します。

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. BigQuery でストリーミング データにクエリを実行します。

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Flink SQL クライアントでストリーミング ジョブを終了します。

    STOP JOB 'JOB_ID';

    JOB_ID は、ストリーミング ジョブの作成時に出力に表示されたジョブ ID に置き換えます。

Apache Spark 向け Serverless で metastore を構成する

BigLake metastore は、Spark SQL または PySpark を使用して Apache Spark 向け Serverless で構成できます。

Spark SQL

  1. BigLake metastore で実行する Spark SQL コマンドを含む SQL ファイルを作成します。たとえば、次のコマンドは名前空間とテーブルを作成します。

    CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME;
    CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';

    次のように置き換えます。

    • CATALOG_NAME: Spark テーブルを参照するカタログ名。
    • NAMESPACE_NAME: Spark テーブルを参照する名前空間名。
    • TABLE_NAME: Spark テーブルのテーブル名。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。
  2. 次の gcloud dataproc batches submit spark-sql コマンドを実行して、Spark SQL バッチジョブを送信します。

    gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \
        --project=PROJECT_ID \
        --region=REGION \
        --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
        spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
        spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
        spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
        .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    次のように置き換えます。

    • SQL_SCRIPT_PATH: バッチジョブが使用する SQL ファイルのパス。
    • PROJECT_ID: バッチジョブを実行する Trusted Cloud プロジェクトの ID。
    • REGION: ワークロードが実行されるリージョン。
    • SUBNET_NAME(省略可): セッション サブネットの要件を満たす REGION の VPC サブネットの名前。
    • BUCKET_PATH: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。WAREHOUSE_DIRECTORY はこのバケットにあります。バケットの gs:// URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1 など)を指定できます。
    • LOCATION: バッチジョブを実行するロケーション。

    Spark バッチジョブの送信の詳細については、Spark バッチ ワークロードを実行するをご覧ください。

PySpark

  1. BigLake metastore で実行する PySpark コマンドを含む Python ファイルを作成します。

    たとえば、次のコマンドは、BigLake metastore に保存されている Iceberg テーブルを操作する Spark 環境を設定します。このコマンドは、新しい名前空間とその名前空間内の Iceberg テーブルを作成します。

    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")

    次のように置き換えます。

    • PROJECT_ID: バッチジョブを実行する Trusted Cloud プロジェクトの ID。
    • LOCATION: BigQuery リソースが配置されているロケーション
    • CATALOG_NAME: Spark テーブルを参照するカタログ名。
    • TABLE_NAME: Spark テーブルのテーブル名。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。
    • NAMESPACE_NAME: Spark テーブルを参照する名前空間名。
  2. 次の gcloud dataproc batches submit pyspark コマンドを使用して、バッチジョブを送信します。

    gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \
        --version=2.2 \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_PATH \
        --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"

    次のように置き換えます。

    • PYTHON_SCRIPT_PATH: バッチジョブが使用する Python スクリプトのパス。
    • PROJECT_ID: バッチジョブを実行する Trusted Cloud プロジェクトの ID。
    • REGION: ワークロードが実行されるリージョン。
    • BUCKET_PATH: ワークロードの依存関係をアップロードする Cloud Storage バケットのロケーション。バケットの gs:// URI 接頭辞は必要ありません。バケットパスまたはバケット名(mybucketname1 など)を指定できます。

    PySpark バッチジョブの送信の詳細については、PySpark gcloud リファレンスをご覧ください。

次のステップ