Monitorize consultas contínuas

Pode monitorizar as consultas contínuas do BigQuery através das seguintes ferramentas do BigQuery:

Devido à natureza de execução prolongada de uma consulta contínua do BigQuery, as métricas que são normalmente geradas após a conclusão de uma consulta SQL podem estar ausentes ou ser imprecisas.

Use INFORMATION_SCHEMA visualizações

Pode usar várias vistas INFORMATION_SCHEMA para monitorizar consultas contínuas e reservas de consultas contínuas.

Ver detalhes do trabalho

Pode usar a vista JOBS para obter metadados de tarefas de consulta contínuas.

A seguinte consulta devolve os metadados de todas as consultas contínuas ativas. Os metadados incluem a data/hora da marca de água de saída, que representa o ponto até ao qual a consulta contínua processou dados com êxito.

  1. Na Trusted Cloud consola, aceda à página BigQuery.

    Aceda ao BigQuery

  2. No editor de consultas, execute a seguinte consulta:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    Substitua o seguinte:

Veja os detalhes da atribuição de reservas

Pode usar as vistas ASSIGNMENTS e RESERVATIONS para obter detalhes de atribuição de reservas de consultas contínuas.

Devolve detalhes de atribuição de reservas para consultas contínuas:

  1. Na Trusted Cloud consola, aceda à página BigQuery.

    Aceda ao BigQuery

  2. No editor de consultas, execute a seguinte consulta:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    Substitua o seguinte:

    • ADMIN_PROJECT_ID: o ID do projeto de administração que detém a reserva.
    • LOCATION: a localização da reserva.
    • PROJECT_ID: o ID do projeto atribuído à reserva. Apenas são devolvidas informações sobre consultas contínuas em execução neste projeto.

Veja informações sobre o consumo de ranhuras

Pode usar as vistas ASSIGNMENTS, RESERVATIONS e JOBS_TIMELINE para obter informações contínuas sobre o consumo de slots de consultas.

Devolve informações de consumo de ranhuras para consultas contínuas:

  1. Na Trusted Cloud consola, aceda à página BigQuery.

    Aceda ao BigQuery

  2. No editor de consultas, execute a seguinte consulta:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    Substitua o seguinte:

    • ADMIN_PROJECT_ID: o ID do projeto de administração que detém a reserva.
    • LOCATION: a localização da reserva.
    • PROJECT_ID: o ID do projeto atribuído à reserva. Apenas são devolvidas informações sobre consultas contínuas em execução neste projeto.

Também pode monitorizar as reservas de consultas contínuas através de outras ferramentas, como o explorador de métricas e os gráficos de recursos administrativos. Para mais informações, consulte o artigo Monitorize as reservas do BigQuery.

Use o gráfico de execução de consultas

Pode usar o gráfico de execução de consultas para obter estatísticas gerais e estatísticas de desempenho para uma consulta contínua. Para mais informações, consulte o artigo Veja estatísticas de desempenho das consultas.

Veja o histórico de trabalhos

Pode ver os detalhes das tarefas de consulta contínua no seu histórico de tarefas pessoal ou no histórico de tarefas do projeto. Para mais informações, consulte o artigo Veja os detalhes da tarefa.

Tenha em atenção que a lista de tarefas do histórico é ordenada pela hora de início da tarefa, pelo que as consultas contínuas que estão a ser executadas há algum tempo podem não estar perto do início da lista.

Use o explorador de tarefas administrativas

No explorador de tarefas administrativas, filtre as suas tarefas para mostrar consultas contínuas definindo o filtro Categoria da tarefa como Consulta contínua.

Utilize o Cloud Monitoring

Pode ver métricas específicas das consultas contínuas do BigQuery através do Cloud Monitoring. Para mais informações, consulte os artigos Crie painéis de controlo, gráficos e alertas e leia acerca das métricas disponíveis para visualização.

Alerta sobre consultas com falhas

Em vez de verificar rotineiramente se as suas consultas contínuas falharam, pode ser útil criar um alerta para receber uma notificação em caso de falha. Uma forma de o fazer é criar uma métrica baseada em registos do Cloud Logging personalizada com um filtro para os seus trabalhos e uma política de alertas do Cloud Monitoring com base nessa métrica:

  1. Quando cria uma consulta contínua, use um prefixo de ID da tarefa personalizado. Várias consultas contínuas podem partilhar o mesmo prefixo. Por exemplo, pode usar o prefixo prod- para indicar uma consulta de produção.
  2. Na Trusted Cloud consola, aceda à página Métricas baseadas em registos.

    Aceda a Métricas baseadas em registos

  3. Clique em Criar métrica. É apresentado o painel Criar métrica de registos.

  4. Para Tipo de métrica, selecione Contador.

  5. Na secção Detalhes, atribua um nome à métrica. Por exemplo, CUSTOM_JOB_ID_PREFIX-metric.

  6. Na secção Seleção de filtros, introduza o seguinte no editor de Criação de filtros:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Substitua o seguinte:

  7. Clique em Criar métrica.

  8. No menu de navegação, clique em Métricas baseadas em registos. A métrica que acabou de criar aparece na lista de métricas definidas pelo utilizador.

  9. Na linha da métrica, clique em Mais ações e, de seguida, em Criar alerta a partir da métrica.

  10. Clicar em Seguinte. Não precisa de alterar as predefinições na página Modo de configuração de políticas.

  11. Clicar em Seguinte. Não tem de alterar as predefinições na página Configurar acionador de alerta.

  12. Selecione os canais de notificação e introduza um nome para a política de alerta.

  13. Clique em Criar política.

Pode testar o seu alerta executando uma consulta contínua com o prefixo do ID da tarefa personalizado que selecionou e, em seguida, cancelando-o. A receção do alerta no seu canal de notificação pode demorar alguns minutos.

Repetir consultas com falha

Voltar a tentar uma consulta contínua com falha pode ajudar a evitar situações em que um pipeline contínuo está inativo durante um período prolongado ou requer intervenção humana para ser reiniciado. Seguem-se alguns aspetos importantes a ter em conta quando tenta novamente uma consulta contínua com falhas:

  • Se o reprocessamento de alguma quantidade de dados processados pela consulta anterior antes da falha é tolerável.
  • Como processar a limitação de novas tentativas ou a utilização da retirada exponencial.

Uma abordagem possível para automatizar a repetição de consultas é a seguinte:

  1. Crie um destino do Cloud Logging com base num filtro de inclusão que corresponda aos seguintes critérios para encaminhar registos para um tópico do Pub/Sub:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    Substitua o seguinte:

  2. Crie uma função do Cloud Run que seja acionada em resposta ao Pub/Sub que recebe registos correspondentes ao seu filtro.

    A função do Cloud Run pode aceitar a carga útil de dados da mensagem do Pub/Sub e tentar iniciar uma nova consulta contínua usando a mesma sintaxe SQL da consulta com falha, mas no início, imediatamente após a paragem da tarefa anterior.

Por exemplo, pode usar uma função semelhante à seguinte:

Python

Antes de experimentar este exemplo, siga as Pythoninstruções de configuração no início rápido do BigQuery com bibliotecas cliente. Para mais informações, consulte a API Python BigQuery documentação de referência.

Para se autenticar no BigQuery, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para bibliotecas de cliente.

Antes de executar exemplos de código, defina a variável GOOGLE_CLOUD_UNIVERSE_DOMAIN environment como s3nsapis.fr.

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

O que se segue?