Executar o código PySpark em notebooks do BigQuery Studio

Neste documento, mostramos como executar o código PySpark em um notebook Python do BigQuery.

Antes de começar

Se ainda não tiver feito isso, crie um Trusted Cloud by S3NS projeto e um bucket do Cloud Storage.

  1. Configurar o projeto

    1. In the Trusted Cloud console, on the project selector page, select or create a Trusted Cloud project.

      Go to project selector

    2. Enable the Dataproc, BigQuery, and Cloud Storage APIs.

      Enable the APIs

  2. Crie um bucket do Cloud Storage no seu projeto se você não tiver um disponível.

  3. Configurar o notebook

    1. Credenciais do notebook: por padrão, a sessão do notebook usa suas credenciais de usuário. Se você quiser especificar as credenciais da conta de serviço para sua sessão, ela precisa ter o papel de trabalhador do Dataproc (roles/dataproc.worker). Para mais informações, consulte Conta de serviço do Dataproc sem servidor.
    2. Ambiente de execução do notebook: o notebook usa um ambiente de execução padrão do Vertex, a menos que você selecione um ambiente de execução diferente. Se você quiser definir seu próprio ambiente de execução, crie-o na página Ambientes de execução do console Trusted Cloud .

Preços

Para mais informações sobre preços, consulte Preços do ambiente de execução de notebooks do BigQuery.

Abrir um notebook Python do BigQuery Studio

  1. No console Trusted Cloud , acesse a página BigQuery.

    Acessar o BigQuery

  2. Na barra de guias do painel de detalhes, clique na seta ao lado do sinal + e clique em Notebook.

Criar uma sessão do Spark em um notebook do BigQuery Studio

Você pode usar um notebook Python do BigQuery Studio para criar uma sessão interativa do Spark Connect. Cada notebook do BigQuery Studio pode ter apenas uma sessão ativa do Dataproc sem servidor associada.

É possível criar uma sessão do Spark em um notebook Python do BigQuery Studio das seguintes maneiras:

  • Configure e crie uma única sessão no notebook.
  • Configure uma sessão do Spark em um modelo de sessão interativa do Dataproc sem servidor para Spark e use o modelo para configurar e criar uma sessão no notebook. O BigQuery oferece um recurso Query using Spark que ajuda a começar a codificar a sessão com modelo, conforme explicado na guia Sessão de modelo do Spark.

Sessão única

Para criar uma sessão do Spark em um novo notebook, faça o seguinte:

  1. Na barra de guias do painel do editor, clique na seta suspensa ao lado do sinal + e clique em Notebook.

    Captura de tela mostrando a interface do BigQuery com o botão "+" para criar um novo notebook.
  2. Copie e execute o código abaixo em uma célula do notebook para configurar e criar uma sessão básica do Spark.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session

import pyspark.sql.connect.functions as f

session = Session()

# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)

Substitua:

  • APP_NAME: um nome opcional para a sessão.
  • Configurações opcionais da sessão:é possível adicionar configurações da API Dataproc Session para personalizar a sessão. Confira alguns exemplos:
    • RuntimeConfig:
      Ajuda do código mostrando as opções de session.runtime.config.
      • session.runtime_config.properties={spark.property.key1:VALUE_1,...,spark.property.keyN:VALUE_N}
      • session.runtime_config.container_image = path/to/container/image
    • EnvironmentConfig:
      Ajuda do código mostrando as opções de configuração de execução de configuração de ambiente de sessão.
      • session.environment_config.execution_config.subnetwork_uri = "SUBNET_NAME"
      • session.environment_config.execution_config.ttl = {"seconds": VALUE}
      • session.environment_config.execution_config.service_account = SERVICE_ACCOUNT

Sessão do Spark com modelo

É possível inserir e executar o código em uma célula de notebook para criar uma sessão do Spark com base em um modelo de sessão do Dataproc sem servidor. Todas as configurações de session que você fornecer no código do notebook vão substituir as mesmas configurações definidas no modelo de sessão.

Para começar rapidamente, use o modelo Query using Spark para pré-preencher seu notebook com o código do modelo de sessão do Spark:

  1. Na barra de guias do painel do editor, clique na seta suspensa ao lado do sinal + e clique em Notebook.
    Captura de tela mostrando a interface do BigQuery com o botão "+" para criar um novo notebook.
  2. Em Começar com um modelo, clique em Consultar usando o Spark e em Usar modelo para inserir o código no seu notebook.
    Seleções da interface do BigQuery para começar com um modelo
  3. Especifique as variáveis conforme explicado nas Observações.
  4. Você pode excluir outras células de código de exemplo inseridas no notebook.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()
# Configure the session with an existing session template.
session_template = "SESSION_TEMPLATE"
session.session_template = f"projects/{project}/locations/{location}/sessionTemplates/{session_template}"
# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)

Substitua:

  • PROJECT: o ID do projeto, que está listado na seção Informações do projeto do painel doTrusted Cloud console.
  • LOCATION: a região do Compute Engine em que a sessão do notebook será executada. Se não for fornecido, o local padrão será a região da VM que cria o notebook.
  • SESSION_TEMPLATE: o nome de um modelo de sessão interativa do Dataproc Serverless. As configurações de configuração da sessão são obtidas do modelo. O modelo também precisa especificar as seguintes configurações:

    • Versão do ambiente de execução 2.3+
    • Tipo de notebook: Spark Connect

      Exemplo:

      Captura de tela mostrando as configurações necessárias do Spark Connect.
  • APP_NAME: um nome opcional para a sessão.

Programar e executar código PySpark no seu notebook do BigQuery Studio

Depois de criar uma sessão do Spark no notebook, use a sessão para executar o código do notebook do Spark no notebook.

Suporte à API PySpark do Spark Connect:sua sessão de notebook do Spark Connect oferece suporte à maioria das APIs PySpark, incluindo DataFrame, Functions e Column, mas não oferece suporte a SparkContext e RDD e outras APIs PySpark. Para mais informações, consulte O que é compatível com o Spark 3.5.

APIs específicas do Dataproc:o Dataproc simplifica a adição dinâmica de pacotes PyPI à sua sessão do Spark, estendendo o método addArtifacts. É possível especificar a lista no formato version-scheme, semelhante a pip install. Isso instrui o servidor do Spark Connect a instalar pacotes e as dependências deles em todos os nós do cluster, disponibilizando-os para os workers das UDFs.

Exemplo que instala a versão especificada de textdistance e as bibliotecas random2 compatíveis mais recentes no cluster para permitir que as UDFs usem textdistance e random2 em nós de trabalho.

spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)

Ajuda de código do notebook:o notebook do BigQuery Studio oferece ajuda de código quando você mantém o ponteiro sobre um nome de classe ou método e oferece ajuda de preenchimento automático ao inserir código.

No exemplo abaixo, insira DataprocSparkSession. e manter o ponteiro sobre o nome da classe mostra a conclusão de código e a ajuda da documentação.

Exemplos de documentação e dicas de preenchimento de código.

Exemplos de PySpark no notebook do BigQuery Studio

Esta seção fornece exemplos de notebooks Python do BigQuery Studio com código PySpark para realizar as seguintes tarefas:

  • Execute uma contagem de palavras em um conjunto de dados público de Shakespeare.
  • Crie uma tabela do Iceberg com metadados salvos no Metastore do BigLake.

Wordcount

O exemplo do Pyspark a seguir cria uma sessão do Spark e conta as ocorrências de palavras em um conjunto de dados bigquery-public-data.samples.shakespeare público.

# Basic wordcount example
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()

# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)
# Run a wordcount on the public Shakespeare dataset.
df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
words_df = df.select(f.explode(f.split(f.col("word"), " ")).alias("word"))
word_counts_df = words_df.filter(f.col("word") != "").groupBy("word").agg(f.count("*").alias("count")).orderBy("word")
word_counts_df.show()

Substitua:

  • APP_NAME: um nome opcional para a sessão.

Saída:

A saída da célula lista um exemplo da saída de contagem de palavras. Para conferir os detalhes da sessão no console Trusted Cloud , clique no link Visualização de detalhes da sessão interativa. Para monitorar sua sessão do Spark, clique em Ver a interface do Spark na página de detalhes da sessão.

Botão "View Spark UI" na página de detalhes da sessão no console
Interactive Session Detail View: LINK
+------------+-----+
|        word|count|
+------------+-----+
|           '|   42|
|       ''All|    1|
|     ''Among|    1|
|       ''And|    1|
|       ''But|    1|
|    ''Gamut'|    1|
|       ''How|    1|
|        ''Lo|    1|
|      ''Look|    1|
|        ''My|    1|
|       ''Now|    1|
|         ''O|    1|
|      ''Od's|    1|
|       ''The|    1|
|       ''Tis|    4|
|      ''When|    1|
|       ''tis|    1|
|      ''twas|    1|
|          'A|   10|
|'ARTEMIDORUS|    1|
+------------+-----+
only showing top 20 rows

Tabela Iceberg

Executar o código do PySpark para criar uma tabela Iceberg com metadados do BigLake Metastore

O exemplo de código abaixo cria um sample_iceberg_table com metadados de tabela armazenados no BigLake Metastore e consulta a tabela.

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
# Create the Dataproc Serverless session.
session = Session()
# Set the session configuration for BigQuery Metastore with the Iceberg environment.
project = "PROJECT"
region = "REGION"
subnet_name = "SUBNET_NAME"
location = "LOCATION"
session.environment_config.execution_config.subnetwork_uri = f"{subnet_name}"
warehouse_dir = "gs://BUCKET/WAREHOUSE_DIRECTORY"
catalog = "CATALOG_NAME"
namespace = "NAMESPACE"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}"] = "org.apache.iceberg.spark.SparkCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.catalog-impl"] = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_project"] = f"{project_id}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.gcp_location"] = f"{location}"
session.runtime_config.properties[f"spark.sql.catalog.{catalog}.warehouse"] = f"{warehouse_dir}"
# Create the Spark Connect session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate()
)
# Create the namespace in BigQuery.
spark.sql(f"USE `{catalog}`;")
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS `{namespace}`;")
spark.sql(f"USE `{namespace}`;")
# Create the Iceberg table.
spark.sql("DROP TABLE IF EXISTS `sample_iceberg_table`");
spark.sql("CREATE TABLE sample_iceberg_table (id int, data string) USING ICEBERG;")
spark.sql("DESCRIBE sample_iceberg_table;")
# Insert table data and query the table.
spark.sql("INSERT INTO sample_iceberg_table VALUES (1, \"first row\");")
# Alter table, then query and display table data and schema.
spark.sql("ALTER TABLE sample_iceberg_table ADD COLUMNS (newDoubleCol double);")
spark.sql("DESCRIBE sample_iceberg_table;")
df = spark.sql("SELECT * FROM sample_iceberg_table")
df.show()
df.printSchema()

Observações:

  • PROJECT: o ID do projeto, que está listado na seção Informações do projeto do painel doTrusted Cloud console.
  • REGION e SUBNET_NAME: especifique a região do Compute Engine e o nome de uma sub-rede na região da sessão. O Dataproc Serverless ativa o Acesso particular do Google (PGA, na sigla em inglês) na sub-rede especificada.
  • LOCATION: o BigQuery_metastore_config.location e o spark.sql.catalog.{catalog}.gcp_location padrão são US, mas você pode escolher qualquer local do BigQuery com suporte.
  • BUCKET e WAREHOUSE_DIRECTORY: o bucket e a pasta do Cloud Storage usados para o diretório do repositório do Iceberg.
  • CATALOG_NAME e NAMESPACE: o nome do catálogo e o namespace do Iceberg se combinam para identificar a tabela Iceberg (catalog.namespace.table_name).
  • APP_NAME: um nome opcional para a sessão.

A saída da célula lista o sample_iceberg_table com a coluna adicionada e mostra um link para a página Detalhes da sessão interativa no console Trusted Cloud . Clique em Ver a interface do Spark na página de detalhes da sessão para monitorar sua sessão do Spark.

Interactive Session Detail View: LINK
+---+---------+------------+
| id|     data|newDoubleCol|
+---+---------+------------+
|  1|first row|        NULL|
+---+---------+------------+

root
 |-- id: integer (nullable = true)
 |-- data: string (nullable = true)
 |-- newDoubleCol: double (nullable = true)

Conferir os detalhes da tabela no BigQuery

Siga estas etapas para verificar os detalhes da tabela Iceberg no BigQuery:

  1. No console Trusted Cloud , acesse a página BigQuery.

    Acessar o BigQuery

  2. No painel de recursos do projeto, clique no seu projeto e, em seguida, no seu namespace para listar a tabela sample_iceberg_table. Clique na tabela Detalhes para conferir as informações da Configuração da tabela do catálogo aberto.

    Os formatos de entrada e saída são os formatos padrão da classe InputFormat e OutputFormat do Hadoop que o Iceberg usa.

    Metadados da tabela Iceberg listados na interface do BigQuery

Outros exemplos

Crie um DataFrame do Spark (sdf) a partir de um DataFrame do Pandas (df).

sdf = spark.createDataFrame(df)
sdf.show()

Execute agregações no DataFrames do Spark.

from pyspark.sql import functions as F

sdf.groupby("segment").agg(
   F.mean("total_spend_per_user").alias("avg_order_value"),
   F.approx_count_distinct("user_id").alias("unique_customers")
).show()

Leia do BigQuery usando o conector Spark-BigQuery.

spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","my-bigquery-dataset")

sdf = spark.read.format('bigquery') \
 .load(query)

Programar código Spark com o Gemini Code Assist

Você pode pedir ao Gemini Code Assist para gerar o código do PySpark no seu notebook. O Gemini Code Assist busca e usa tabelas relevantes do BigQuery e do Dataproc Metastore e os esquemas delas para gerar uma resposta de código.

Para gerar o código do Gemini Code Assist no seu notebook, faça o seguinte:

  1. Insira uma nova célula de código clicando em + Código na barra de ferramentas. A nova célula de código mostra Start coding or generate with AI. Clique em Gerar.

  2. No editor "Generate", insira um comando de linguagem natural e clique em enter. Inclua a palavra-chave spark ou pyspark no comando.

    Exemplo de comando:

    create a spark dataframe from order_items and filter to orders created in 2024
    

    Exemplo de resposta:

    spark.read.format("bigquery").option("table", "sqlgen-testing.pysparkeval_ecommerce.order_items").load().filter("year(created_at) = 2024").createOrReplaceTempView("order_items")
    df = spark.sql("SELECT * FROM order_items")
    

Dicas para a geração de código do Gemini Code Assist

  • Para permitir que o Gemini Code Assist busque tabelas e esquemas relevantes, ative a Sincronização do Data Catalog para instâncias do Dataproc Metastore.

  • Verifique se a conta de usuário tem acesso ao Data Catalog e às tabelas de consulta. Para fazer isso, atribua o papel DataCatalog.Viewer.

Encerrar a sessão do Spark

Você pode realizar qualquer uma das seguintes ações para interromper a sessão do Spark Connect no seu notebook do BigQuery Studio:

  • Execute spark.stop() em uma célula do notebook.
  • Encerre o ambiente de execução no notebook:
    1. Clique no seletor de ambiente de execução e em Gerenciar sessões.
      Gerenciar a seleção de sessões
    2. Na caixa de diálogo Sessões ativas, clique no ícone de encerramento e, em seguida, clique em Encerrar.
      Encerrar a seleção de sessão na caixa de diálogo "Sessões ativas"

Orquestrar o código do notebook do BigQuery Studio

É possível orquestrar o código do notebook do BigQuery Studio das seguintes maneiras:

Programar o código do notebook no console do Trusted Cloud

É possível programar o código de um notebook das seguintes maneiras:

  • Programe o notebook.
  • Se a execução de código do notebook fizer parte de um fluxo de trabalho, programe o notebook como parte de um pipeline.

Executar o código do notebook como uma carga de trabalho em lote do Dataproc Serverless

Siga as etapas abaixo para executar o código do notebook do BigQuery Studio como uma carga de trabalho em lote do Dataproc sem servidor.

  1. Faça o download do código do notebook em um arquivo em um terminal local ou no Cloud Shell.

    1. Abra o notebook no painel Explorer na página BigQuery Studio no console Trusted Cloud .

    2. Faça o download do código do notebook selecionando Download no menu File e depois Download .py.

      Menu "File" > "Download" na página do Explorer.
  2. Gerar requirements.txt.

    1. Instale o pipreqs no diretório em que você salvou o arquivo .py.
      pip install pipreqs
      
    2. Execute pipreqs para gerar requirements.txt.

      pipreqs filename.py
      

    3. Use a ferramenta gsutil para copiar o arquivo requirements.txt local para um bucket no Cloud Storage.

      gsutil cp requirements.txt gs://BUCKET/
      
  3. Atualize o código da sessão do Spark editando o arquivo .py salvo.

    1. Remova ou comente todos os comandos do script de shell.

    2. Remova o código que configura a sessão do Spark e, em seguida, especifique os parâmetros de configuração como parâmetros de envio de carga de trabalho em lote. Consulte Enviar uma carga de trabalho em lote do Spark.

      Exemplo:

      • Remova a seguinte linha de configuração da sub-rede de sessão do código:

        session.environment_config.execution_config.subnetwork_uri = "{subnet_name}"
        

      • Ao executar a carga de trabalho em lote, use a flag --subnet para especificar a sub-rede.

        gcloud dataproc batches submit pyspark \
        --subnet=SUBNET_NAME
        
    3. Use um snippet de código simples para criar sessões.

      • Exemplo de código de notebook transferido antes da simplificação.

        from google.cloud.dataproc_spark_connect import DataprocSparkSession
        from google.cloud.dataproc_v1 import Session
        

        session = Session() spark = DataprocSparkSession \     .builder \     .appName("CustomSparkSession")     .dataprocSessionConfig(session) \     .getOrCreate()

      • Código de carga de trabalho em lote após a simplificação.

        from pyspark.sql import SparkSession
        

        spark = SparkSession \ .builder \ .getOrCreate()

  4. Execute a carga de trabalho em lote.

    1. Consulte Enviar a carga de trabalho em lote do Spark para ver instruções.

      • Inclua a flag --deps-bucket para apontar para o bucket do Cloud Storage que contém seu arquivo requirements.txt.

        Exemplo:

      gcloud dataproc batches submit pyspark FILENAME.py \
          --region=REGION \
          --deps-bucket=BUCKET \
          --version=2.3 
      

      Observações:

      • FILENAME: o nome do arquivo de código do notebook transferido por download e editado.
      • REGION: a região do Compute Engine em que o cluster está localizado.
      • BUCKET O nome do bucket do Cloud Storage que contém o arquivo requirements.txt.
      • --version: a versão 2.3 do ambiente de execução do Spark é selecionada para executar a carga de trabalho em lote.
  5. Confirme seu código.

    1. Depois de testar o código da carga de trabalho em lote, você pode confirmar o arquivo .ipynb ou .py no repositório usando o cliente git, como GitHub, GitLab ou Bitbucket, como parte do pipeline de CI/CD.
  6. Programe sua carga de trabalho em lote com o Cloud Composer.

    1. Consulte Executar cargas de trabalho do Dataproc sem servidor com o Cloud Composer para ver instruções.

Resolver problemas de notebooks

Se ocorrer uma falha em uma célula que contém o código do Spark, você pode resolver o erro clicando no link Visualização de detalhes da sessão interativa na saída da célula. Consulte os exemplos de tabelas Wordcount e Iceberg.

Problemas conhecidos e soluções

Erro: um ambiente de execução de notebooks criado com a versão 3.10 do Python pode causar um erro PYTHON_VERSION_MISMATCH ao tentar se conectar à sessão do Spark.

Solução: recrie o ambiente de execução com a versão 3.11 do Python.

A seguir