Executar o código PySpark em notebooks do BigQuery Studio
Neste documento, mostramos como executar o código PySpark em um notebook Python do BigQuery.
Antes de começar
Se ainda não tiver feito isso, crie um Trusted Cloud by S3NS projeto e um bucket do Cloud Storage.
Configurar o projeto
-
In the Trusted Cloud console, on the project selector page, select or create a Trusted Cloud project.
-
Enable the Dataproc, BigQuery, and Cloud Storage APIs.
-
Crie um bucket do Cloud Storage no seu projeto se você não tiver um disponível.
Configurar o notebook
- Credenciais do notebook: por padrão, a sessão do notebook usa suas credenciais de usuário. Se você quiser especificar as credenciais da conta de serviço para sua sessão, ela precisa ter o papel de trabalhador do Dataproc (
roles/dataproc.worker
). Para mais informações, consulte Conta de serviço do Dataproc sem servidor. - Ambiente de execução do notebook: o notebook usa um ambiente de execução padrão do Vertex, a menos que você selecione um ambiente de execução diferente. Se você quiser definir seu próprio ambiente de execução, crie-o na página Ambientes de execução do console Trusted Cloud .
- Credenciais do notebook: por padrão, a sessão do notebook usa suas credenciais de usuário. Se você quiser especificar as credenciais da conta de serviço para sua sessão, ela precisa ter o papel de trabalhador do Dataproc (
Preços
Para mais informações sobre preços, consulte Preços do ambiente de execução de notebooks do BigQuery.
Abrir um notebook Python do BigQuery Studio
No console Trusted Cloud , acesse a página BigQuery.
Na barra de guias do painel de detalhes, clique na
seta ao lado do sinal + e clique em Notebook.
Criar uma sessão do Spark em um notebook do BigQuery Studio
Você pode usar um notebook Python do BigQuery Studio para criar uma sessão interativa do Spark Connect. Cada notebook do BigQuery Studio pode ter apenas uma sessão ativa do Dataproc sem servidor associada.
É possível criar uma sessão do Spark em um notebook Python do BigQuery Studio das seguintes maneiras:
- Configure e crie uma única sessão no notebook.
- Configure uma sessão do Spark em um modelo de sessão interativa do Dataproc sem servidor para Spark e use o modelo para configurar e criar uma sessão no notebook.
O BigQuery oferece um recurso
Query using Spark
que ajuda a começar a codificar a sessão com modelo, conforme explicado na guia Sessão de modelo do Spark.
Sessão única
Para criar uma sessão do Spark em um novo notebook, faça o seguinte:
Na barra de guias do painel do editor, clique na
seta suspensa ao lado do sinal + e clique em Notebook.Copie e execute o código abaixo em uma célula do notebook para configurar e criar uma sessão básica do Spark.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.connect.functions as f
session = Session()
# Create the Spark session.
spark = (
DataprocSparkSession.builder
.appName("APP_NAME")
.dataprocSessionConfig(session)
.getOrCreate()
)
Substitua:
- APP_NAME: um nome opcional para a sessão.
- Configurações opcionais da sessão:é possível adicionar configurações da API Dataproc
Session
para personalizar a sessão. Confira alguns exemplos:RuntimeConfig
:session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
session.runtime_config.container_image = path/to/container/image
EnvironmentConfig
:- session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
session.environment_config.execution_config.ttl = {"seconds": VALUE}
session.environment_config.execution_config.service_account = SERVICE_ACCOUNT
Sessão do Spark com modelo
É possível inserir e executar o código em uma célula de notebook para
criar uma sessão do Spark com base em um modelo de sessão do Dataproc sem servidor.
Todas as configurações de session
que você fornecer no código do notebook vão
substituir as mesmas configurações definidas no modelo de sessão.
Para começar rapidamente, use o modelo Query using Spark
para pré-preencher seu notebook com o código do modelo de sessão do Spark:
- Na barra de guias do painel do editor, clique na
- Em Começar com um modelo, clique em Consultar usando o Spark e em Usar modelo para inserir o código no seu notebook.
- Especifique as variáveis conforme explicado nas Observações.
- Você pode excluir outras células de código de exemplo inseridas no notebook.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()
# Configure the session with an existing session template.
session_template = "SESSION_TEMPLATE"
session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
# Create the Spark session.
spark = (
DataprocSparkSession.builder
.appName("APP_NAME")
.dataprocSessionConfig(session)
.getOrCreate()
)
- PROJECT: o ID do projeto, que está listado na seção Informações do projeto do painel doTrusted Cloud console.
- LOCATION: a região do Compute Engine em que a sessão do notebook será executada. Se não for fornecido, o local padrão será a região da VM que cria o notebook.
SESSION_TEMPLATE: o nome de um modelo de sessão interativa do Dataproc Serverless. As configurações de configuração da sessão são obtidas do modelo. O modelo também precisa especificar as seguintes configurações:
- Versão do ambiente de execução
2.3
+ Tipo de notebook:
Spark Connect
Exemplo:
- Versão do ambiente de execução
APP_NAME: um nome opcional para a sessão.
Programar e executar código PySpark no seu notebook do BigQuery Studio
Depois de criar uma sessão do Spark no notebook, use a sessão para executar o código do notebook do Spark no notebook.
Suporte à API PySpark do Spark Connect:sua sessão de notebook do Spark Connect oferece suporte à maioria das APIs PySpark, incluindo DataFrame, Functions e Column, mas não oferece suporte a SparkContext e RDD e outras APIs PySpark. Para mais informações, consulte O que é compatível com o Spark 3.5.
APIs específicas do Dataproc:o Dataproc simplifica
a adição dinâmica de pacotes PyPI
à sua
sessão do Spark, estendendo o método addArtifacts
. É possível especificar a lista no formato version-scheme
, semelhante a pip install
. Isso instrui o servidor do Spark Connect a instalar pacotes e as dependências deles em todos os nós do cluster, disponibilizando-os para os workers das UDFs.
Exemplo que instala a versão especificada de textdistance
e as bibliotecas
random2
compatíveis mais recentes no cluster para permitir que as UDFs usem textdistance
e random2
em nós de trabalho.
spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
Ajuda de código do notebook:o notebook do BigQuery Studio oferece ajuda de código quando você mantém o ponteiro sobre um nome de classe ou método e oferece ajuda de preenchimento automático ao inserir código.
No exemplo abaixo, insira DataprocSparkSession
. e manter o
ponteiro sobre o nome da classe mostra a conclusão de código
e a ajuda da documentação.

Exemplos de PySpark no notebook do BigQuery Studio
Esta seção fornece exemplos de notebooks Python do BigQuery Studio com código PySpark para realizar as seguintes tarefas:
- Execute uma contagem de palavras em um conjunto de dados público de Shakespeare.
- Crie uma tabela do Iceberg com metadados salvos no Metastore do BigLake.
Wordcount
O exemplo do Pyspark a seguir cria uma sessão do Spark e conta as ocorrências de palavras em um conjunto de dados bigquery-public-data.samples.shakespeare
público.
# Basic wordcount example
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()
# Create the Spark session.
spark = (
DataprocSparkSession.builder
.appName("APP_NAME")
.dataprocSessionConfig(session)
.getOrCreate()
)
# Run a wordcount on the public Shakespeare dataset.
df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
word_counts_df.show()
Substitua:
- APP_NAME: um nome opcional para a sessão.
Saída:
A saída da célula lista um exemplo da saída de contagem de palavras. Para conferir os detalhes da sessão no console Trusted Cloud , clique no link Visualização de detalhes da sessão interativa. Para monitorar sua sessão do Spark, clique em Ver a interface do Spark na página de detalhes da sessão.

Interactive Session Detail View: LINK +------------+-----+ | word|count| +------------+-----+ | '| 42| | ''All| 1| | ''Among| 1| | ''And| 1| | ''But| 1| | ''Gamut'| 1| | ''How| 1| | ''Lo| 1| | ''Look| 1| | ''My| 1| | ''Now| 1| | ''O| 1| | ''Od's| 1| | ''The| 1| | ''Tis| 4| | ''When| 1| | ''tis| 1| | ''twas| 1| | 'A| 10| |'ARTEMIDORUS| 1| +------------+-----+ only showing top 20 rows
Tabela Iceberg
Executar o código do PySpark para criar uma tabela Iceberg com metadados do BigLake Metastore
O exemplo de código abaixo cria um sample_iceberg_table
com metadados de tabela armazenados no BigLake Metastore e consulta a tabela.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
# Create the Dataproc Serverless session.
session = Session()
# Set the session configuration for BigQuery Metastore with the Iceberg environment.
project = "PROJECT"
region = "REGION"
subnet_name = "SUBNET_NAME"
location = "LOCATION"
session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
catalog = "CATALOG_NAME"
namespace = "NAMESPACE"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
# Create the Spark Connect session.
spark = (
DataprocSparkSession.builder
.appName("APP_NAME")
.dataprocSessionConfig(session)
.getOrCreate()
)
# Create the namespace in BigQuery.
spark.sql(f"USE `{catalog}`;")
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
spark.sql(f"USE `{namespace}`;")
# Create the Iceberg table.
spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
spark.sql("DESCRIBE sample_iceberg_table;")
# Insert table data and query the table.
spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
# Alter table, then query and display table data and schema.
spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
spark.sql("DESCRIBE sample_iceberg_table;")
df = spark.sql("SELECT * FROM sample_iceberg_table")
df.show()
df.printSchema()
Observações:
- PROJECT: o ID do projeto, que está listado na seção Informações do projeto do painel doTrusted Cloud console.
- REGION e SUBNET_NAME: especifique a região do Compute Engine e o nome de uma sub-rede na região da sessão. O Dataproc Serverless ativa o Acesso particular do Google (PGA, na sigla em inglês) na sub-rede especificada.
- LOCATION: o
BigQuery_metastore_config.location
e ospark.sql.catalog.{catalog}.gcp_location
padrão sãoUS
, mas você pode escolher qualquer local do BigQuery com suporte. - BUCKET e WAREHOUSE_DIRECTORY: o bucket e a pasta do Cloud Storage usados para o diretório do repositório do Iceberg.
- CATALOG_NAME e NAMESPACE: o nome do catálogo e o namespace do Iceberg se combinam para identificar a tabela Iceberg (
catalog.namespace.table_name
). - APP_NAME: um nome opcional para a sessão.
A saída da célula lista o sample_iceberg_table
com a coluna adicionada e mostra
um link para a página Detalhes da sessão interativa no console Trusted Cloud .
Clique em Ver a interface do Spark na página de detalhes da sessão para monitorar sua sessão do Spark.

Interactive Session Detail View: LINK +---+---------+------------+ | id| data|newDoubleCol| +---+---------+------------+ | 1|first row| NULL| +---+---------+------------+ root |-- id: integer (nullable = true) |-- data: string (nullable = true) |-- newDoubleCol: double (nullable = true)
Conferir os detalhes da tabela no BigQuery
Siga estas etapas para verificar os detalhes da tabela Iceberg no BigQuery:
No console Trusted Cloud , acesse a página BigQuery.
No painel de recursos do projeto, clique no seu projeto e, em seguida, no seu namespace para listar a tabela
sample_iceberg_table
. Clique na tabela Detalhes para conferir as informações da Configuração da tabela do catálogo aberto.Os formatos de entrada e saída são os formatos padrão da classe
InputFormat
eOutputFormat
do Hadoop que o Iceberg usa.
Outros exemplos
Crie um DataFrame
do Spark (sdf
) a partir de um DataFrame do Pandas (df
).
sdf = spark.createDataFrame(df)
sdf.show()
Execute agregações no DataFrames
do Spark.
from pyspark.sql import functions as F
sdf.groupby("segment").agg(
F.mean("total_spend_per_user").alias("avg_order_value"),
F.approx_count_distinct("user_id").alias("unique_customers")
).show()
Leia do BigQuery usando o conector Spark-BigQuery.
spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","my-bigquery-dataset")
sdf = spark.read.format('bigquery') \
.load(query)
Programar código Spark com o Gemini Code Assist
Você pode pedir ao Gemini Code Assist para gerar o código do PySpark no seu notebook. O Gemini Code Assist busca e usa tabelas relevantes do BigQuery e do Dataproc Metastore e os esquemas delas para gerar uma resposta de código.
Para gerar o código do Gemini Code Assist no seu notebook, faça o seguinte:
Insira uma nova célula de código clicando em + Código na barra de ferramentas. A nova célula de código mostra
Start coding or generate with AI
. Clique em Gerar.No editor "Generate", insira um comando de linguagem natural e clique em
enter
. Inclua a palavra-chavespark
oupyspark
no comando.Exemplo de comando:
create a spark dataframe from order_items and filter to orders created in 2024
Exemplo de resposta:
spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items") df = spark.sql("SELECT * FROM order_items")
Dicas para a geração de código do Gemini Code Assist
Para permitir que o Gemini Code Assist busque tabelas e esquemas relevantes, ative a Sincronização do Data Catalog para instâncias do Dataproc Metastore.
Verifique se a conta de usuário tem acesso ao Data Catalog e às tabelas de consulta. Para fazer isso, atribua o papel
DataCatalog.Viewer
.
Encerrar a sessão do Spark
Você pode realizar qualquer uma das seguintes ações para interromper a sessão do Spark Connect no seu notebook do BigQuery Studio:
- Execute
spark.stop()
em uma célula do notebook. - Encerre o ambiente de execução no notebook:
- Clique no seletor de ambiente de execução e em Gerenciar sessões.
- Na caixa de diálogo Sessões ativas, clique no ícone de encerramento e, em seguida,
clique em Encerrar.
- Clique no seletor de ambiente de execução e em Gerenciar sessões.
Orquestrar o código do notebook do BigQuery Studio
É possível orquestrar o código do notebook do BigQuery Studio das seguintes maneiras:
Programe o código do notebook no console Trusted Cloud . O preço do notebook se aplica.
Execute o código do notebook como uma carga de trabalho em lote do Dataproc sem servidor. Os preços do Dataproc sem servidor se aplicam.
Programar o código do notebook no console do Trusted Cloud
É possível programar o código de um notebook das seguintes maneiras:
- Programe o notebook.
- Se a execução de código do notebook fizer parte de um fluxo de trabalho, programe o notebook como parte de um pipeline.
Executar o código do notebook como uma carga de trabalho em lote do Dataproc Serverless
Siga as etapas abaixo para executar o código do notebook do BigQuery Studio como uma carga de trabalho em lote do Dataproc sem servidor.
Faça o download do código do notebook em um arquivo em um terminal local ou no Cloud Shell.
Abra o notebook no painel Explorer na página BigQuery Studio no console Trusted Cloud .
Faça o download do código do notebook selecionando Download no menu File e depois
Download .py
.
Gerar
requirements.txt
.- Instale o
pipreqs
no diretório em que você salvou o arquivo.py
.pip install pipreqs
Execute
pipreqs
para gerarrequirements.txt
.pipreqs filename.py
Use a ferramenta
gsutil
para copiar o arquivorequirements.txt
local para um bucket no Cloud Storage.gsutil cp requirements.txt gs://BUCKET/
- Instale o
Atualize o código da sessão do Spark editando o arquivo
.py
salvo.Remova ou comente todos os comandos do script de shell.
Remova o código que configura a sessão do Spark e, em seguida, especifique os parâmetros de configuração como parâmetros de envio de carga de trabalho em lote. Consulte Enviar uma carga de trabalho em lote do Spark.
Exemplo:
Remova a seguinte linha de configuração da sub-rede de sessão do código:
session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
Ao executar a carga de trabalho em lote, use a flag
--subnet
para especificar a sub-rede.gcloud dataproc batches submit pyspark \ --subnet=SUBNET_NAME
Use um snippet de código simples para criar sessões.
Exemplo de código de notebook transferido antes da simplificação.
from google.cloud.dataproc_spark_connect import DataprocSparkSession from google.cloud.dataproc_v1 import Session
session = Session() spark = DataprocSparkSession \ .builder \ .appName("CustomSparkSession") .dataprocSessionConfig(session) \ .getOrCreate()
Código de carga de trabalho em lote após a simplificação.
from pyspark.sql import SparkSession
spark = SparkSession \ .builder \ .getOrCreate()
Execute a carga de trabalho em lote.
Consulte Enviar a carga de trabalho em lote do Spark para ver instruções.
Inclua a flag --deps-bucket para apontar para o bucket do Cloud Storage que contém seu arquivo
requirements.txt
.Exemplo:
gcloud dataproc batches submit pyspark FILENAME.py \ --region=REGION \ --deps-bucket=BUCKET \ --version=2.3
Observações:
- FILENAME: o nome do arquivo de código do notebook transferido por download e editado.
- REGION: a região do Compute Engine em que o cluster está localizado.
- BUCKET O nome do bucket do Cloud Storage
que contém o arquivo
requirements.txt
. --version
: a versão 2.3 do ambiente de execução do Spark é selecionada para executar a carga de trabalho em lote.
Confirme seu código.
- Depois de testar o código da carga de trabalho em lote, você pode confirmar o arquivo
.ipynb
ou.py
no repositório usando o clientegit
, como GitHub, GitLab ou Bitbucket, como parte do pipeline de CI/CD.
- Depois de testar o código da carga de trabalho em lote, você pode confirmar o arquivo
Programe sua carga de trabalho em lote com o Cloud Composer.
- Consulte Executar cargas de trabalho do Dataproc sem servidor com o Cloud Composer para ver instruções.
Resolver problemas de notebooks
Se ocorrer uma falha em uma célula que contém o código do Spark, você pode resolver o erro clicando no link Visualização de detalhes da sessão interativa na saída da célula. Consulte os exemplos de tabelas Wordcount e Iceberg.
Problemas conhecidos e soluções
Erro: um ambiente de execução de notebooks
criado com a versão 3.10
do Python pode causar um erro PYTHON_VERSION_MISMATCH
ao tentar se conectar à sessão do Spark.
Solução: recrie o ambiente de execução com a versão 3.11
do Python.
A seguir
- Demonstração em vídeo do YouTube: Liberando o poder do Apache Spark integrado ao BigQuery.
- Usar o metastore do BigLake com o Dataproc
- Usar o metastore do BigLake com o Dataproc Serverless