Utiliser des jointures de flux à flux dans des requêtes continues

Dans BigQuery, vous pouvez utiliser des jointures de flux à flux dans des requêtes continues pour analyser et corréler des données provenant de deux flux de données en temps réel ou plus. Les jointures de flux à flux sont une opération de jointure entre deux tables ou plus qui reçoivent une ingestion de données orientées dans le temps.

Les cas d'utilisation courants des jointures de flux à flux incluent la détection de fraudes financières, la création de profils clients et l'optimisation de la gestion de la chaîne d'approvisionnement. Ces jointures sont un type clé d'opération avec état. Pour en savoir plus sur les opérations avec état, consultez la section Opérations avec état compatibles.

Types JOIN compatibles

Les types JOIN suivants sont compatibles avec les requêtes continues :

  • JOIN de flux à flux : opération de jointure entre deux tables ou plus qui reçoivent une ingestion de données orientée dans le temps.
  • INNER JOIN.

Types JOIN non compatibles

Les types JOIN suivants ne sont pas compatibles avec les requêtes continues :

  • JOIN de flux à table statique : jointure dans laquelle au moins une table jointe est une table statique qui ne reçoit pas d'ingestion de données orientées dans le temps. Un exemple de table statique est une table de dimension.
  • Autres formes d'opérations JOIN que INNER.
  • JOIN qui ne comportent pas de clauses JOIN orientées dans le temps.

Joindre des données provenant de plusieurs flux

La requête suivante vous montre comment joindre une table de courses en taxi à une table de demandes de taxi, identifier le taxi disponible le plus proche du demandeur dans un délai de cinq minutes et exporter ces données dans une autre table BigQuery :

INSERT INTO
 `real_time_taxi_streaming.matched_rides`
SELECT
 requests.timestamp AS request_time,
 requests.request_id,
 taxis.taxi_id,
 ST_DISTANCE(
   ST_GEOGPOINT(requests.longitude, requests.latitude),
   ST_GEOGPOINT(taxis.longitude, taxis.latitude)
   ) AS distance_in_meters,
 taxis.timestamp AS taxi_available_time
FROM
 APPENDS (TABLE `real_time_taxi_streaming.ride_requests`,
   CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS requests
INNER JOIN
 APPENDS (TABLE `real_time_taxi_streaming.taxirides`,
   CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS taxis
ON
 requests.geohash = taxis.geohash
WHERE
 taxis.ride_status = 'available'
 AND taxis._CHANGE_TIMESTAMP BETWEEN(requests._CHANGE_TIMESTAMP - INTERVAL 5 MINUTE) AND requests._CHANGE_TIMESTAMP
 AND ST_DWITHIN(
   ST_GEOGPOINT(requests.longitude, requests.latitude),
   ST_GEOGPOINT(taxis.longitude, taxis.latitude),
   2000 -- Distance in meters
   );

Considérations concernant JOIN

Les sections suivantes décrivent les points à prendre en compte lors de l'exécution d'une jointure de flux à flux.

Limites

  • Seules les opérations INNER JOIN sont compatibles. Les autres formes, telles que LEFT ou FULL OUTER, ne sont pas compatibles.
  • Chaque côté de l'opération INNER JOIN doit spécifier une heure de début pour la requête continue.
  • En plus d'une clé de jointure (par exemple, table1.user_id = table2.user_id), la clause JOIN doit inclure une condition pour limiter la colonne d'horodatage à un intervalle constant. Cette condition limite la durée pendant laquelle le système attend qu'un événement correspondant arrive dans l'autre flux. Par exemple, vous pouvez spécifier qu'un événement d'un flux ne peut être joint à un événement d'un autre flux que si leurs horodatages se trouvent dans un intervalle de cinq minutes. Vous n'êtes pas limité aux intervalles symétriques. Par exemple, vous pouvez utiliser un intervalle de cinq minutes d'un côté de la condition de jointure et un intervalle d'une heure de l'autre.
  • Lorsque vous démarrez une requête continue avec une jointure de flux à flux, seule la fonction APPENDS est compatible. La fonction CHANGES n'est pas compatible.
  • La colonne d'heure système BigQuery, définie par _CHANGE_TIMESTAMP, est la seule colonne d'horodatage compatible pour les opérations de jointure. Les colonnes définies par l'utilisateur ne sont pas compatibles.
  • Lorsque vous utilisez des agrégations fenêtrées, vous devez respecter toutes les limites d'agrégation fenêtrée documentées.

Remarques sur les tarifs

Les requêtes continues BigQuery sont facturées en fonction de la capacité de calcul (emplacements) consommée pendant l'exécution du job. Ce modèle basé sur le calcul s'applique également aux opérations avec état telles que les jointures. Étant donné que les jointures nécessitent que le système stocke un "état" pendant que la requête est active, elles consomment des ressources d'emplacement supplémentaires. Plus de contexte ou de données sont stockés dans une jointure (par exemple, lorsque vous utilisez des intervalles de temps plus longs dans la clause JOIN ou WHERE), plus l'état à conserver est important, ce qui entraîne une utilisation plus élevée des emplacements.

Étape suivante