在持续查询中使用流到流联接

如需就此功能请求支持或提供反馈,请发送电子邮件至 bq-continuous-queries-feedback@google.com

持续查询支持 JOIN 作为有状态操作。有状态操作使持续查询能够执行复杂的分析,从而在多个行或时间间隔内保留信息。借助此功能,您可以在查询运行时将必要的数据存储在内存中,从而关联来自不同流的事件。如需详细了解有状态操作,请参阅支持的有状态操作

流到流联接是指在两个或更多个接收时间导向型数据注入的表之间进行的联接操作。

支持的 JOIN 类型

持续查询支持以下联接类型:

  • 流到流联接 - 两个或更多接收时间导向型数据注入的表之间的联接操作。
  • INNER JOIN

不支持的 JOIN 类型

持续查询不支持以下联接类型:

  • 流式表与静态表之间的联接 - 至少一个联接表是静态表,不接收时间导向型数据注入。静态表的一个示例是维度表。
  • INNER 之外的其他形式的 JOIN 操作。
  • 没有面向时间的 JOIN 子句的 JOIN。

联接来自多个数据流的数据

以下查询展示了如何将出租车行程表与出租车请求表联接,识别 5 分钟时间范围内距离请求者最近的可用出租车,并将此数据导出到另一个 BigQuery 表中:

INSERT INTO
 `real_time_taxi_streaming.matched_rides`
SELECT
 requests.timestamp AS request_time,
 requests.request_id,
 taxis.taxi_id,
 ST_DISTANCE(
   ST_GEOGPOINT(requests.longitude, requests.latitude),
   ST_GEOGPOINT(taxis.longitude, taxis.latitude)
   ) AS distance_in_meters,
 taxis.timestamp AS taxi_available_time
FROM
 APPENDS (TABLE `real_time_taxi_streaming.ride_requests`,
   CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS requests
INNER JOIN
 APPENDS (TABLE `real_time_taxi_streaming.taxirides`,
   CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS taxis
ON
 requests.geohash = taxis.geohash
WHERE
 taxis.ride_status = 'available'
 AND taxis._CHANGE_TIMESTAMP BETWEEN(requests._CHANGE_TIMESTAMP - INTERVAL 5 MINUTE) AND requests._CHANGE_TIMESTAMP
 AND ST_DWITHIN(
   ST_GEOGPOINT(requests.longitude, requests.latitude),
   ST_GEOGPOINT(taxis.longitude, taxis.latitude),
   2000 -- Distance in meters
   );

加入注意事项

以下部分介绍了执行流到流联接时需要考虑的事项。

限制

  • 仅支持 INNER JOIN 操作;不支持其他形式,例如 LEFTFULL OUTER
  • INNER JOIN 操作的每一侧都必须为持续查询指定开始时间。
  • 除了联接键(例如 table1.user_id = table2.user_id)之外,JOIN 子句还必须包含一个条件,用于将时间戳列限制为恒定间隔。此条件限制了系统在另一个流中等待匹配事件到达的时间。例如,您可以指定,只有当一个流中的事件与另一个流中的事件的时间戳相差在 5 分钟以内时,这两个事件才能联接。您不限于使用对称区间。例如,您可以在联接条件的一侧使用 5 分钟的间隔,而在另一侧使用 1 小时的间隔。
  • 在启动包含流到流联接的持续查询时,仅支持 APPENDS 函数。不支持 CHANGES 函数。
  • _CHANGE_TIMESTAMP 定义的 BigQuery 系统时间列是唯一受支持的用于联接操作的时间戳列。不支持用户定义的列。
  • 与窗口化聚合函数搭配使用时,您必须遵循所有已记录的窗口聚合函数限制

价格注意事项

BigQuery 持续查询的费用根据作业运行期间消耗的计算容量(槽)计算。这种基于计算的模型也适用于联接等有状态操作。由于联接需要系统在查询处于活跃状态时存储“状态”,因此它们会消耗额外的 slot 资源。联接中存储的上下文或数据越多(例如,在 JOINWHERE 子句中使用的时间间隔越长),需要保留的状态就越多,从而导致 slot 利用率更高。

后续步骤