Dataproc で BigLake metastore を使用する

このドキュメントでは、Compute Engine 上の Dataproc で BigLake metastore を使用する方法について説明します。この接続により、Apache Spark や Apache Flink などのオープンソース ソフトウェア エンジン間で機能する単一の共有メタストアが提供されます。

始める前に

  1. Trusted Cloud プロジェクトに対する課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
  2. BigQuery API と Dataproc API を有効にします。

    API を有効にする

  3. 省略可: BigLake metastore の仕組みと、使用する理由について理解します。

必要なロール

メタデータ ストアとして BigLake metastore で Spark または Flink と Dataproc を使用するのに必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

全般的なワークフロー

BigLake metastore で Compute Engine 上の Dataproc を使用する一般的な手順は次のとおりです。

  1. Dataproc クラスタを作成するか、既存のクラスタを構成します。
  2. 任意のオープンソース ソフトウェア エンジン(Spark や Flink など)に接続します。
  3. JAR ファイルを使用して、Apache Iceberg カタログ プラグインをクラスタにインストールします。
  4. 使用しているオープンソース ソフトウェア エンジンに応じて、必要な BigLake Metastore リソースを作成して管理します。
  5. BigQuery で BigLake メタストア リソースにアクセスして使用します。

BigLake metastore を Spark に接続する

次の手順では、インタラクティブな Spark SQL を使用して Dataproc を BigLake metastore に接続します。

Iceberg カタログ プラグインをダウンロードする

BigLake metastore を Dataproc と Spark に接続するには、BigLake metastore Iceberg カタログ プラグイン jar ファイルを使用する必要があります。

このファイルは、Dataproc イメージ バージョン 2.2 にデフォルトで含まれています。Dataproc クラスタがインターネットに直接アクセスできない場合は、プラグインをダウンロードして、Dataproc クラスタがアクセスできる Cloud Storage バケットにアップロードする必要があります。

BigLake metastore Iceberg カタログ プラグインをダウンロードします

Dataproc クラスタを構成する

BigLake metastore に接続する前に、Dataproc クラスタを設定する必要があります。

これを行うには、新しいクラスタを作成するか、既存のクラスタを使用します。その後、このクラスタを使用してインタラクティブな Spark SQL を実行し、BigLake metastore リソースを管理します。

  • クラスタが作成されるリージョンのサブネットで、プライベート Google アクセス(PGA)を有効にする必要があります。デフォルトでは、2.2(デフォルト)以降のイメージ バージョンで作成された Dataproc クラスタ VM には、内部 IP アドレスのみがあります。クラスタ VM が Google API と通信できるようにするには、クラスタが作成されたリージョンの default(またはユーザー指定のネットワーク名)のネットワーク サブネットでプライベート Google アクセスを有効にします

  • このガイドの Zeppelin ウェブ インターフェースの例を実行する場合は、Zeppelin オプション コンポーネントが有効になっている Dataproc クラスタを使用または作成する必要があります。

新しいクラスタ

新しい Dataproc クラスタを作成するには、次の gcloud dataproc clusters create コマンドを実行します。この構成には、BigLake metastore の使用に必要な設定が含まれています。

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

次のように置き換えます。

  • CLUSTER_NAME: Dataproc クラスタの名前。
  • PROJECT_ID: クラスタを作成する Trusted Cloud プロジェクトの ID。
  • LOCATION: クラスタを作成する Trusted Cloud リージョン。

既存のクラスタ

既存のクラスタを構成するには、次の Iceberg Spark ランタイムをクラスタに追加します。

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

ランタイムは、次のいずれかの方法で追加できます。

Spark ジョブの送信

Spark ジョブを送信するには、次のいずれかの方法を使用します。

gcloud CLI

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

次のように置き換えます。

  • PROJECT_ID: Dataproc クラスタを含む Trusted Cloud プロジェクトの ID。
  • CLUSTER_NAME: Spark SQL ジョブの実行に使用している Dataproc クラスタの名前。
  • REGION: クラスタが配置されている Compute Engine リージョン
  • LOCATION: BigQuery リソースのロケーション。
  • CATALOG_NAME: SQL ジョブで使用している Spark カタログの名前。
  • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値は gs:// で始まります。
  • SPARK_SQL_COMMAND: 実行する Spark SQL クエリ。このクエリには、リソースを作成するコマンドを含めます。たとえば、名前空間とテーブルを作成します。

インタラクティブ Spark

Spark に接続してカタログ プラグインをインストールする

BigLake metastore のカタログ プラグインをインストールするには、SSH を使用して Dataproc クラスタに接続します。

  1. Trusted Cloud コンソールで [VM インスタンス] ページに移動します。
  2. Dataproc VM インスタンスに接続するには、仮想マシン インスタンスのリストで [SSH] をクリックします。出力は次のようになります。

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. ターミナルで、次の BigLake metastore の初期化コマンドを実行します。

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    次のように置き換えます。

    • CATALOG_NAME: SQL ジョブで使用している Spark カタログの名前。
    • PROJECT_ID: Spark カタログがリンクされる BigLake metastore カタログのプロジェクト ID。 Trusted Cloud
    • LOCATION: BigLake metastore の Trusted Cloud ロケーション。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値は gs:// で始まります。

    クラスタに正常に接続すると、Spark ターミナルに「spark-sql」というプロンプトが表示されます。

    spark-sql (default)>
    

BigLake metastore リソースを管理する

これで BigLake metastore に接続されました。BigLake Metastore に保存されているメタデータに基づいて、既存のリソースを表示したり、新しいリソースを作成したりできます。

たとえば、インタラクティブ Spark SQL セッションで次のコマンドを実行して、Iceberg 名前空間とテーブルを作成します。

  • カスタムの Iceberg カタログを使用する:

    USE `CATALOG_NAME`;
  • 名前空間を作成する:

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • 作成した名前空間を使用する:

    USE NAMESPACE_NAME;
  • Iceberg テーブルを作成する:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • テーブルに行を挿入する:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • テーブルの列を追加する:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • テーブルのメタデータを表示する:

    DESCRIBE EXTENDED TABLE_NAME;
  • 名前空間内のテーブルの一覧を取得する:

    SHOW TABLES;

Zeppelin ノートブック

  1. Trusted Cloud コンソールで、[Dataproc クラスタ] ページに移動します。

    Dataproc クラスタに移動

  2. 使用するクラスタの名前をクリックします。

    [クラスタの詳細] ページが開きます。

  3. ナビゲーション メニューで [ウェブ インターフェース] をクリックします。

  4. [コンポーネント ゲートウェイ] で [Zeppelin] をクリックします。Zeppelin ノートブック ページが開きます。

  5. ナビゲーション メニューで [Notebook]、[+ Create new note] の順にクリックします。

  6. ダイアログでノートブック名を入力します。デフォルトのインタープリタとして [Spark] が選択されたままにします。

  7. [Create] をクリックします。新しいノートブックが作成されます。

  8. ノートブックで設定メニューをクリックし、[Interpreter] をクリックします。

  9. [Search interpreters] フィールドで「Spark」を検索します。

  10. [Edit] をクリックします。

  11. [Spark.jars] フィールドに、Spark JAR の URI を入力します。

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
    
  12. [Save] をクリックします。

  13. [OK] をクリックします。

  14. 次の PySpark コードを Zeppelin ノートブックにコピーします。

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    次のように置き換えます。

    • CATALOG_NAME: SQL ジョブに使用する Spark カタログの名前。
    • PROJECT_ID: Dataproc クラスタを含む Trusted Cloud プロジェクトの ID。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値は gs:// で始まります。
    • NAMESPACE_NAME: Spark テーブルを参照する名前空間名。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。
    • TABLE_NAME: Spark テーブルのテーブル名。
  15. 実行アイコンをクリックするか、Shift-Enter キーを押してコードを実行します。ジョブが完了すると、「Spark Job Finished」というステータス メッセージが表示され、出力にテーブルの内容が表示されます。

次の手順では、Flink SQL クライアントを使用して Dataproc を BigLake metastore に接続する方法を示します。

BigLake Metastore を Flink に接続するには、次の操作を行います。

  1. オプションの Flink コンポーネントを有効にして Dataproc クラスタを作成し、Dataproc 2.2 以降を使用していることを確認します。
  2. Trusted Cloud コンソールで、[VM インスタンス] ページに移動します。

    [VM インスタンス] に移動

  3. 仮想マシン インスタンスのリストで [SSH] をクリックして、Dataproc VM インスタンスに接続します。

  4. BigLake metastore 用に Iceberg カスタム カタログ プラグインを構成します。

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. YARN で Flink セッションを開始します。

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
      -s yarn-session
  6. Flink でカタログを作成します。

    CREATE CATALOG CATALOG_NAME WITH (
      'type'='iceberg',
      'warehouse'='WAREHOUSE_DIRECTORY',
      'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
      'gcp_project'='PROJECT_ID',
      'gcp_location'='LOCATION'
    );

    次のように置き換えます。

    • CATALOG_NAME: BigLake メタストア カタログにリンクされている Flink カタログ ID。
    • WAREHOUSE_DIRECTORY: ウェアハウス ディレクトリのベースパス(Flink がファイルを作成する Cloud Storage フォルダ)。この値は gs:// で始まります。
    • PROJECT_ID: Flink カタログがリンクされる BigLake メタストア カタログのプロジェクト ID。
    • LOCATION: BigQuery リソースのロケーション

Flink セッションが BigLake metastore に接続され、Flink SQL コマンドを実行できるようになりました。

BigLake Metastore に接続したので、BigLake Metastore に保存されているメタデータに基づいてリソースを作成して表示できます。

たとえば、インタラクティブな Flink SQL セッションで次のコマンドを実行して、Iceberg データベースとテーブルを作成します。

  1. カスタムの Iceberg カタログを使用する:

    USE CATALOG CATALOG_NAME;

    CATALOG_NAME は、Flink カタログ ID に置き換えます。

  2. データベースを作成します。これにより、BigQuery にデータセットが作成されます。

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    DATABASE_NAME は、新しいデータベースの名前に置き換えます。

  3. 作成したデータベースを使用します。

    USE DATABASE_NAME;
  4. Iceberg テーブルを作成します。次のコマンドでは、販売テーブルの例を作成します。

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    );

    ICEBERG_TABLE_NAME は、新しいテーブルの名前に置き換えます。

  5. テーブルのメタデータを表示する:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. データベース内のテーブルを一覧表示します。

    SHOW TABLES;

テーブルにデータを取り込む

前のセクションで Iceberg テーブルを作成したら、Flink DataGen をデータソースとして使用して、リアルタイム データをテーブルに取り込むことができます。このワークフローの例を次に示します。

  1. DataGen を使用して一時テーブルを作成します。

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '1000000',
        'fields.price.min' = '0',
        'fields.price.max' = '10000',
        'fields.buyer.first_name.length' = '10',
        'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    次のように置き換えます。

    • DATABASE_NAME: 一時テーブルを保存するデータベースの名前。
    • TEMP_TABLE_NAME: 一時テーブルの名前。
    • ICEBERG_TABLE_NAME: 前のセクションで作成した Iceberg テーブルの名前。
  2. 並列処理を 1 に設定します。

    SET 'parallelism.default' = '1';
  3. チェックポイント間隔を設定します。

    SET 'execution.checkpointing.interval' = '10second';
  4. チェックポイントを設定します。

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. リアルタイム ストリーミング ジョブを開始します。

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    出力は次のようになります。

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. ストリーミング ジョブのステータスを確認する手順は次のとおりです。

    1. Trusted Cloud コンソールで [クラスタ] ページに移動します。

      [クラスタ] に移動

    2. クラスタを選択します。

    3. [ウェブ インターフェース] タブをクリックします。

    4. [YARN ResourceManager] リンクをクリックします。

    5. YARN ResourceManager インターフェースで Flink セッションを見つけて、[Tracking UI] の [ApplicationMaster] リンクをクリックします。

    6. [ステータス] 列で、ジョブのステータスが [実行中] であることを確認します。

  7. Flink SQL クライアントでストリーミング データにクエリを実行します。

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. BigQuery でストリーミング データをクエリします。

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Flink SQL クライアントでストリーミング ジョブを終了します。

    STOP JOB 'JOB_ID';

    JOB_ID は、ストリーミング ジョブの作成時に出力に表示されたジョブ ID に置き換えます。

次のステップ