Stream-to-Stream-Joins in kontinuierlichen Abfragen verwenden
In BigQuery können Sie Stream-to-Stream-Joins in kontinuierlichen Abfragen verwenden, um Daten aus zwei oder mehr Echtzeitdatenstreams zu analysieren und in Beziehung zu setzen. Stream-to-Stream-Joins sind Join-Vorgänge zwischen zwei oder mehr Tabellen, in die zeitbezogene Daten aufgenommen werden.
Häufige Anwendungsfälle für Stream-to-Stream-Joins sind die Erkennung von Finanzbetrug, das Erstellen von Kundenprofilen und die Optimierung des Lieferkettenmanagements. Diese Joins sind ein wichtiger Typ von zustandsbehafteten Vorgängen. Weitere Informationen zu zustandsbehafteten Vorgängen finden Sie unter Unterstützte zustandsbehaftete Vorgänge.
Unterstützte JOIN-Typen
Die folgenden JOIN-Typen werden von kontinuierlichen Abfragen unterstützt:
- Stream-to-Stream-JOIN: Ein Join-Vorgang zwischen zwei oder mehr Tabellen, in die zeitbezogene Daten aufgenommen werden.
- INNER JOIN
Nicht unterstützte JOIN-Typen
Die folgenden JOIN-Typen werden von kontinuierlichen Abfragen nicht unterstützt:
- Stream-to-static-table JOINs (Verknüpfungen von Stream- und statischen Tabellen): Eine Verknüpfung, bei der mindestens eine verknüpfte Tabelle eine statische Tabelle ist, in die keine zeitbezogene Datenaufnahme erfolgt. Ein Beispiel für eine statische Tabelle ist eine Dimensionstabelle.
- Andere Formen von JOIN-Vorgängen als
INNER. - JOINs ohne zeitbezogene JOIN-Klauseln.
Daten aus mehreren Streams zusammenführen
In der folgenden Abfrage wird gezeigt, wie Sie eine Tabelle mit Taxifahrten mit einer Tabelle mit Taxianfragen verknüpfen, das nächstgelegene verfügbare Taxi für den Anfragenden innerhalb eines Zeitfensters von 5 Minuten ermitteln und diese Daten in eine andere BigQuery-Tabelle exportieren:
INSERT INTO
`real_time_taxi_streaming.matched_rides`
SELECT
requests.timestamp AS request_time,
requests.request_id,
taxis.taxi_id,
ST_DISTANCE(
ST_GEOGPOINT(requests.longitude, requests.latitude),
ST_GEOGPOINT(taxis.longitude, taxis.latitude)
) AS distance_in_meters,
taxis.timestamp AS taxi_available_time
FROM
APPENDS (TABLE `real_time_taxi_streaming.ride_requests`,
CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS requests
INNER JOIN
APPENDS (TABLE `real_time_taxi_streaming.taxirides`,
CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS taxis
ON
requests.geohash = taxis.geohash
WHERE
taxis.ride_status = 'available'
AND taxis._CHANGE_TIMESTAMP BETWEEN(requests._CHANGE_TIMESTAMP - INTERVAL 5 MINUTE) AND requests._CHANGE_TIMESTAMP
AND ST_DWITHIN(
ST_GEOGPOINT(requests.longitude, requests.latitude),
ST_GEOGPOINT(taxis.longitude, taxis.latitude),
2000 -- Distance in meters
);
Hinweise zu JOIN
In den folgenden Abschnitten werden wichtige Aspekte beschrieben, die bei der Durchführung eines Stream-to-Stream-Joins berücksichtigt werden müssen.
Beschränkungen
- Es werden nur
INNER JOIN-Vorgänge unterstützt. Andere Formen wieLEFToderFULL OUTERwerden nicht unterstützt. - Auf jeder Seite des
INNER JOIN-Vorgangs muss eine Startzeit für die kontinuierliche Abfrage angegeben werden. - Zusätzlich zu einem Join-Schlüssel (z. B.
table1.user_id = table2.user_id) muss dieJOIN-Klausel eine Bedingung enthalten, um die Zeitstempelspalte auf ein konstantes Intervall zu beschränken. Mit dieser Bedingung wird begrenzt, wie lange das System auf ein übereinstimmendes Ereignis im anderen Stream wartet. Sie können beispielsweise festlegen, dass ein Ereignis aus einem Stream nur mit einem Ereignis aus einem anderen Stream verknüpft werden kann, wenn die Zeitstempel innerhalb eines 5-Minuten-Intervalls liegen. Sie sind nicht auf symmetrische Intervalle beschränkt. Sie können beispielsweise ein 5‑Minuten-Intervall auf der einen Seite der Join-Bedingung und ein 1‑Stunden-Intervall auf der anderen Seite verwenden. - Wenn Sie eine kontinuierliche Abfrage mit einem Stream-zu-Stream-Join starten, wird nur die Funktion
APPENDSunterstützt. Die FunktionCHANGESwird nicht unterstützt. - Die BigQuery-Systemzeitspalte, die durch
_CHANGE_TIMESTAMPdefiniert wird, ist die einzige unterstützte Zeitstempelspalte für Join-Vorgänge. Benutzerdefinierte Spalten werden nicht unterstützt. - Bei Verwendung in Verbindung mit Fensteraggregationen müssen Sie alle dokumentierten Einschränkungen für Fensteraggregationen einhalten.
Kosten
Kontinuierliche BigQuery-Abfragen werden basierend auf der Rechenkapazität (Slots) abgerechnet, die während der Ausführung des Jobs verbraucht wird. Dieses rechenbasierte Modell gilt auch für zustandsbehaftete Vorgänge wie Joins. Da für Joins der „Status“ gespeichert werden muss, während die Abfrage aktiv ist, werden zusätzliche Slot-Ressourcen benötigt. Wenn Sie mehr Kontext oder Daten in einem Join speichern, z. B. wenn Sie längere Zeitintervalle in der JOIN- oder WHERE-Klausel verwenden, muss mehr Status beibehalten werden, was zu einer höheren Slot-Auslastung führt.