Mengonfigurasi metastore BigLake
Dokumen ini menjelaskan cara mengonfigurasi metastore BigLake dengan Dataproc atau Trusted Cloud by S3NS Serverless untuk Apache Spark guna membuat satu metastore bersama yang berfungsi di seluruh mesin open source, seperti Apache Spark atau Apache Flink.
Sebelum memulai
- Aktifkan penagihan untuk Trusted Cloud project Anda. Pelajari cara memeriksa apakah penagihan telah diaktifkan pada suatu project.
Aktifkan BigQuery dan Dataproc API.
Opsional: Pahami cara kerja BigLake metastore dan alasan Anda harus menggunakannya.
Peran yang diperlukan
Untuk mendapatkan izin yang Anda perlukan untuk mengonfigurasi metastore BigLake, minta administrator Anda untuk memberi Anda peran IAM berikut:
-
Buat cluster Dataproc:
Dataproc Worker (
roles/dataproc.worker
) di akun layanan default Compute Engine dalam project -
Buat tabel metastore BigLake:
-
Dataproc Worker (
roles/dataproc.worker
) di akun layanan VM Dataproc dalam project -
BigQuery Data Editor (
roles/bigquery.dataEditor
) di akun layanan VM Dataproc dalam project -
Storage Object Admin (
roles/storage.objectAdmin
) di akun layanan VM Dataproc dalam project
-
Dataproc Worker (
-
Membuat kueri tabel metastore BigLake:
-
BigQuery Data Viewer (
roles/bigquery.dataViewer
) di project -
Pengguna BigQuery (
roles/bigquery.user
) di project -
Storage Object Viewer (
roles/storage.objectViewer
) di project
-
BigQuery Data Viewer (
Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.
Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran kustom atau peran yang telah ditentukan lainnya.
Mengonfigurasi metastore dengan Dataproc
Anda dapat mengonfigurasi metastore BigLake dengan Dataproc menggunakan Spark atau Flink:
Spark
Konfigurasi cluster baru. Untuk membuat cluster Dataproc baru, jalankan perintah
gcloud dataproc clusters create
berikut, yang berisi setelan yang perlu Anda gunakan untuk metastore BigLake:gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --single-node
Ganti kode berikut:
CLUSTER_NAME
: nama untuk cluster Dataproc Anda.PROJECT_ID
: ID Trusted Cloud project tempat Anda membuat cluster.LOCATION
: region Compute Engine tempat Anda membuat cluster.
Kirimkan tugas Spark menggunakan salah satu metode berikut:
Google Cloud CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region=REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Ganti kode berikut:
PROJECT_ID
: ID Trusted Cloud project yang berisi cluster Dataproc.CLUSTER_NAME
: nama cluster Dataproc yang Anda gunakan untuk menjalankan tugas Spark SQL.REGION
: region Compute Engine tempat cluster Anda berada.LOCATION
: lokasi resource BigQuery.CATALOG_NAME
: nama katalog Spark yang akan digunakan dengan tugas SQL Anda.WAREHOUSE_DIRECTORY
: folder Cloud Storage yang berisi data warehouse Anda. Nilai ini diawali dengangs://
.SPARK_SQL_COMMAND
: kueri Spark SQL yang ingin Anda jalankan. Kueri ini mencakup perintah untuk membuat resource Anda. Misalnya, untuk membuat namespace dan tabel.
spark-sql CLI
Di konsol Trusted Cloud , buka halaman VM Instances.
Untuk terhubung ke instance VM Dataproc, klik SSH di baris yang mencantumkan nama instance VM utama cluster Dataproc, yaitu nama cluster yang diikuti dengan akhiran
-m
. Outputnya mirip dengan hal berikut ini:Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
Di terminal, jalankan perintah inisialisasi metastore BigLake berikut:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Ganti kode berikut:
CATALOG_NAME
: nama katalog Spark yang Anda gunakan dengan tugas SQL.PROJECT_ID
: ID Trusted Cloud project dari katalog metastore BigLake yang ditautkan dengan katalog Spark Anda.LOCATION
: Trusted Cloud lokasi BigLake Metastore.WAREHOUSE_DIRECTORY
: folder Cloud Storage yang berisi data warehouse Anda. Nilai ini diawali dengangs://
.
Setelah berhasil terhubung ke cluster, terminal Spark Anda akan menampilkan perintah
spark-sql
, yang dapat Anda gunakan untuk mengirimkan tugas Spark.spark-sql (default)>
Flink
- Buat cluster Dataproc dengan mengaktifkan komponen Flink opsional,
dan pastikan Anda menggunakan Dataproc
2.2
atau yang lebih baru. Di konsol Trusted Cloud , buka halaman VM instances.
Dalam daftar instance virtual machine, klik SSH untuk terhubung ke instance VM cluster Dataproc utama, yang tercantum sebagai nama cluster yang diikuti dengan akhiran
-m
.Konfigurasi plugin katalog kustom Iceberg untuk BigLake Metastore:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Mulai sesi Flink di YARN:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
Buat katalog di Flink:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Ganti kode berikut:
CATALOG_NAME
: ID katalog Flink, yang ditautkan ke katalog metastore BigLake.WAREHOUSE_DIRECTORY
: jalur dasar untuk direktori gudang (folder Cloud Storage tempat Flink membuat file). Nilai ini diawali dengangs://
.PROJECT_ID
: project ID katalog BigLake Metastore yang ditautkan dengan katalog Flink.LOCATION
: lokasi resource BigQuery.
Sesi Flink Anda kini terhubung ke metastore BigLake, dan Anda dapat menjalankan perintah Flink SQL.
Mengelola resource metastore BigLake
Setelah terhubung ke metastore BigLake, Anda dapat membuat dan melihat resource berdasarkan metadata yang disimpan di metastore BigLake.
Misalnya, coba jalankan perintah berikut di sesi Flink SQL interaktif Anda untuk membuat database dan tabel Iceberg.
Menggunakan katalog Iceberg kustom:
USE CATALOG CATALOG_NAME;
Ganti
CATALOG_NAME
dengan ID katalog Flink Anda.Buat database, yang akan membuat set data di BigQuery:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Ganti
DATABASE_NAME
dengan nama database baru Anda.Gunakan database yang Anda buat:
USE DATABASE_NAME;
Buat tabel Iceberg. Contoh berikut membuat tabel penjualan contoh:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Ganti
ICEBERG_TABLE_NAME
dengan nama untuk tabel baru Anda.Melihat metadata tabel:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Mencantumkan tabel dalam database:
SHOW TABLES;
Menyerap data ke dalam tabel
Setelah membuat tabel Iceberg di bagian sebelumnya, Anda dapat menggunakan Flink DataGen sebagai sumber data untuk menyerap data real-time ke dalam tabel Anda. Langkah-langkah berikut adalah contoh alur kerja ini:
Buat tabel sementara menggunakan DataGen:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Ganti kode berikut:
DATABASE_NAME
: nama database untuk menyimpan tabel sementara Anda.TEMP_TABLE_NAME
: nama untuk tabel sementara Anda.ICEBERG_TABLE_NAME
: nama tabel Iceberg yang Anda buat di bagian sebelumnya.
Tetapkan paralelisme ke 1:
SET 'parallelism.default' = '1';
Menetapkan interval titik pemeriksaan:
SET 'execution.checkpointing.interval' = '10second';
Tetapkan titik pemeriksaan:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Mulai tugas streaming real-time:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
Outputnya mirip dengan hal berikut ini:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
Untuk memeriksa status tugas streaming, lakukan hal berikut:
Di konsol Trusted Cloud , buka halaman Clusters.
Pilih cluster Anda.
Klik tab Antarmuka web.
Klik link YARN ResourceManager.
Di antarmuka YARN ResourceManager, temukan sesi Flink Anda, lalu klik link ApplicationMaster di bagian Tracking UI.
Di kolom Status, konfirmasi bahwa status tugas Anda adalah Berjalan.
Buat kueri data streaming di klien Flink SQL:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Mengkueri data streaming di BigQuery:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Hentikan tugas streaming di klien Flink SQL:
STOP JOB 'JOB_ID';
Ganti
JOB_ID
dengan ID tugas yang ditampilkan di output saat Anda membuat tugas streaming.
Mengonfigurasi metastore dengan Serverless untuk Apache Spark
Anda dapat mengonfigurasi metastore BigLake dengan Serverless untuk Apache Spark menggunakan Spark SQL atau PySpark.
Spark SQL
Buat file SQL dengan perintah Spark SQL yang ingin Anda jalankan di metastore BigLake. Misalnya, perintah ini akan membuat namespace dan tabel:
CREATE NAMESPACE `CATALOG_NAME`.NAMESPACE_NAME; CREATE TABLE `CATALOG_NAME`.NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
Ganti kode berikut:
CATALOG_NAME
: nama katalog yang mereferensikan tabel Spark Anda.NAMESPACE_NAME
: nama namespace yang mereferensikan tabel Spark Anda.TABLE_NAME
: nama tabel untuk tabel Spark Anda.WAREHOUSE_DIRECTORY
: URI folder Cloud Storage tempat gudang data Anda disimpan.
Kirimkan tugas batch Spark SQL dengan menjalankan perintah
gcloud dataproc batches submit spark-sql
berikut:gcloud dataproc batches submit spark-sql SQL_SCRIPT_PATH \ --project=PROJECT_ID \ --region=REGION \ --subnet=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ .sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Ganti kode berikut:
SQL_SCRIPT_PATH
: jalur ke file SQL yang digunakan oleh tugas batch.PROJECT_ID
: ID Trusted Cloud project untuk menjalankan tugas batch.REGION
: region tempat workload Anda berjalan.SUBNET_NAME
(opsional): nama subnet VPC diREGION
yang memenuhi persyaratan subnet sesi.BUCKET_PATH
: lokasi bucket Cloud Storage untuk mengupload dependensi workload.WAREHOUSE_DIRECTORY
berada di bucket ini. Awalan URIgs://
bucket tidak diperlukan. Anda dapat menentukan jalur bucket atau nama bucket, misalnya,mybucketname1
.LOCATION
: lokasi untuk menjalankan tugas batch.
Untuk mengetahui informasi selengkapnya tentang cara mengirimkan tugas batch Spark, lihat Menjalankan workload batch Spark.
PySpark
Buat file python dengan perintah PySpark yang ingin Anda jalankan di BigLake metastore.
Misalnya, perintah berikut menyiapkan lingkungan Spark untuk berinteraksi dengan tabel Iceberg yang disimpan di metastore BigLake. Kemudian, perintah akan membuat namespace baru dan tabel Iceberg dalam namespace tersebut.
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("BigLake Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';")
Ganti kode berikut:
PROJECT_ID
: ID Trusted Cloud project untuk menjalankan tugas batch.LOCATION
: lokasi tempat resource BigQuery berada.CATALOG_NAME
: nama katalog yang mereferensikan tabel Spark Anda.TABLE_NAME
: nama tabel untuk tabel Spark Anda.WAREHOUSE_DIRECTORY
: URI folder Cloud Storage tempat gudang data Anda disimpan.NAMESPACE_NAME
: nama namespace yang mereferensikan tabel Spark Anda.
Kirimkan tugas batch menggunakan perintah
gcloud dataproc batches submit pyspark
berikut:gcloud dataproc batches submit pyspark PYTHON_SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=BUCKET_PATH \ --properties="spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog,spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog,spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID,spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION,spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY"
Ganti kode berikut:
PYTHON_SCRIPT_PATH
: jalur ke skrip Python yang digunakan tugas batch.PROJECT_ID
: ID Trusted Cloud project untuk menjalankan tugas batch.REGION
: region tempat workload Anda berjalan.BUCKET_PATH
: lokasi bucket Cloud Storage untuk mengupload dependensi workload. Awalan URIgs://
bucket tidak diperlukan. Anda dapat menentukan jalur bucket atau nama bucket, misalnya,mybucketname1
.
Untuk mengetahui informasi selengkapnya tentang cara mengirimkan tugas batch PySpark, lihat referensi gcloud PySpark.