Utiliser BigQuery DataFrames dans dbt

dbt (data build tool) est un framework en ligne de commande Open Source conçu pour la transformation de données dans les entrepôts de données modernes. dbt facilite les transformations de données modulaires en créant des modèles réutilisables basés sur SQL et Python. L'outil orchestre l'exécution de ces transformations dans l'entrepôt de données cible, en se concentrant sur l'étape de transformation du pipeline ELT. Pour en savoir plus, consultez la documentation dbt.

Dans dbt, un modèle Python est une transformation de données définie et exécutée à l'aide de code Python dans votre projet dbt. Au lieu d'écrire du code SQL pour la logique de transformation, vous écrivez des scripts Python que dbt orchestre ensuite pour qu'ils s'exécutent dans l'environnement de l'entrepôt de données. Un modèle Python vous permet d'effectuer des transformations de données qui peuvent être complexes ou inefficaces à exprimer en SQL. Cela permet d'exploiter les capacités de Python tout en bénéficiant de la structure de projet, de l'orchestration, de la gestion des dépendances, des tests et des fonctionnalités de documentation de dbt. Pour en savoir plus, consultez Modèles Python.

L'adaptateur dbt-bigquery permet d'exécuter du code Python défini dans les DataFrames BigQuery. Cette fonctionnalité est disponible dans dbt Cloud et dbt Core. Vous pouvez également obtenir cette fonctionnalité en clonant la dernière version de l'adaptateur dbt-bigquery.

Rôles requis

L'adaptateur dbt-bigquery est compatible avec l'authentification basée sur OAuth et sur un compte de service.

Si vous prévoyez de vous authentifier auprès de l'adaptateur dbt-bigquery à l'aide d'OAuth, demandez à votre administrateur de vous accorder les rôles suivants :

Si vous prévoyez de vous authentifier auprès de l'adaptateur dbt-bigquery à l'aide d'un compte de service, demandez à votre administrateur d'attribuer les rôles suivants au compte de service que vous prévoyez d'utiliser :

Si vous vous authentifiez à l'aide d'un compte de service, assurez-vous également que le rôle Utilisateur du compte de service (roles/iam.serviceAccountUser) est attribué au compte de service que vous prévoyez d'utiliser.

Environnement d'exécution Python

L'adaptateur dbt-bigquery utilise le service d'exécution de notebooks Colab Enterprise pour exécuter le code Python BigQuery DataFrames. Un notebook Colab Enterprise est automatiquement créé et exécuté par l'adaptateur dbt-bigquery pour chaque modèle Python. Vous pouvez choisir le projetTrusted Cloud dans lequel exécuter le notebook. Le notebook exécute le code Python du modèle, qui est converti en code SQL BigQuery par la bibliothèque BigQuery DataFrames. Le code SQL BigQuery est ensuite exécuté dans le projet configuré. Le schéma suivant présente le flux de contrôle :

Environnement d'exécution Python BigQuery DataFrames pour un notebook

Si aucun modèle de notebook n'est déjà disponible dans le projet et que l'utilisateur exécutant le code dispose des autorisations nécessaires pour créer le modèle, l'adaptateur dbt-bigquery crée et utilise automatiquement le modèle de notebook par défaut. Vous pouvez également spécifier un autre modèle de notebook à l'aide d'une configuration dbt.

L'exécution du notebook nécessite un bucket Cloud Storage intermédiaire pour stocker le code et les journaux. Toutefois, l'adaptateur dbt-bigquery copie les journaux dans les journaux dbt. Vous n'avez donc pas besoin de parcourir le bucket.

Fonctionnalités compatibles

L'adaptateur dbt-bigquery est compatible avec les fonctionnalités suivantes pour les modèles dbt Python exécutant BigQuery DataFrames :

  • Charger des données à partir d'une table BigQuery existante avec la macro dbt.source().
  • Charger des données à partir d'autres modèles dbt avec la macro dbt.ref() pour créer des dépendances et des graphes orientés acycliques (DAG) avec des modèles Python.
  • Spécifier et utiliser des packages Python de PyPi pouvant être utilisés avec l'exécution de code Python. Pour en savoir plus, consultez Configurations.
  • Spécifier un modèle d'environnement d'exécution de notebook personnalisé pour vos modèles BigQuery DataFrames.

L'adaptateur dbt-bigquery est compatible avec les stratégies de matérialisation suivantes :

  • Matérialisation de table, où les données sont reconstruites sous forme de tableau à chaque exécution.
  • Matérialisation incrémentielle avec une stratégie de fusion, où les données nouvelles ou mises à jour sont ajoutées à une table existante, souvent à l'aide d'une stratégie de fusion pour gérer les modifications.

Configurer dbt pour utiliser BigQuery DataFrames

Si vous utilisez dbt Core, vous devez utiliser un fichier profiles.yml pour l'utiliser avec BigQuery DataFrames. L'exemple suivant utilise la méthode oauth :

your_project_name:
  outputs:
    dev:
      compute_region: us-central1
      dataset: your_bq_dateset
      gcs_bucket: your_gcs_bucket
      job_execution_timeout_seconds: 300
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: your_gcp_project
      threads: 1
      type: bigquery
  target: dev

Si vous utilisez dbt Cloud, vous pouvez vous connecter à votre plate-forme de données directement dans l'interface dbt Cloud. Dans ce scénario, vous n'avez pas besoin de fichier profiles.yml. Pour en savoir plus, consultez À propos de profiles.yml.

Voici un exemple de configuration au niveau du projet pour le fichier dbt_project.yml :

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models.
name: 'your_project_name'
version: '1.0.0'

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/
# directory as views. These settings can be overridden in the individual model
# files using the config(...) macro.

models:
  your_project_name:
    submission_method: bigframes
    notebook_template_id: 7018811640745295872
    packages: ["scikit-learn", "mlflow"]
    timeout: 3000
    # Config indicated by + and applies to all files under models/example/
    example:
      +materialized: view

Certains paramètres peuvent également être configurés à l'aide de la méthode dbt.config dans votre code Python. Si ces paramètres entrent en conflit avec votre fichier dbt_project.yml, les configurations avec dbt.config seront prioritaires.

Pour en savoir plus, consultez Configurations de modèle et dbt_project.yml.

Configurations

Vous pouvez configurer les éléments suivants à l'aide de la méthode dbt.config dans votre modèle Python. Ces configurations remplacent la configuration au niveau du projet.

Configuration Obligatoire Utilisation
submission_method Oui submission_method=bigframes
notebook_template_id Non Si aucun n'est spécifié, un modèle par défaut est créé et utilisé.
packages Non Spécifiez la liste supplémentaire des packages Python, si nécessaire.
timeout Non Facultatif : Prolongez le délai d'expiration de l'exécution du job.

Exemples de modèles Python

Les sections suivantes présentent des exemples de scénarios et de modèles Python.

Charger des données à partir d'une table BigQuery

Pour utiliser les données d'une table BigQuery existante comme source dans votre modèle Python, vous devez d'abord définir cette source dans un fichier YAML. L'exemple suivant est défini dans un fichier source.yml.

version: 2

sources:
  - name: my_project_source   # A custom name for this source group
    database: bigframes-dev   # Your Google Cloud project ID
    schema: yyy_test_us       # The BigQuery dataset containing the table
    tables:
      - name: dev_sql1        # The name of your BigQuery table

Ensuite, vous créez votre modèle Python, qui peut utiliser les sources de données configurées dans ce fichier YAML :

def model(dbt, session):
    # Configure the model to use BigFrames for submission
    dbt.config(submission_method="bigframes")

    # Load data from the 'dev_sql1' table within 'my_project_source'
    source_data = dbt.source('my_project_source', 'dev_sql1')

    # Example transformation: Create a new column 'id_new'
    source_data['id_new'] = source_data['id'] * 10

    return source_data

Référencer un autre modèle

Vous pouvez créer des modèles qui dépendent de la sortie d'autres modèles dbt, comme illustré dans l'exemple suivant. Cette fonctionnalité est utile pour créer des pipelines de données modulaires.

def model(dbt, session):
    # Configure the model to use BigFrames
    dbt.config(submission_method="bigframes")

    # Reference another dbt model named 'dev_sql1'.
    # It assumes you have a model defined in 'dev_sql1.sql' or 'dev_sql1.py'.
    df_from_sql = dbt.ref("dev_sql1")

    # Example transformation on the data from the referenced model
    df_from_sql['id'] = df_from_sql['id'] * 100

    return df_from_sql

Spécifier une dépendance de package

Si votre modèle Python nécessite des bibliothèques tierces spécifiques telles que MLflow ou Boto3, vous pouvez déclarer le package dans la configuration du modèle, comme indiqué dans l'exemple suivant. Ces packages sont installés dans l'environnement d'exécution.

def model(dbt, session):
    # Configure the model for BigFrames and specify required packages
    dbt.config(
        submission_method="bigframes",
        packages=["mlflow", "boto3"]  # List the packages your model needs
    )

    # Import the specified packages for use in your model
    import mlflow
    import boto3

    # Example: Create a DataFrame showing the versions of the imported packages
    data = {
        "mlflow_version": [mlflow.__version__],
        "boto3_version": [boto3.__version__],
        "note": ["This demonstrates accessing package versions after import."]
    }
    bdf = bpd.DataFrame(data)

    return bdf

Spécifier un modèle autre que celui par défaut

Pour mieux contrôler l'environnement d'exécution ou utiliser des paramètres préconfigurés, vous pouvez spécifier un modèle de notebook non défini par défaut pour votre modèle BigQuery DataFrames, comme illustré dans l'exemple suivant.

def model(dbt, session):
    dbt.config(
        submission_method="bigframes",
     # ID of your pre-created notebook template
        notebook_template_id="857350349023451yyyy",
    )

    data = {"int": [1, 2, 3], "str": ['a', 'b', 'c']}
    return bpd.DataFrame(data=data)

Matérialiser les tables

Lorsque dbt exécute vos modèles Python, il doit savoir comment enregistrer les résultats dans votre entrepôt de données. C'est ce qu'on appelle la matérialisation.

Pour la matérialisation de table standard, dbt crée ou remplace entièrement une table dans votre entrepôt par la sortie de votre modèle à chaque exécution. Cela se fait par défaut ou en définissant explicitement la propriété materialized='table', comme indiqué dans l'exemple suivant.

def model(dbt, session):
    dbt.config(
        submission_method="bigframes",
     # Instructs dbt to create/replace this model as a table
        materialized='table',
    )

    data = {"int_column": [1, 2], "str_column": ['a', 'b']}
    return bpd.DataFrame(data=data)

La matérialisation incrémentielle avec une stratégie de fusion permet à dbt de mettre à jour votre table uniquement avec les lignes nouvelles ou modifiées. Cela est utile pour les grands ensembles de données, car la reconstruction complète d'une table à chaque fois peut être inefficace. La stratégie de fusion est une méthode courante pour gérer ces mises à jour.

Cette approche intègre intelligemment les modifications en procédant comme suit :

  • Mise à jour des lignes existantes qui ont été modifiées.
  • Ajouter des lignes
  • Facultatif, selon la configuration : suppression des lignes qui ne sont plus présentes dans la source.

Pour utiliser la stratégie de fusion, vous devez spécifier une propriété unique_key que dbt peut utiliser pour identifier les lignes correspondantes entre la sortie de votre modèle et la table existante, comme illustré dans l'exemple suivant.

def model(dbt, session):
    dbt.config(
        submission_method="bigframes",
        materialized='incremental',
        incremental_strategy='merge',
        unique_key='int',  # Specifies the column to identify unique rows
    )

    # In this example:
    # - Row with 'int' value 1 remains unchanged.
    # - Row with 'int' value 2 has been updated.
    # - Row with 'int' value 4 is a new addition.
    # The 'merge' strategy will ensure that only the updated row ('int 2')
    # and the new row ('int 4') are processed and integrated into the table.
    data = {"int": [1, 2, 4], "str": ['a', 'bbbb', 'd']}
    return bpd.DataFrame(data=data)

Dépannage

Vous pouvez observer l'exécution Python dans les journaux dbt.

Vous pouvez également afficher le code et les journaux (y compris les exécutions précédentes) sur la page Exécutions Colab Enterprise.

Accéder aux exécutions Colab Enterprise

Facturation

Lorsque vous utilisez l'adaptateur dbt-bigquery avec BigQuery DataFrames, des frais Trusted Cloud by S3NS sont facturés pour les éléments suivants :

  • Exécution de notebooks : vous êtes facturé pour l'exécution de l'environnement d'exécution du notebook. Pour en savoir plus, consultez Tarifs des runtimes de notebooks.

  • Exécution des requêtes BigQuery : dans le notebook, BigQuery DataFrames convertit Python en SQL et exécute le code dans BigQuery. Vous êtes facturé en fonction de la configuration de votre projet et de votre requête, comme décrit dans les tarifs de BigQuery DataFrames.

Vous pouvez utiliser le libellé de facturation suivant dans la console de facturation BigQuery pour filtrer le rapport de facturation pour l'exécution de notebooks et pour les exécutions BigQuery déclenchées par l'adaptateur dbt-bigquery :

  • Libellé d'exécution BigQuery : bigframes-dbt-api

Étapes suivantes