Membuat kueri berkelanjutan
Dokumen ini menjelaskan cara menjalankan kueri berkelanjutan di BigQuery.
Kueri berkelanjutan BigQuery adalah pernyataan SQL yang berjalan secara berkelanjutan. Kueri berkelanjutan memungkinkan Anda menganalisis data yang masuk di BigQuery secara real time, lalu mengekspor hasilnya ke Bigtable, Pub/Sub, atau Spanner, atau menulis hasilnya ke tabel BigQuery.
Pilih jenis akun
Anda dapat membuat dan menjalankan tugas kueri berkelanjutan menggunakan akun pengguna, atau Anda dapat membuat tugas kueri berkelanjutan menggunakan akun pengguna, lalu menjalankannya menggunakan akun layanan. Anda harus menggunakan akun layanan untuk menjalankan kueri berkelanjutan yang mengekspor hasil ke topik Pub/Sub.
Saat Anda menggunakan akun pengguna, kueri berkelanjutan berjalan hingga dua hari. Saat Anda menggunakan akun layanan, kueri berkelanjutan akan berjalan hingga 150 hari. Untuk mengetahui informasi selengkapnya, lihat Otorisasi.
Izin yang diperlukan
Bagian ini menjelaskan izin yang Anda perlukan untuk membuat dan menjalankan kueri berkelanjutan. Sebagai alternatif untuk peran Identity and Access Management (IAM) yang disebutkan, Anda bisa mendapatkan izin yang diperlukan melalui peran kustom.
Izin saat menggunakan akun pengguna
Bagian ini memberikan informasi tentang peran dan izin yang diperlukan untuk membuat dan menjalankan kueri berkelanjutan menggunakan akun pengguna.
Untuk membuat tugas di BigQuery, akun pengguna harus memiliki izin IAM bigquery.jobs.create
. Setiap peran IAM berikut memberikan izin bigquery.jobs.create
:
- Pengguna BigQuery (
roles/bigquery.user
) - BigQuery Job User (
roles/bigquery.jobUser
) - Admin BigQuery (
roles/bigquery.admin
)
Untuk mengekspor data dari tabel BigQuery, akun pengguna harus memiliki izin IAM bigquery.tables.export
. Setiap peran IAM berikut memberikan izin bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - Admin BigQuery (
roles/bigquery.admin
)
Untuk memperbarui data dalam tabel BigQuery, akun pengguna harus memiliki izin IAM bigquery.tables.updateData
. Setiap peran IAM berikut memberikan izin bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - Admin BigQuery (
roles/bigquery.admin
)
Jika akun pengguna harus mengaktifkan API yang diperlukan untuk kasus penggunaan kueri berkelanjutan, akun pengguna harus memiliki peran Service Usage Admin (roles/serviceusage.serviceUsageAdmin
).
Izin saat menggunakan akun layanan
Bagian ini memberikan informasi tentang peran dan izin yang diperlukan oleh akun pengguna yang membuat kueri berkelanjutan, dan akun layanan yang menjalankan kueri berkelanjutan.
Izin akun pengguna
Untuk membuat tugas di BigQuery, akun pengguna harus memiliki izin IAM bigquery.jobs.create
. Setiap peran IAM berikut memberikan izin bigquery.jobs.create
:
- Pengguna BigQuery (
roles/bigquery.user
) - BigQuery Job User (
roles/bigquery.jobUser
) - Admin BigQuery (
roles/bigquery.admin
)
Untuk mengirimkan tugas yang berjalan menggunakan akun layanan, akun pengguna harus memiliki peran
Service Account User (roles/iam.serviceAccountUser
). Jika Anda menggunakan akun pengguna yang sama untuk membuat akun layanan, maka akun pengguna tersebut harus memiliki peran
Service Account Admin (roles/iam.serviceAccountAdmin
). Untuk mengetahui informasi tentang cara membatasi akses pengguna ke satu akun layanan, bukan ke semua akun layanan dalam project, lihat Memberikan satu peran.
Jika akun pengguna harus mengaktifkan API yang diperlukan untuk kasus penggunaan kueri berkelanjutan, akun pengguna harus memiliki peran Service Usage Admin (roles/serviceusage.serviceUsageAdmin
).
Izin akun layanan
Untuk mengekspor data dari tabel BigQuery, akun layanan harus memiliki izin IAM bigquery.tables.export
. Setiap peran IAM berikut memberikan izin bigquery.tables.export
:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - Admin BigQuery (
roles/bigquery.admin
)
bigquery.tables.updateData
. Setiap peran IAM berikut memberikan izin bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - Admin BigQuery (
roles/bigquery.admin
)
Sebelum memulai
-
In the Trusted Cloud console, on the project selector page, select or create a Trusted Cloud project.
-
Verify that billing is enabled for your Trusted Cloud project.
-
Enable the BigQuery API.
Membuat pemesanan
Buat pemesanan edisi Enterprise atau Enterprise Plus, lalu
buat penetapan pemesanan
dengan jenis tugas CONTINUOUS
. Reservasi ini dapat menggunakan
penskalaan otomatis.
Ada
batasan pemesanan
yang berlaku untuk penetapan pemesanan untuk kueri berkelanjutan.
Ekspor ke Pub/Sub
API tambahan, izin IAM, dan resource Trusted Cloud by S3NS diperlukan untuk mengekspor data ke Pub/Sub. Untuk mengetahui informasi selengkapnya, lihat artikel Mengekspor ke Pub/Sub.
Menyematkan atribut kustom sebagai metadata dalam pesan Pub/Sub
Anda dapat menggunakan atribut Pub/Sub untuk memberikan informasi tambahan tentang pesan, seperti prioritas, asal, tujuan, atau metadata tambahan. Anda juga dapat menggunakan atribut untuk memfilter pesan pada langganan.
Dalam hasil kueri berkelanjutan, jika kolom diberi nama _ATTRIBUTES
, maka nilainya akan disalin ke atribut pesan Pub/Sub.
Kolom yang disediakan dalam _ATTRIBUTES
digunakan sebagai kunci atribut.
Kolom _ATTRIBUTES
harus berjenis JSON
, dalam format
ARRAY<STRUCT<STRING, STRING>>
atau STRUCT<STRING>
.
Untuk melihat contohnya, lihat mengekspor data ke topik Pub/Sub.
Mengekspor ke Bigtable
API, izin IAM, dan resource Trusted Cloud tambahan diperlukan untuk mengekspor data ke Bigtable. Untuk mengetahui informasi selengkapnya, lihat Mengekspor ke Bigtable.
Mengekspor ke Spanner
API, izin IAM, dan Trusted Cloud resource tambahan diperlukan untuk mengekspor data ke Spanner. Untuk mengetahui informasi selengkapnya, lihat Mengekspor ke Spanner (ETL terbalik).
Menulis data ke tabel BigQuery
Anda dapat menulis data ke tabel BigQuery menggunakan
pernyataan INSERT
.
Menggunakan fungsi AI
API, izin IAM, dan Trusted Cloud resource tambahan diperlukan untuk menggunakan fungsi AI yang didukung dalam kueri berkelanjutan. Untuk mengetahui informasi selengkapnya, lihat salah satu topik berikut, berdasarkan kasus penggunaan Anda:
- Membuat teks menggunakan fungsi
ML.GENERATE_TEXT
- Membuat embedding teks menggunakan fungsi
ML.GENERATE_EMBEDDING
- Memahami teks dengan fungsi
ML.UNDERSTAND_TEXT
- Menerjemahkan teks dengan fungsi
ML.TRANSLATE
Saat Anda menggunakan fungsi AI dalam kueri berkelanjutan, pertimbangkan apakah output kueri akan tetap berada dalam kuota untuk fungsi tersebut. Jika melebihi kuota, Anda mungkin harus menangani secara terpisah data yang tidak diproses.
Menentukan titik awal
Anda harus menggunakan fungsi APPENDS
dalam klausa FROM
dari kueri berkelanjutan untuk menentukan data paling awal yang akan
diproses. Misalnya,
APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
memberi tahu
BigQuery untuk memproses data yang ditambahkan ke tabel my_table
paling lama 10 menit sebelum dimulainya kueri berkelanjutan.
Data yang ditambahkan ke my_table
diproses saat masuk. Tidak ada
keterlambatan yang diberlakukan pada pemrosesan data.
Jangan berikan argumen end_timestamp
ke fungsi APPENDS
saat Anda menggunakannya dalam kueri berkelanjutan.
Contoh berikut menunjukkan cara memulai kueri berkelanjutan dari titik waktu tertentu menggunakan fungsi APPENDS
, saat membuat kueri tabel BigQuery yang menerima informasi perjalanan taksi streaming:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute');
Tentukan titik awal yang lebih awal dari waktu saat ini
Jika ingin memproses data dari sebelum titik waktu saat ini, Anda dapat menggunakan
fungsi APPENDS
untuk menentukan titik awal yang lebih awal untuk kueri. Titik awal yang Anda tentukan harus berada dalam periode perjalanan waktu untuk tabel yang Anda pilih. Periode perjalanan waktu mencakup tujuh hari terakhir secara default.
Untuk menyertakan data yang berada di luar periode perjalanan waktu, gunakan kueri standar untuk menyisipkan atau mengekspor data hingga waktu tertentu, lalu mulai kueri berkelanjutan dari waktu tersebut.
Contoh
Contoh berikut menunjukkan cara memuat data lama dari tabel BigQuery yang menerima informasi perjalanan taksi streaming hingga titik waktu tertentu ke dalam tabel, lalu memulai kueri berkelanjutan dari titik batas untuk data lama.
Jalankan kueri standar untuk mengisi ulang data hingga titik waktu tertentu:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` -- Include all data inserted into the table up to this point in time. -- This timestamp must be within the time travel window. FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC' WHERE ride_status = 'dropoff';
Menjalankan kueri berkelanjutan dari titik waktu saat kueri berhenti:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to start processing -- data right where the batch query left off. -- This timestamp must be within the time travel window. TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND) WHERE ride_status = 'dropoff';
Menjalankan kueri berkelanjutan menggunakan akun pengguna
Bagian ini menjelaskan cara menjalankan kueri berkelanjutan menggunakan akun pengguna. Setelah kueri berkelanjutan berjalan, Anda dapat menutup Trusted Cloud konsol, jendela terminal, atau aplikasi tanpa mengganggu eksekusi kueri. Kueri berkelanjutan yang dijalankan oleh akun pengguna berjalan maksimal selama dua hari, lalu berhenti secara otomatis. Untuk terus memproses data masuk baru, mulai kueri berkelanjutan baru dan tentukan titik awal. Untuk mengotomatiskan proses ini, lihat coba lagi kueri yang gagal.
Ikuti langkah-langkah berikut untuk menjalankan kueri berkelanjutan:
Konsol
Di Trusted Cloud konsol, buka halaman BigQuery.
Di editor kueri, klik
More.- Di bagian Pilih mode kueri, pilih Kueri berkelanjutan.
- Klik Konfirmasi.
- Opsional: Untuk mengontrol durasi kueri berjalan, klik Setelan kueri dan tetapkan Waktu tunggu tugas dalam milidetik.
Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
Klik Run.
bq
-
In the Trusted Cloud console, activate Cloud Shell.
Di Cloud Shell, jalankan kueri berkelanjutan menggunakan perintah
bq query
dengan flag--continuous
:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Ganti
QUERY
dengan pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung. Anda dapat mengontrol durasi kueri berjalan menggunakan flag--job_timeout_ms
.PROJECT_ID
: project ID Anda.QUERY
: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
API
Jalankan kueri berkelanjutan dengan memanggil
metode jobs.insert
.
Anda harus menetapkan kolom continuous
ke true
di
JobConfigurationQuery
dari resource Job
yang Anda teruskan.
Anda dapat mengontrol berapa lama kueri berjalan secara opsional dengan menyetel
kolom jobTimeoutMs
.
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \ --compressed
Ganti kode berikut:
Menjalankan kueri berkelanjutan menggunakan akun layanan
Bagian ini menjelaskan cara menjalankan kueri berkelanjutan menggunakan akun layanan. Setelah kueri berkelanjutan berjalan, Anda dapat menutup Trusted Cloud konsol, jendela terminal, atau aplikasi tanpa mengganggu eksekusi kueri. Kueri berkelanjutan yang dijalankan menggunakan akun layanan dapat berjalan hingga 150 hari, lalu berhenti secara otomatis. Untuk terus memproses data masuk baru, mulai kueri berkelanjutan baru dan tentukan titik awal. Untuk mengotomatiskan proses ini, lihat coba lagi kueri yang gagal.
Ikuti langkah-langkah berikut untuk menggunakan akun layanan guna menjalankan kueri berkelanjutan:
Konsol
- Membuat akun layanan
- Berikan izin yang diperlukan ke akun layanan.
Di Trusted Cloud konsol, buka halaman BigQuery.
Di editor kueri, klik Lainnya.
Di bagian Pilih mode kueri, pilih Kueri berkelanjutan.
Klik Konfirmasi.
Di editor kueri, klik Lainnya > Setelan kueri.
Di bagian Continuous query, gunakan kotak Service account untuk memilih akun layanan yang Anda buat.
Opsional: Untuk mengontrol durasi kueri berjalan, tetapkan Waktu tunggu tugas dalam milidetik.
Klik Simpan.
Di editor kueri, ketik pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
Klik Run.
bq
- Membuat akun layanan
- Berikan izin yang diperlukan ke akun layanan.
-
In the Trusted Cloud console, activate Cloud Shell.
Pada command line, jalankan kueri berkelanjutan menggunakan perintah
bq query
dengan flag berikut:- Tetapkan tanda
--continuous
ketrue
untuk membuat kueri berkelanjutan. - Gunakan flag
--connection_property
untuk menentukan akun layanan yang akan digunakan. - Opsional: Tetapkan flag
--job_timeout_ms
untuk membatasi runtime kueri.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Ganti kode berikut:
PROJECT_ID
: project ID Anda.SERVICE_ACCOUNT_EMAIL
: email akun layanan. Anda bisa mendapatkan email akun layanan dari halaman Akun layanan di konsol Trusted Cloud .QUERY
: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.
- Tetapkan tanda
- Membuat akun layanan
- Berikan izin yang diperlukan ke akun layanan.
Jalankan kueri berkelanjutan dengan memanggil metode
jobs.insert
. Tetapkan kolom berikut di resourceJobConfigurationQuery
dari resourceJob
yang Anda teruskan:- Tetapkan kolom
continuous
ketrue
untuk membuat kueri berkelanjutan. - Gunakan kolom
connectionProperties
untuk menentukan akun layanan yang akan digunakan.
Anda dapat secara opsional mengontrol durasi kueri berjalan dengan menetapkan kolom
jobTimeoutMs
di resourceJobConfiguration
.curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \ --compressed
Ganti kode berikut:
PROJECT_ID
: project ID Anda.QUERY
: pernyataan SQL untuk kueri berkelanjutan. Pernyataan SQL hanya boleh berisi operasi yang didukung.SERVICE_ACCOUNT_EMAIL
: email akun layanan. Anda bisa mendapatkan email akun layanan di halaman Akun layanan di konsol Trusted Cloud .
- Tetapkan kolom
API
Membuat ID tugas kustom
Setiap tugas kueri diberi ID tugas yang dapat Anda gunakan untuk menelusuri dan mengelola tugas. Secara default, ID tugas dibuat secara acak. Untuk mempermudah pencarian ID tugas kueri berkelanjutan menggunakan histori tugas atau penjelajah tugas, Anda dapat menetapkan awalan ID tugas kustom:
Di Trusted Cloud konsol, buka halaman BigQuery.
Di editor kueri, klik Lainnya.
Di bagian Pilih mode kueri, pilih Kueri berkelanjutan.
Klik Konfirmasi.
Di editor kueri, klik Lainnya > Setelan kueri.
Di bagian Awalan ID tugas kustom, masukkan awalan nama kustom.
Klik Simpan.
Contoh
Contoh SQL berikut menunjukkan kasus penggunaan umum untuk kueri berkelanjutan.
Mengekspor data ke topik Pub/Sub
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan memublikasikan data ke topik Pub/Sub secara real time dengan atribut pesan:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
Mengekspor data ke tabel Bigtable
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari tabel BigQuery yang menerima informasi perjalanan taksi streaming, dan mengekspor data ke tabel Bigtable secara real time:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
Mengekspor data ke tabel Spanner
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter data dari tabel BigQuery yang menerima informasi perjalanan taksi streaming, lalu mengekspor data ke tabel Spanner secara real time:
EXPORT DATA OPTIONS ( format = 'CLOUD_SPANNER', uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides', spanner_options ="""{ "table": "rides", -- To ensure data is written to Spanner in the correct sequence -- during a continuous export, use the change_timestamp_column -- option. This should be mapped to a timestamp column from your -- BigQuery data. If your source data lacks a timestamp, the -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function -- will be automatically mapped to the "change_timestamp" column. "change_timestamp_column": "change_timestamp" }""" ) AS ( SELECT ride_id, latitude, longitude, meter_reading, ride_status, passenger_count FROM APPENDS( TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
Menulis data ke tabel BigQuery
Contoh berikut menunjukkan kueri berkelanjutan yang memfilter dan mengubah data dari tabel BigQuery yang menerima informasi perjalanan taksi streaming, lalu menulis data ke tabel BigQuery lain secara real time. Hal ini membuat data tersedia untuk analisis hilir lebih lanjut.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'dropoff';
Memproses data dengan menggunakan model Vertex AI
Contoh berikut menunjukkan kueri berkelanjutan yang menggunakan model Vertex AI untuk membuat iklan bagi penumpang taksi berdasarkan lintang dan bujur saat ini, lalu mengekspor hasilnya ke topik Pub/Sub secara real time:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you -- want to start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
Mengubah SQL kueri berkelanjutan
Anda tidak dapat memperbarui SQL yang digunakan dalam kueri berkelanjutan saat tugas kueri berkelanjutan sedang berjalan. Anda harus membatalkan tugas kueri berkelanjutan, mengubah SQL, lalu memulai tugas kueri berkelanjutan baru dari titik tempat Anda menghentikan tugas kueri berkelanjutan asli.
Ikuti langkah-langkah berikut untuk mengubah SQL yang digunakan dalam kueri berkelanjutan:
- Lihat detail tugas untuk tugas kueri berkelanjutan yang ingin Anda perbarui, dan catat ID tugasnya.
- Jika memungkinkan, jeda pengumpulan data hulu. Jika Anda tidak dapat melakukannya, Anda mungkin mendapatkan beberapa duplikasi data saat kueri berkelanjutan dimulai ulang.
- Batalkan kueri berkelanjutan yang ingin Anda ubah.
Dapatkan nilai
end_time
untuk tugas kueri berkelanjutan asli menggunakan tabel virtualJOBS
:INFORMATION_SCHEMA
SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Ganti kode berikut:
PROJECT_ID
: project ID Anda.REGION
: region yang digunakan oleh project Anda.JOB_ID
: ID tugas kueri berkelanjutan yang Anda identifikasi di Langkah 1.
Ubah pernyataan SQL kueri berkelanjutan untuk memulai kueri berkelanjutan dari titik waktu tertentu, menggunakan nilai
end_time
yang Anda ambil di Langkah 5 sebagai titik awal.Ubah pernyataan SQL kueri berkelanjutan untuk mencerminkan perubahan yang Anda perlukan.
Jalankan kueri berkelanjutan yang telah diubah.
Membatalkan kueri berkelanjutan
Anda dapat membatalkan tugas kueri berkelanjutan seperti tugas lainnya. Mungkin perlu waktu hingga satu menit agar kueri berhenti berjalan setelah tugas dibatalkan.
Jika Anda membatalkan lalu memulai ulang kueri, kueri yang dimulai ulang akan berperilaku seperti kueri baru yang independen. Kueri yang dimulai ulang tidak mulai memproses data di tempat tugas sebelumnya berhenti dan tidak dapat mereferensikan hasil kueri sebelumnya. Lihat Memulai kueri berkelanjutan dari titik waktu tertentu.
Memantau kueri dan menangani error
Kueri berkelanjutan dapat terganggu karena faktor-faktor seperti inkonsistensi data, perubahan skema, gangguan layanan sementara, atau pemeliharaan. Meskipun BigQuery menangani beberapa error sementara, praktik terbaik untuk meningkatkan ketahanan tugas mencakup hal berikut: