使用 Data Engineering Agent 构建和修改数据流水线

借助 Data Engineering Agent,您可以使用 Gemini in BigQuery 构建、修改和管理数据流水线,以便在 BigQuery 中加载和处理数据。借助 Data Engineering Agent,您可以使用自然语言提示从各种数据源生成数据流水线,或调整现有数据流水线以满足您的数据工程需求。数据工程代理具有以下功能:

  • 使用自然语言创建流水线:代理使用 Gemini 来了解您的数据并解读您的纯语言指令。您可以使用简单明了的指令让 Data Engineering Agent 构建或修改数据流水线。

  • Dataform 集成:代理会生成必要的流水线代码并将其整理到 Dataform 仓库中的 SQLX 文件中。代理在 Dataform 工作区中运行,因此 Dataform 流水线会自动提供给代理。

  • 自定义代理指令:使用简单明了的语言创建代理指令,以定义数据工程代理的自定义规则。代理指令在整个组织中保持不变,可用于强制执行组织范围内的规则,例如命名惯例或样式指南。

  • 流水线验证:代理会验证所有生成的代码,以确保数据流水线正常运行。

您可以将自然语言提示与 Data Engineering Agent 搭配使用,以创建表、视图、断言、声明和操作 SQLX 文件。例如,您可以使用数据工程代理执行以下操作:

  • 以各种格式(如 CSV、AVRO 或 PARQUET)从 Cloud Storage 等外部数据源加载数据。
  • 创建或使用现有的 BigQuery 例程 (UDF) 对数据执行自定义分析和转换。
  • 以自然语言为代理定义可重复使用的准则。

如需查看更多可与数据工程代理搭配使用的提示示例,请参阅提示示例

限制

Data Engineering Agent 具有以下限制:

  • 数据工程代理是一项正式发布前产品,不适合在生产环境中使用。
  • Data Engineering Agent 不支持以下文件类型的自然语言命令:
    • 笔记本
    • 数据准备
    • 任何 SQLx 中的 JavaScript
  • Data Engineering Agent 无法执行流水线。用户需要查看并运行或安排流水线。
  • 如果 SQL 依赖于不存在的中间资源,则数据工程代理无法在不完全调用流水线(用户触发)的情况下验证该 SQL。
  • 数据工程代理无法搜索通过指令或直接提示提供的任何网页链接或网址。
  • 代理指令文件中导入文件时,@ 导入语法仅支持以 .// 或字母开头的路径。
  • 数据预览功能仅支持将 hasOutput 标志设置为 true 的表、声明或查询。

支持的区域

Gemini in BigQuery 使用 Cloud de Confiance进行负载均衡,因此能够在全球范围内运行。由于 Gemini in BigQuery 在全球范围内运行,因此您无法选择要使用的区域。

所有 Dataform 和 BigQuery 请求都会发送到各自的区域端点,因此所有数据和代码都会保留在相应区域内。

Gemini in BigQuery 可从以下区域提供服务:

美洲

  • 爱荷华 (us-central1)

欧洲

  • 芬兰 (europe-north1)
  • 法兰克福 (europe-west3)

数据工程代理如何使用您的数据

为了生成更高质量的代理回答,Data Engineering Agent 可以从 BigQuery 和 Dataplex Universal Catalog 中检索其他数据和元数据,包括 BigQuery 表中的示例行以及在 Dataplex Universal Catalog 中生成的数据扫描配置文件。这些数据不会用于训练数据工程代理,仅在代理对话期间用作额外背景信息,以帮助代理生成回答。

准备工作

确保已为您的 Cloud de Confiance by S3NS 项目启用 Gemini in BigQuery。如需了解详情,请参阅设置 Gemini in BigQuery

您还必须启用 Gemini Data Analytics API。

Roles required to enable APIs

To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

Enable the API

所需的角色

如需获得使用 Data Engineering Agent 所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义角色或其他预定义角色来获取所需的权限。

使用数据工程代理生成数据流水线

如需在 BigQuery 中使用数据工程代理,请选择以下选项之一:

BigQuery 流水线

您可以在 BigQuery Pipelines 界面中使用数据工程代理,方法如下:

  1. 转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,依次点击arrow_drop_down 新建 > 流水线

  3. 选择执行凭证的选项,然后点击开始。这些凭证不会被代理使用,但您需要使用它们来执行生成的数据流水线。

  4. 点击试用数据流水线代理体验

  5. 向代理提问字段中,输入自然语言提示以生成数据流水线。例如:

      Create dimension tables for a taxi trips star schema from
      new_york_taxi_trips.tlc_green_trips_2022. Generate surrogate keys and all
      the descriptive attributes.
    

    输入提示后,点击发送

  6. 数据工程代理根据您的提示生成数据流水线。

数据工程代理生成的数据流水线是数据流水线的建议草稿。您可以点击流水线节点,查看生成的 SQLX 查询。 如需应用代理建议的数据流水线,请点击应用

应用数据工程代理针对数据流水线提出的更改建议。

Dataform

您可以在 Dataform 中使用数据工程代理,方法如下:

  1. 前往 Dataform 页面。

    前往 Dataform

  2. 选择一个代码库。

  3. 选择或创建开发工作区。

  4. 在工作区中,点击向代理提问

  5. 在随即显示的向代理提问提示中,输入自然语言提示以生成数据流水线。例如:

      Create dimension tables for a taxi trips star schema from
      new_york_taxi_trips.tlc_green_trips_2022. Generate surrogate keys and all
      the descriptive attributes.
    

    输入提示后,点击发送

发送提示后,Data Engineering Agent 会根据您的提示生成数据流水线并修改 Dataform SQLX 文件。代理会将这些更改直接应用到您的工作区文件。

查看数据流水线

在数据工程代理生成的数据流水线中,您可以点击流水线节点来查看它。

  • 配置标签页显示与节点关联的生成的 SQLX 查询。
  • 数据预览标签页会显示文件的输入和输出表格。 您可以点击运行任务来运行任务(无论是否带依赖项),从而通过此节点预览数据转换。

修改数据流水线

您可以使用数据工程代理修改数据流水线,方法是点击向代理提问,然后输入建议更改数据流水线的提示。

点击“向代理提问”,在数据流水线上使用数据工程代理。

查看数据工程代理提出的更改,然后点击应用以应用这些更改。

您还可以通过选择流水线节点,然后点击打开来手动修改 SQLX 查询。

为数据工程代理创建代理指令

代理指令是针对数据工程代理的自然语言指令,可让您存储持久性指令,以便代理遵循一组自定义的预定义规则。如果您希望代理的输出结果在整个组织中保持一致,例如遵循命名惯例或强制执行样式指南,请使用代理指令。

您可以创建 GEMINI.MD 上下文文件作为数据工程代理的代理指令文件。您可以创建代理指令文件以在本地工作区中使用,也可以使用外部代码库在多个数据流水线中使用相同的指令文件。

如需创建代理指令,请执行以下操作:

  1. 向代理提问下,点击流水线指令
  2. 流水线说明窗格中,点击创建说明文件
  3. 在随即显示的 GEMINI.MD 文件中,以自然语言输入指令。

    以下示例展示了一个包含多条规则的代理指令文件:

      1. All event-specific tables MUST be prefixed with `cs_event_`.
      2. The primary key for any player activity table is a composite key of `player_id` and `event_timestamp_micros`.
      3. Filter out any player actions where `mana_spent` is greater than `max_mana_pool`. This is considered a data anomaly.
    
  4. 点击保存

如需了解如何以最佳方式构建代理指令文件,请参阅代理指令文件最佳实践

从外部代码库加载代理指令

您可以关联外部代码库,以指定一组可在多个数据流水线中重复使用的代理指令:

  1. 向代理提问下,点击流水线指令
  2. 外部代码库下,选择使用外部代码库中的指令
  3. 在提供的字段中,指定包含您要用于数据流水线的代理指令的代码库。
  4. 点击保存

导入其他本地文件作为代理指令

您还可以使用 @file.md 语法将数据工程代理的其他指令文件导入 GEMINI.md 文件中。如需了解详情,请参阅内存导入处理器

排查数据流水线错误

如果您在生成数据流水线期间遇到任何错误,请验证您是否已完成运行 Data Engineering Agent 所需的所有前提条件。如需了解详情,请参阅准备工作

如果生成的数据流水线遇到任何错误,您可以按照以下步骤提示数据工程代理诊断任何数据流水线故障并提出问题排查建议:

  1. 在流水线或开发工作区中,点击执行标签页。
  2. 在执行列表内,找到失败的数据流水线运行。您可以在执行运行的状态列中识别失败的运行。

    在流水线的“状态”列中查找失败的执行运行。

  3. 将光标悬停在该图标上,然后点击调查。Data Engineering Agent 会对数据流水线执行情况进行分析,以查找任何错误。

    提示数据工程代理诊断流水线中的错误。

  4. 分析完成后,数据工程代理会在观测结果和假设部分生成报告。此报告包含以下信息:

    • 从数据流水线执行日志中提取的观测结果和数据点
    • 失败的可能原因
    • 一组可用于解决已发现问题的可行步骤或建议

借助数据工程代理提供的问题排查报告,您可以手动实现建议。您还可以按照以下步骤指示 Data Engineering Agent 为您应用修复:

  1. 复制问题排查报告中的建议。
  2. 返回到 Data Engineering Agent:
    1. 如果您使用的是 BigQuery 流水线,请前往流水线页面,然后点击咨询客服人员
    2. 如果您使用的是 Dataform,请点击咨询代理
  3. 将建议粘贴到提示中,并指示数据工程代理直接对您的数据流水线进行修复。
  4. 点击发送

示例提示

以下部分展示了一些示例提示,您可以使用这些提示与数据工程代理一起开发数据流水线。

将现有数据汇总到新表中

收到此提示后,Data Engineering Agent 会使用架构和样本来推断按键进行的数据分组。代理通常会设置包含表和列说明的新表配置。

  Create a daily sales report from the
  bigquery-public-data.thelook_ecommerce.order_items table into a
  reporting.daily_sales_aggregation table.

向现有表添加数据质量检查

收到此提示后,代理会根据架构和样本推断出合理的表格质量检查。您还可以添加一些主观断言作为提示的一部分。

  Add data quality checks for bigquery-public-data.thelook_ecommerce.users.

创建新的派生列并向新表添加数据质量检查

以下提示展示了如何同时添加表和列,并为表指定质量检查。

  Create a new table named staging.products from
  bigquery-public-data.thelook_ecommerce.products and add a calculated column
  named gross_profit, which is the retail_price minus the cost.


  Also, add the following assertions: ID must not be null and must be unique.
  The retail_price must be greater than or equal to the cost. The department
  column can only contain 'Men' or 'Women'.

创建 UDF 作为模型定义的一部分

数据工程代理还可以设置 DDL 来创建用户定义的函数 (UDF)。虽然代理实际上不会创建 UDF,但您可以通过运行数据流水线来创建 UDF。这些 UDF 可用于数据流水线中的模型定义。

  Create a user-defined function (UDF) named get_age_group that takes an integer
  age as input and returns a string representing the age group ('Gen Z',
  'Millennial', 'Gen X', 'Baby Boomer').


  Use this UDF on the age column from the
  bigquery-public-data.thelook_ecommerce.users table to create a new view called
  reporting.user_age_demographics that includes user_id, age, and the calculated
  age_group.

最佳做法

以下部分建议了使用 Data Engineering Agent 和 Dataform 的最佳实践。

使用数据工程代理的最佳实践

针对常见请求使用代理指令。如果您发现自己经常应用某些技巧,或者经常对代理进行相同的更正,请使用代理指令作为集中存储常用指令和请求的位置。

为代理提供更多上下文信息。您可以将术语表中的术语附加到 BigQuery 表和列,并生成数据分析扫描结果,从而为代理提供来自 Dataplex Universal Catalog 的更多上下文。词汇表术语可用于标记需要额外上下文的列,例如包含个人身份信息 (PII) 的列(需要特殊处理说明),或用于识别不同表格中名称不同的匹配列。数据分析扫描可帮助代理更好地了解表格列中的数据分布,并有助于代理创建更具体的数据质量断言。如需了解详情,请参阅数据分析简介

清晰地撰写内容。明确说明您的要求,避免含糊不清。在提示时,尽可能提供来源和目的地数据源,如以下示例所示:

  Extract data from the sales.customers table in the us_west_1 region, and load
  it into the reporting.dim_customers table in BigQuery. Match the schema of the
  destination table.

提供直接且有范围的请求。一次只问一个问题,并使提示简洁明了。对于包含多个问题的提示,您可以将问题的每个不同部分列出来,以提高清晰度,如以下示例所示:

  1. Create a new table named staging.events_cleaned. Use raw.events as the
     source. This new table should filter out any records where the user_agent
     matches the pattern '%bot%'. All original columns should be included.

  2. Next, create a table named analytics.user_sessions. Use
     staging.events_cleaned as the source. This table should calculate the
     duration for each session by grouping by session_id and finding the
     difference between the MAX(event_timestamp) and MIN(event_timestamp).

提供明确的说明并强调关键术语。您可以在提示中突出显示关键术语或概念,并将某些要求标记为重要,如以下示例所示:

  When creating the staging.customers table, it is *VERY IMPORTANT* that you
  transform the email column from the source table bronze.raw_customers.
  Coalesce any NULL values in the email column to an empty string ''.

指定操作顺序。对于有顺序的任务,您可以将提示以列表的形式呈现,其中列出的项分为专注的小步骤,如下例所示:

  Create a pipeline with the following steps:
  1. Extract data from the ecomm.orders table.
  2. Join the extracted data with the marts.customers table on customer_id
  3. Load the final result into the reporting.customer_orders table.

优化并迭代。不断尝试不同的措辞和方法,看看哪种效果最好。如果代理生成了无效的 SQL 或其他错误,请使用示例或公开文档来引导代理。

  The previous query was incorrect because it removed the timestamp. Please
  correct the SQL. Use the TIMESTAMP_TRUNC function to truncate the
  event_timestamp to the nearest hour, instead of casting it as a DATE. For
  example: TIMESTAMP_TRUNC(event_timestamp, HOUR).

代理指令文件的最佳实践

创建代理指令文件,自定义数据工程代理以满足您的需求。使用代理指令时,我们建议您执行以下操作:

  • Dataform 中的所有文件路径都相对于代码库的根目录。使用相对路径来确保所有 @file.md 语法都能正确地将指令导入到 GEMINI.md
  • GEMINI.md 中导入的文件本身可以包含导入,从而创建嵌套结构。为防止无限递归,GEMINI.md 的最大导入深度为 5 级。
  • 如需在多个数据流水线之间共享指令,请将指令存储在中央 Dataform 代码库中,并将其关联到工作 Dataform 代码库。您可以使用本地指令来替换流水线特定行为的中央规则。
  • 在代理指令文件中使用标题和列表有助于整理和阐明数据工程代理的指令。
  • 提供有意义的文件名,并将类似说明归为一组,放在一个文件中。 使用 Markdown 标题按类别、功能或功能以逻辑方式整理规则。
  • 为避免指令冲突,请明确定义每条指令适用的具体条件。
  • 迭代并优化提示和工作流程。随着代理的推出和模型升级,代理行为会随时间而变化,因此我们建议您使用不同的提示迭代规则,以确定可能需要改进的方面。确保规则文件与数据流水线的任何更改保持同步。

以下示例展示了一个名为 GEMINI.md 的代理指令文件,该文件利用了我们的最佳实践来有效使用数据工程代理:

  ### Naming Conventions

  * Datasets: [business_domain]_[use_case] (e.g., ecommerce_sales)

  * Tables:
      - Raw/External: raw_[source_name]
      - Staging: stg_[business_entity]
      - Dimension: dim_[dimension_name]
      - Fact: fct_[fact_name]

  * Dataform Folders:
      - sources
      - staging
      - marts
      - dataProducts

  * Views: vw_[view_name]

  * Columns: snake_case (e.g., order_id, customer_name)

  ## Cloud Storage data load
  * When ingesting data from Cloud Storage, create external tables.

  ## Null handling
  * Filter out null id values

  ## String normalization
  * Standardize string columns by converting to lower case

  ## Data Cleaning Guidelines
  @./generic_cleaning.md