Mengekspor data ke Pub/Sub (ETL terbalik)

Untuk mengekspor data ke Pub/Sub, Anda harus menggunakan kueri berkelanjutan BigQuery berkelanjutan.

Dokumen ini menjelaskan cara menyiapkan ekstrak-transformasi-muat (ETL) terbalik dari BigQuery ke Pub/Sub. Anda dapat melakukannya dengan menggunakan pernyataan EXPORT DATA dalam kueri berkelanjutan untuk mengekspor data dari BigQuery ke topik Pub/Sub.

Anda dapat menggunakan alur kerja RETL ke Pub/Sub untuk menggabungkan kemampuan analisis BigQuery dengan layanan pesan global asinkron dan skalabel Pub/Sub. Alur kerja ini memungkinkan Anda menyajikan data ke aplikasi dan layanan downstream secara berbasis peristiwa.

Prasyarat

Anda harus membuat akun layanan. Akun layanan diperlukan untuk menjalankan kueri berkelanjutan yang mengekspor hasil ke topik Pub/Sub.

Anda harus membuat topik Pub/Sub untuk menerima hasil kueri berkelanjutan sebagai pesan, dan langganan Pub/Sub yang dapat digunakan aplikasi target untuk menerima pesan tersebut.

Peran yang diperlukan

Bagian ini memberikan informasi tentang peran dan izin yang diperlukan oleh akun pengguna yang membuat kueri berkelanjutan, dan akun layanan yang menjalankan kueri berkelanjutan.

Izin akun pengguna

Untuk membuat tugas di BigQuery, akun pengguna harus memiliki izin IAM bigquery.jobs.create. Setiap peran IAM berikut memberikan izin bigquery.jobs.create:

Untuk mengirimkan tugas yang berjalan menggunakan akun layanan, akun pengguna harus memiliki peran Pengguna Akun Layanan (roles/iam.serviceAccountUser). Jika Anda menggunakan akun pengguna yang sama untuk membuat akun layanan, akun pengguna harus memiliki peran Admin Akun Layanan (roles/iam.serviceAccountAdmin). Untuk mengetahui informasi tentang cara membatasi akses pengguna ke satu akun layanan, bukan ke semua akun layanan dalam project, lihat Memberikan satu peran.

Jika akun pengguna harus mengaktifkan API yang diperlukan untuk kasus penggunaan kueri berkelanjutan, akun pengguna harus memiliki peran Admin Penggunaan Layanan (roles/serviceusage.serviceUsageAdmin).

Izin akun layanan

Untuk mengekspor data dari tabel BigQuery, akun layanan harus memiliki izin IAM bigquery.tables.export. Setiap peran IAM berikut memberikan izin bigquery.tables.export:

Agar akun layanan dapat mengakses Pub/Sub, Anda harus memberikan kedua peran IAM berikut ke akun layanan:

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus.

Sebelum memulai

Aktifkan BigQuery dan Pub/Sub API.

Peran yang diperlukan untuk mengaktifkan API

Untuk mengaktifkan API, Anda memerlukan peran IAM Service Usage Admin (roles/serviceusage.serviceUsageAdmin), yang berisi izin serviceusage.services.enable. Pelajari cara memberikan peran.

Aktifkan API

Ekspor ke Pub/Sub

Gunakan pernyataan EXPORT DATA untuk mengekspor data ke topik Pub/Sub:

Konsol

  1. Di Cloud de Confiance konsol, buka halaman BigQuery.

    Buka BigQuery

  2. Di editor kueri, klik Edit > Query settings.

  3. Di bagian Kueri berkelanjutan, centang kotak Use continuous query mode.

  4. Di kotak Service account, pilih akun layanan yang Anda buat.

  5. Klik Save.

  6. Di editor kueri, masukkan pernyataan berikut:

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID'
    ) AS
    (
    QUERY
    );

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • TOPIC_ID: ID topik Pub/Sub. Anda bisa mendapatkan ID topik dari halaman Topics di Cloud de Confiance konsol.
    • QUERY: pernyataan SQL untuk memilih data yang akan diekspor. Pernyataan SQL hanya boleh berisi operasi yang didukung. Anda harus menggunakan fungsi APPENDS dalam klausa FROM kueri berkelanjutan untuk menentukan titik waktu saat memulai pemrosesan data.
  7. Klik Run.

bq

  1. Di konsol, aktifkan Cloud Shell. Cloud de Confiance

    Aktifkan Cloud Shell

    Di bagian bawah konsol Cloud de Confiance , sesi Cloud Shell akan dimulai dan menampilkan prompt command line. Cloud Shell adalah lingkungan shell dengan Google Cloud CLI yang sudah terinstal, dan dengan nilai yang sudah ditetapkan untuk project Anda saat ini. Diperlukan waktu beberapa detik untuk melakukan inisialisasi pada sesi.

  2. Pada command line, jalankan kueri berkelanjutan menggunakan bq query perintah dengan flag berikut:

    • Tetapkan flag --continuous ke true agar kueri berkelanjutan.
    • Gunakan flag --connection_property untuk menentukan akun layanan yang akan digunakan.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • SERVICE_ACCOUNT_EMAIL: email akun layanan. Anda bisa mendapatkan email akun layanan di halaman **Service accounts** di Cloud de Confiance konsol.
    • QUERY: pernyataan SQL untuk memilih data yang akan diekspor. Pernyataan SQL hanya boleh berisi operasi yang didukung. Anda harus menggunakan fungsi APPENDS dalam klausa FROM kueri berkelanjutan untuk menentukan titik waktu saat memulai pemrosesan data.

API

  1. Jalankan kueri berkelanjutan dengan memanggil metode jobs.insert. Tetapkan kolom berikut dalam resource JobConfigurationQuery dari resource Job yang Anda teruskan:

    • Tetapkan kolom continuous ke true agar kueri berkelanjutan.
    • Gunakan kolom connection_property untuk menentukan akun layanan yang akan digunakan.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs'
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • QUERY: pernyataan SQL untuk memilih data yang akan diekspor. Pernyataan SQL hanya boleh berisi operasi yang didukung. Anda harus menggunakan fungsi APPENDS dalam klausa FROM kueri berkelanjutan untuk menentukan titik waktu saat memulai pemrosesan data.
    • SERVICE_ACCOUNT_EMAIL: email akun layanan. Anda bisa mendapatkan email akun layanan di halaman **Service accounts** di Cloud de Confiance konsol.

Mengekspor beberapa kolom ke Pub/Sub

Jika ingin menyertakan beberapa kolom dalam output, Anda dapat membuat kolom struct untuk berisi nilai kolom, lalu mengonversi nilai struct menjadi string JSON menggunakan fungsi TO_JSON_STRING. Contoh berikut mengekspor data dari empat kolom, yang diformat sebagai string JSON:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

Pengoptimalan ekspor

Jika performa tugas kueri berkelanjutan Anda tampaknya dibatasi oleh resource komputasi yang tersedia, coba tingkatkan ukuran penetapan reservasi slot BigQuery CONTINUOUS Anda.

Batasan

  • Data yang diekspor harus terdiri dari satu STRING atau BYTES kolom. Nama kolom dapat berupa apa pun yang Anda pilih.
  • Anda harus menggunakan a kueri berkelanjutan untuk mengekspor ke Pub/Sub.
  • Anda tidak dapat meneruskan skema ke topik Pub/Sub dalam kueri berkelanjutan.
  • Anda tidak dapat mengekspor data ke topik Pub/Sub yang menggunakan skema.
  • Saat mengekspor ke Pub/Sub, Anda dapat mengekspor rekaman berformat JSON yang beberapa nilainya adalah NULL, tetapi Anda tidak dapat mengekspor rekaman yang hanya terdiri dari nilai NULL. Anda dapat mengecualikan rekaman NULL dari hasil kueri dengan menyertakan filter WHERE message IS NOT NULL dalam kueri berkelanjutan.
  • Saat mengekspor data ke topik Pub/Sub yang dikonfigurasi dengan endpoint lokasi, endpoint harus dikonfigurasi dalam batas Cloud de Confiance regional yang sama dengan set data BigQuery yang berisi tabel yang Anda kueri.
  • Data yang diekspor tidak boleh melebihi kuota Pub/Sub.

Harga

Saat mengekspor data dalam kueri berkelanjutan, Anda akan ditagih menggunakan harga komputasi kapasitas BigQuery. Untuk menjalankan kueri berkelanjutan, Anda harus memiliki reservasi yang menggunakan edisi Enterprise atau Enterprise Plus, dan penetapan reservasi yang menggunakan jenis tugas CONTINUOUS.

Setelah data diekspor, Anda akan dikenai biaya untuk menggunakan Pub/Sub. Untuk mengetahui informasi selengkapnya, lihat harga Pub/Sub.