Fonctionnalités supplémentaires de BigLake Metastore

Pour personnaliser la configuration de votre metastore BigLake, vous pouvez utiliser les fonctionnalités supplémentaires suivantes :

  • Procédures Apache Spark Iceberg
  • Option de filtrage pour les tableaux non compatibles
  • Remplacements de la connexion BigQuery
  • Règles de contrôle des accès pour les tables Iceberg BigLake Metastore

Utiliser les procédures Iceberg Spark

Pour utiliser les procédures Spark Iceberg, vous devez inclure les extensions SQL Iceberg dans votre configuration Spark. Par exemple, vous pouvez créer une procédure pour revenir à un état précédent.

Utiliser Spark-SQL interactif pour revenir à un état antérieur

Vous pouvez utiliser une procédure Iceberg Spark pour créer, modifier et rétablir l'état précédent d'une table. Exemple :

  1. Créez une table Spark :

    spark-sql \
       --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
       --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
       --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Remplacez les éléments suivants :

    • CATALOG_NAME : nom du catalogue qui fait référence à votre table Spark.
    • PROJECT_ID : ID du projet Trusted Cloud .

    • WAREHOUSE_DIRECTORY : URI du dossier Cloud Storage dans lequel votre entrepôt de données est stocké.

    USE `CATALOG_NAME`;
    CREATE NAMESPACE NAMESPACE_NAME;
    USE NAMESPACE NAMESPACE_NAME;
    CREATE TABLE NAMESPACE_NAME.TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY';
    INSERT INTO NAMESPACE_NAME.TABLE_NAME VALUES (1, "first row");
    DESCRIBE EXTENDED TABLE_NAME;

    Remplacez les éléments suivants :

    • NAMESPACE_NAME : nom de l'espace de noms qui fait référence à votre table Spark.
    • TABLE_NAME : nom de table qui fait référence à votre table Spark.

    Le résultat contient des informations sur la configuration de la table :

    ...
    Table Properties [current-snapshot-id=1659239298328512231,format=iceberg/parquet,format-version=2,write.parquet.compression-codec=zstd]
    ...
  2. Modifiez à nouveau la table, puis restaurez-la à l'instantané 1659239298328512231 créé précédemment :

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
    INSERT INTO TABLE_NAME VALUES (2, "second row", 2.5);
    SELECT * FROM TABLE_NAME;
    CALL CATALOG_NAME.system.set_current_snapshot('NAMESPACE_NAME.TABLE_NAME', SNAPSHOT_ID);
    SELECT * FROM TABLE_NAME;

    Remplacez les éléments suivants :

    • SNAPSHOT_ID : ID de l'instantané vers lequel vous effectuez le rollback.

    Le résultat ressemble à ce qui suit :

    1 first row
    Time taken: 0.997 seconds, Fetched 1 row(s)

Filtrer les tables non compatibles à partir des fonctions de liste des tables

Lorsque vous utilisez SparkSQL avec le catalogue BigLake Metastore, la commande SHOW TABLES affiche toutes les tables de l'espace de noms spécifié, y compris celles qui ne sont pas compatibles avec Spark.

Pour n'afficher que les tables compatibles, activez l'option filter_unsupported_tables :

spark-sql
  --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
  --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
  --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
  --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
  --conf spark.sql.catalog.CATALOG_NAME.filter_unsupported_tables="true"

Remplacez les éléments suivants :

  • CATALOG_NAME : nom du catalogue Spark à utiliser.
  • PROJECT_ID : ID du projet Trusted Cloud à utiliser.
  • LOCATION : emplacement des ressources BigQuery.
  • WAREHOUSE_DIRECTORY : dossier Cloud Storage à utiliser comme entrepôt de données.

Définir un remplacement de connexion BigQuery

Vous pouvez utiliser des connexions BigQuery pour accéder aux données stockées en dehors de BigQuery, par exemple dans Cloud Storage.

Pour définir un remplacement de connexion BigQuery qui donne accès à un bucket Cloud Storage, procédez comme suit :

  1. Dans votre projet BigQuery, créez une connexion à votre ressource Cloud Storage. Cette connexion définit la façon dont BigQuery accède à vos données.

  2. Attribuez le rôle roles/bigquery.connectionUser à l'utilisateur ou au compte de service qui accède aux données de la connexion.

    Assurez-vous que la ressource de connexion partage le même emplacement que les ressources cibles dans BigQuery. Pour en savoir plus, consultez la page Gérer les connexions.

  3. Spécifiez la connexion dans votre table Iceberg avec la propriété bq_connection :

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG LOCATION 'WAREHOUSE_DIRECTORY' TBLPROPERTIES ('bq_connection'='projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_ID');

    Remplacez les éléments suivants :

    • TABLE_NAME : nom de table pour votre table Spark.
    • WAREHOUSE_DIRECTORY : URI du bucket Cloud Storage qui stocke vos données.
    • PROJECT_ID : ID du projet Trusted Cloud à utiliser.
    • LOCATION : emplacement de la connexion.
    • CONNECTION_ID : ID de la connexion.

Définir des stratégies de contrôle des accès

Vous pouvez activer le contrôle des accès détaillé (FGAC, fine-grained access control) sur les tables Iceberg du metastore BigLake en configurant des stratégies de contrôle des accès. Vous ne pouvez définir des règles de contrôle des accès que sur les tables qui utilisent un remplacement de connexion BigQuery. Vous pouvez définir ces règles de différentes manières :

Une fois que vous avez configuré vos règles FGAC, vous pouvez interroger la table depuis Spark à l'aide de l'exemple suivant :

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
.appName("BigLake Metastore Iceberg") \
.config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
.config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
.config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
.getOrCreate()

spark.sql("USE `CATALOG_NAME`;")

# Configure spark for storing temp results
spark.conf.set("viewsEnabled","true")
spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE");
spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE")

spark.sql("USE NAMESPACE DATASET_NAME;")

sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME"""
df = spark.read.format("bigquery").load(sql)
df.show()

Remplacez les éléments suivants :

  • CATALOG_NAME : nom de votre catalogue.
  • PROJECT_ID : ID du projet contenant vos ressources BigQuery.
  • LOCATION : emplacement des ressources BigQuery.
  • WAREHOUSE_DIRECTORY : URI du dossier Cloud Storage contenant votre entrepôt de données.
  • MATERIALIZATION_NAMESPACE : espace de noms dans lequel vous souhaitez stocker les résultats temporaires.
  • DATASET_NAME : nom de l'ensemble de données contenant la table que vous interrogez.
  • ICEBERG_TABLE_NAME : nom de la table que vous interrogez.

Étapes suivantes