Surveiller les requêtes continues

Vous pouvez surveiller les requêtes continues BigQuery à l'aide des outils BigQuery suivants :

En raison de la nature de longue durée d'une requête continue BigQuery, les métriques généralement générées à la fin d'une requête SQL peuvent être absentes ou inexactes.

Utiliser les vues INFORMATION_SCHEMA

Vous pouvez utiliser plusieurs vues INFORMATION_SCHEMA pour surveiller les requêtes continues et les réservations de requêtes continues.

Afficher les informations sur le job

Vous pouvez utiliser la vue JOBS pour obtenir des métadonnées de job de requête continue.

La requête suivante renvoie les métadonnées de toutes les requêtes continues actives. Les métadonnées incluent le code temporel du filigrane de sortie, qui représente le point jusqu'auquel la requête continue a traité les données avec succès.

  1. Dans la console Trusted Cloud , accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans l'éditeur de requête, saisissez la requête suivante :

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    Remplacez les éléments suivants :

Afficher les détails de l'attribution de réservation

Vous pouvez utiliser les vues ASSIGNMENTS et RESERVATIONS pour obtenir les détails d'attribution de réservation de requêtes continues.

Renvoyez les détails d'attribution de réservation pour les requêtes continues :

  1. Dans la console Trusted Cloud , accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans l'éditeur de requête, saisissez la requête suivante :

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    Remplacez les éléments suivants :

    • ADMIN_PROJECT_ID : ID du projet d'administration propriétaire de la réservation.
    • LOCATION : emplacement de la réservation.
    • PROJECT_ID : ID du projet attribué à la réservation. Seules les informations sur les requêtes continues exécutées dans ce projet sont renvoyées.

Afficher les informations sur la consommation d'emplacements

Vous pouvez utiliser les vues ASSIGNMENTS, RESERVATIONS et JOBS_TIMELINE pour obtenir des informations sur la consommation des emplacements de requête en continu.

Renvoyez des informations sur la consommation des emplacements pour les requêtes continues :

  1. Dans la console Trusted Cloud , accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans l'éditeur de requête, saisissez la requête suivante :

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    Remplacez les éléments suivants :

    • ADMIN_PROJECT_ID : ID du projet d'administration propriétaire de la réservation.
    • LOCATION : emplacement de la réservation.
    • PROJECT_ID : ID du projet attribué à la réservation. Seules les informations sur les requêtes continues exécutées dans ce projet sont renvoyées.

Vous pouvez également surveiller les réservations de requêtes continues à l'aide d'autres outils tels que l'explorateur de métriques et les graphiques de ressources d'administration. Pour en savoir plus, consultez Surveiller les réservations BigQuery.

Utiliser le graphique d'exécution de requête

Vous pouvez utiliser le graphique d'exécution de requête pour obtenir des informations sur les performances et des statistiques générales pour une requête continue. Pour en savoir plus, consultez la page Afficher les informations sur les performances des requêtes.

Afficher l'historique des missions

Vous pouvez afficher les détails d'un job de requête continue dans l'historique personnel de vos jobs ou dans l'historique des jobs du projet. Pour en savoir plus, consultez la section Afficher les détails des jobs.

Sachez que la liste historique des jobs est triée en fonction de l'heure de début du job. Par conséquent, les requêtes continues qui s'exécutent depuis un certain temps pourraient ne pas se trouver au début de la liste.

Utiliser l'explorateur de jobs d'administration

Dans l'explorateur de jobs administratifs, filtrez vos jobs pour afficher les requêtes continues en définissant le filtre Catégorie de job sur Requête continue.

Utilisez Cloud Monitoring

Vous pouvez afficher des métriques spécifiques aux requêtes continues BigQuery à l'aide de Cloud Monitoring. Pour en savoir plus, consultez Créer des tableaux de bord, des graphiques et des alertes, et découvrez les métriques disponibles pour la visualisation.

Alerter sur les requêtes ayant échoué

Au lieu de vérifier régulièrement si vos requêtes continues ont échoué, il peut être utile de créer une alerte pour vous avertir en cas d'échec. Pour ce faire, vous pouvez créer une métrique basée sur les journaux Cloud Logging personnalisée avec un filtre pour vos jobs, ainsi qu'une règle d'alerte Cloud Monitoring basée sur cette métrique :

  1. Lorsque vous créez une requête continue, utilisez un préfixe d'ID de job personnalisé. Plusieurs requêtes continues peuvent partager le même préfixe. Par exemple, vous pouvez utiliser le préfixe prod- pour indiquer une requête de production.
  2. Dans la console Trusted Cloud , accédez à la page Métriques basées sur les journaux.

    Accéder à la page "Métriques basées sur les journaux"

  3. Cliquez sur Créer une métrique. Le panneau Créer une métrique de journaux s'affiche.

  4. Dans le champ Type de métrique, sélectionnez Compteur.

  5. Dans la section Détails, attribuez un nom à votre métrique. Exemple : CUSTOM_JOB_ID_PREFIX-metric.

  6. Dans la section Sélection du filtre, saisissez ce qui suit dans l'éditeur Créer un filtre :

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Remplacez les éléments suivants :

  7. Cliquez sur Créer une métrique.

  8. Dans le menu de navigation, cliquez sur Métriques basées sur les journaux. La métrique que vous venez de créer apparaît dans la liste des métriques définies par l'utilisateur.

  9. Sur la ligne de votre métrique, cliquez sur Autres actions, puis sur Créer une alerte à partir de la métrique.

  10. Cliquez sur Suivant. Vous n'avez pas besoin de modifier les paramètres par défaut sur la page Mode de configuration des règles.

  11. Cliquez sur Suivant. Vous n'avez pas besoin de modifier les paramètres par défaut sur la page Configurer le déclencheur d'alerte.

  12. Sélectionnez vos canaux de notification et saisissez un nom pour la règle d'alerte.

  13. Cliquez sur Créer une règle.

Pour tester votre alerte, exécutez une requête continue avec le préfixe d'ID de job personnalisé que vous avez sélectionné, puis annulez-la. L'alerte peut mettre quelques minutes à s'afficher dans votre canal de notification.

Réessayer les requêtes ayant échoué

Réessayer une requête continue ayant échoué peut vous aider à éviter les situations où un pipeline continu est hors service pendant une période prolongée ou nécessite une intervention humaine pour redémarrer. Voici quelques points importants à prendre en compte lorsque vous réessayez une requête continue ayant échoué :

  • Indique si le retraitement d'une partie des données traitées par la requête précédente avant son échec est acceptable.
  • Gérer la limitation des tentatives ou l'utilisation d'un intervalle exponentiel entre les tentatives

Voici une approche possible pour automatiser les nouvelles tentatives de requête :

  1. Créez un récepteur Cloud Logging basé sur un filtre d'inclusion correspondant aux critères suivants pour acheminer les journaux vers un sujet Pub/Sub :

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Remplacez les éléments suivants :

  2. Créez une fonction Cloud Run qui se déclenche en réponse à la réception par Pub/Sub de journaux correspondant à votre filtre.

    La fonction Cloud Run pourrait accepter la charge utile de données du message Pub/Sub et tenter de démarrer une nouvelle requête continue en utilisant la même syntaxe SQL que la requête ayant échoué, mais au début, juste après l'arrêt de la tâche précédente.

Par exemple, vous pouvez utiliser une fonction semblable à celle-ci :

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de BigQuery : Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API BigQuery pour Python.

Pour vous authentifier auprès de BigQuery, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez la page Configurer l'authentification pour les bibliothèques clientes.

Avant d'exécuter des exemples de code, définissez la variable d'environnement GOOGLE_CLOUD_UNIVERSE_DOMAIN sur s3nsapis.fr.

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

Étapes suivantes