Membuat topik impor Cloud Storage

Topik impor Cloud Storage memungkinkan Anda terus-menerus menyerap data dari Cloud Storage ke Pub/Sub. Kemudian, Anda dapat melakukan streaming data ke tujuan mana pun yang didukung Pub/Sub. Pub/Sub secara otomatis mendeteksi objek baru yang ditambahkan ke bucket Cloud Storage dan menyerapnya.

Cloud Storage adalah layanan untuk menyimpan objek Anda di Trusted Cloud by S3NS. Objek adalah bagian data yang tidak dapat diubah dan terdiri dari file dalam format apa pun. Anda menyimpan objek di container yang disebut bucket. Bucket juga dapat berisi folder terkelola, yang Anda gunakan untuk memberikan akses yang diperluas ke grup objek dengan awalan nama yang sama.

Untuk mengetahui informasi selengkapnya tentang Cloud Storage, lihat dokumentasi Cloud Storage.

Untuk mengetahui informasi selengkapnya tentang topik impor, lihat Tentang topik impor.

Sebelum memulai

Peran dan izin yang diperlukan

Untuk mendapatkan izin yang Anda perlukan guna membuat dan mengelola topik impor Cloud Storage, minta administrator Anda untuk memberi Anda peran IAM Pub/Sub Editor (roles/pubsub.editor) di topik atau project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat dan mengelola topik impor Cloud Storage. Untuk melihat izin yang benar-benar diperlukan, luaskan bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat dan mengelola topik impor Cloud Storage:

  • Buat topik impor: pubsub.topics.create
  • Menghapus topik impor: pubsub.topics.delete
  • Dapatkan topik impor: pubsub.topics.get
  • Cantumkan topik impor: pubsub.topics.list
  • Memublikasikan ke topik impor: pubsub.topics.publish
  • Perbarui topik impor: pubsub.topics.update
  • Dapatkan kebijakan IAM untuk topik impor: pubsub.topics.getIamPolicy
  • Konfigurasi kebijakan IAM untuk topik impor: pubsub.topics.setIamPolicy

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Anda dapat mengonfigurasi kontrol akses di level project dan level resource individual.

Kebijakan penyimpanan pesan sesuai dengan lokasi bucket

Kebijakan penyimpanan pesan topik Pub/Sub harus tumpang-tindih dengan region tempat bucket Cloud Storage Anda berada. Kebijakan ini menentukan tempat Pub/Sub diizinkan menyimpan data pesan Anda.

  • Untuk bucket dengan jenis lokasi sebagai region: Kebijakan harus menyertakan region tertentu tersebut. Misalnya, jika bucket Anda berada di region us-central1, kebijakan penyimpanan pesan juga harus mencakup us-central1.

  • Untuk bucket dengan jenis lokasi dual-region atau multi-region: Kebijakan harus menyertakan setidaknya satu region dalam lokasi dual-region atau multi-region. Misalnya, jika bucket Anda berada di US multi-region, kebijakan penyimpanan pesan dapat mencakup us-central1, us-east1, atau region lain dalam US multi-region.

    Jika kebijakan tidak menyertakan region bucket, pembuatan topik akan gagal. Misalnya, jika bucket Anda berada di europe-west1 dan kebijakan penyimpanan pesan Anda hanya mencakup asia-east1, Anda akan menerima error.

    Jika kebijakan penyimpanan pesan hanya mencakup satu region yang tumpang-tindih dengan lokasi bucket, redundansi multi-region mungkin terganggu. Hal ini karena jika satu region tersebut menjadi tidak tersedia, data Anda mungkin tidak dapat diakses. Untuk memastikan redundansi penuh, sebaiknya sertakan setidaknya dua region dalam kebijakan penyimpanan pesan yang merupakan bagian dari lokasi multi-region atau dual-region bucket.

Untuk mengetahui informasi selengkapnya tentang lokasi bucket, lihat dokumentasi.

Mengaktifkan publikasi

Untuk mengaktifkan publikasi, Anda harus menetapkan peran Pub/Sub Publisher ke akun layanan Pub/Sub agar Pub/Sub dapat memublikasikan ke topik impor Cloud Storage.

Mengaktifkan publikasi ke semua topik impor Cloud Storage

Pilih opsi ini jika Anda tidak memiliki topik impor Cloud Storage yang tersedia di project Anda.

  1. Di konsol Trusted Cloud , buka halaman IAM.

    Buka IAM

  2. Centang kotak Include S3NS-provided role grants.

  3. Cari akun layanan Pub/Sub yang memiliki format:

    service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com

  4. Untuk akun layanan ini, klik tombol Edit Principal.

  5. Jika diperlukan, klik Tambahkan peran lain.

  6. Telusuri dan pilih peran Pub/Sub publisher (roles/pubsub.publisher).

  7. Klik Simpan.

Mengaktifkan publikasi ke satu topik impor Cloud Storage

Jika Anda ingin memberikan izin kepada Pub/Sub untuk memublikasikan ke topik impor Cloud Storage tertentu yang sudah ada, ikuti langkah-langkah berikut:

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID\
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com"\
       --role="roles/pubsub.publisher"

    Ganti kode berikut:

    • TOPIC_ID adalah ID atau nama topik impor Cloud Storage.

    • PROJECT_NUMBER adalah nomor project. Untuk melihat nomor project, lihat Mengidentifikasi project.

  3. Menetapkan peran Cloud Storage ke akun layanan Pub/Sub

    Untuk membuat topik impor Cloud Storage, akun layanan Pub/Sub harus memiliki izin untuk membaca dari bucket Cloud Storage tertentu. Izin berikut diperlukan:

    • storage.objects.list
    • storage.objects.get
    • storage.buckets.get

    Untuk menetapkan izin ini ke akun layanan Pub/Sub, pilih salah satu prosedur berikut:

    • Memberikan izin di tingkat bucket. Di bucket Cloud Storage tertentu, berikan peran Storage Legacy Object Reader (roles/storage.legacyObjectReader) dan peran Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) ke akun layanan Pub/Sub.

    • Jika Anda harus memberikan peran di tingkat project, Anda dapat memberikan peran Storage Admin (roles/storage.admin) di project yang berisi bucket Cloud Storage. Berikan peran ini ke akun layanan Pub/Sub.

    Izin bucket

    Lakukan langkah-langkah berikut untuk memberikan peran Storage Legacy Object Reader (roles/storage.legacyObjectReader) dan Storage Legacy Bucket Reader (roles/storage.legacyBucketReader) ke akun layanan Pub/Sub di tingkat bucket:

    1. Di Trusted Cloud konsol, buka halaman Cloud Storage.

      Buka Cloud Storage

    2. Klik bucket Cloud Storage tempat Anda ingin membaca pesan dan mengimpor ke topik impor Cloud Storage.

      Halaman Bucket details akan terbuka.

    3. Di halaman Detail bucket, klik tab Izin.

    4. Di tab Izin > Lihat menurut Principal, klik Berikan akses.

      Halaman Berikan akses akan terbuka.

    5. Di bagian Add Principals, masukkan nama akun layanan Pub/Sub Anda.

      Format akun layanan adalah service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com. Misalnya, untuk project dengan PROJECT_NUMBER=112233445566, akun layanan memiliki format service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

    6. Di drop-down Assign roles > Select a role, masukkan Object Reader dan pilih peran Storage Legacy Object Reader.

    7. Klik Add another role.

    8. Di drop-down Pilih peran, masukkan Bucket Reader, lalu pilih peran Storage Legacy Bucket Reader.

    9. Klik Simpan.

    Izin project

    Lakukan langkah-langkah berikut untuk memberikan peran Storage Admin (roles/storage.admin) di level project:

    1. Di konsol Trusted Cloud , buka halaman IAM.

      Buka IAM

    2. Di tab Izin > Lihat menurut Principal, klik Berikan akses.

      Halaman Berikan akses akan terbuka.

    3. Di bagian Add Principals, masukkan nama akun layanan Pub/Sub Anda.

      Format akun layanan adalah service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com. Misalnya, untuk project dengan PROJECT_NUMBER=112233445566, akun layanan memiliki format service-112233445566@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

    4. Di drop-down Assign roles > Select a role, masukkan Storage Admin, lalu pilih peran Storage Admin.

    5. Klik Simpan.

    Untuk mengetahui informasi selengkapnya tentang IAM Cloud Storage, lihat Cloud Storage Identity and Access Management.

    Properti topik impor Cloud Storage

    Untuk mengetahui informasi selengkapnya tentang properti umum di semua topik, lihat Properti topik.

    Nama bucket

    Ini adalah nama bucket Cloud Storage tempat Pub/Sub membaca data yang dipublikasikan ke topik impor Cloud Storage.

    Format input

    Saat membuat topik impor Cloud Storage, Anda dapat menentukan format objek yang akan di-ingest sebagai Teks, Avro, atau Pub/Sub Avro.

    • Teks. Objek diasumsikan menyimpan data dengan teks biasa. Format input ini mencoba menyerap semua objek dalam bucket selama objek memenuhi waktu pembuatan objek minimum dan cocok dengan kriteria pola glob.

      Pembatas. Anda juga dapat menentukan pembatas yang digunakan untuk memisahkan objek menjadi pesan. Jika tidak ditetapkan, nilai defaultnya adalah karakter baris baru (\n). Pemisah hanya boleh berupa satu karakter.

    • Avro. Objek dalam format biner Apache Avro. Objek apa pun yang tidak dalam format Apache Avro yang valid tidak akan diproses. Berikut adalah batasan terkait Avro:

      • Avro versi 1.1.0 dan 1.2.0 tidak didukung.
      • Ukuran maksimum blok Avro adalah 16 MB.
    • Pub/Sub Avro. Objek dalam format biner Apache Avro dengan skema yang cocok dengan objek yang ditulis ke Cloud Storage menggunakan langganan Cloud Storage Pub/Sub dengan format file Avro. Berikut beberapa panduan penting untuk Avro Pub/Sub:

      • Kolom data rekaman Avro digunakan untuk mengisi kolom data pesan Pub/Sub yang dihasilkan.

      • Jika write_metadata option ditentukan untuk langganan Cloud Storage, semua nilai di kolom atribut akan diisi sebagai atribut pesan Pub/Sub yang dibuat.

      • Jika kunci pengurutan ditentukan dalam pesan asli yang ditulis ke Cloud Storage, kolom ini akan diisi sebagai atribut dengan nama original_message_ordering_key dalam pesan Pub/Sub yang dihasilkan.

    Waktu pembuatan objek minimum

    Secara opsional, Anda dapat menentukan waktu pembuatan objek minimum saat membuat topik impor Cloud Storage. Hanya objek yang dibuat pada atau setelah stempel waktu ini yang di-ingest. Stempel waktu ini harus diberikan dalam format seperti YYYY-MM-DDThh:mm:ssZ. Tanggal apa pun, baik di masa lalu maupun di masa mendatang, dari 0001-01-01T00:00:00Z hingga 9999-12-31T23:59:59Z inklusif, valid.

    Cocokkan pola glob

    Secara opsional, Anda dapat menentukan pola glob kecocokan saat membuat topik impor Cloud Storage. Hanya objek dengan nama yang cocok dengan pola ini yang di-ingest. Misalnya, untuk menyerap semua objek dengan akhiran .txt, Anda dapat menentukan pola glob sebagai **.txt.

    Untuk mengetahui informasi tentang sintaksis yang didukung untuk pola glob, lihat dokumentasi Cloud Storage.

    Menggunakan topik impor Cloud Storage

    Anda dapat membuat topik impor baru atau mengedit topik yang ada.

    Pertimbangan

    • Membuat topik dan langganan secara terpisah, meskipun dilakukan secara berurutan dengan cepat, dapat menyebabkan hilangnya data. Ada jendela singkat tempat topik tersedia tanpa langganan. Jika ada data yang dikirim ke topik selama waktu ini, data tersebut akan hilang. Dengan membuat topik terlebih dahulu, membuat langganan, lalu mengonversi topik menjadi topik impor, Anda menjamin bahwa tidak ada pesan yang terlewat selama proses impor.

    Membuat topik impor Cloud Storage

    Untuk membuat topik impor Cloud Storage, ikuti langkah-langkah berikut:

    Konsol

    1. Di konsol Trusted Cloud , buka halaman Topics.

      Buka Topik

    2. Klik Create topic.

      Halaman detail topik akan terbuka.

    3. Di kolom Topic ID, masukkan ID untuk topik impor Cloud Storage Anda.

      Untuk mengetahui informasi selengkapnya tentang penamaan topik, lihat panduan penamaan.

    4. Pilih Tambahkan langganan default.

    5. Pilih Aktifkan penyerapan.

    6. Untuk sumber penyerapan, pilih Google Cloud Storage.

    7. Untuk bucket Cloud Storage, klik Browse.

      Halaman Select bucket akan terbuka. Pilih salah satu opsi berikut:

      • Pilih bucket yang ada dari project yang sesuai.

      • Klik ikon buat dan ikuti petunjuk di layar untuk membuat bucket baru. Setelah Anda membuat bucket, pilih bucket untuk topik impor Cloud Storage.

    8. Saat Anda menentukan bucket, Pub/Sub akan memeriksa izin yang sesuai di bucket untuk akun layanan Pub/Sub. Jika ada masalah izin, Anda akan melihat pesan yang serupa seperti berikut:

      Unable to verify if the Pub/Sub service agent has write permissions on this bucket. You may be lacking permissions to view or set permissions.

      Jika Anda mengalami masalah izin, klik Setel izin. Untuk mengetahui informasi selengkapnya, lihat Memberikan izin Cloud Storage ke akun layanan Pub/Sub.

    9. Untuk Object format, pilih Text, Avro, atau Pub/Sub Avro.

      Jika memilih Teks, Anda dapat secara opsional menentukan Pemisah Kolom untuk memisahkan objek menjadi pesan.

      Untuk mengetahui informasi selengkapnya tentang opsi ini, lihat Format input.

    10. Opsional. Anda dapat menentukan Waktu pembuatan objek minimum untuk topik Anda. Jika disetel, hanya objek yang dibuat setelah waktu pembuatan objek minimum yang akan di-ingest.

      Untuk mengetahui informasi selengkapnya, lihat Waktu pembuatan objek minimum.

    11. Anda harus menentukan Pola glob. Untuk menyerap semua objek dalam bucket, gunakan ** sebagai pola glob. Jika ditetapkan, hanya objek yang cocok dengan pola yang diberikan yang akan di-ingest.

      Untuk mengetahui informasi selengkapnya, lihat Mencocokkan pola glob.

    12. Pertahankan setelan default lainnya.

    13. Klik Create topic.

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Jalankan perintah gcloud pubsub topics create:

      gcloud pubsub topics create TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME \
          --cloud-storage-ingestion-input-format=INPUT_FORMAT \
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER \
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME \
          --cloud-storage-ingestion-match-glob=MATCH_GLOB
      

      Dalam perintah, hanya TOPIC_ID, flag --cloud-storage-ingestion-bucket, dan flag --cloud-storage-ingestion-input-format yang diperlukan. Flag lainnya bersifat opsional dan dapat dihilangkan.

      Ganti kode berikut:

      • TOPIC_ID: Nama atau ID topik Anda.
      • BUCKET_NAME: Menentukan nama bucket yang ada. Contoh, prod_bucket. Nama bucket tidak boleh menyertakan ID project. Untuk membuat bucket, lihat Membuat bucket.
      • INPUT_FORMAT: Menentukan format objek yang di-ingest. Nilai ini dapat berupa text, avro, atau pubsub_avro. Untuk mengetahui informasi selengkapnya tentang opsi ini, lihat Format input.
      • TEXT_DELIMITER: Menentukan pemisah yang akan digunakan untuk memisahkan objek teks menjadi pesan Pub/Sub. Ini harus berupa satu karakter dan hanya boleh ditetapkan jika INPUT_FORMAT adalah text. Nilai defaultnya adalah karakter baris baru (\n).

        Saat menggunakan gcloud CLI untuk menentukan pemisah, perhatikan dengan cermat penanganan karakter khusus seperti baris baru \n. Gunakan format '\n' untuk memastikan pembatas ditafsirkan dengan benar. Hanya menggunakan \n tanpa tanda petik atau karakter escape akan menghasilkan pembatas "n".

      • MINIMUM_OBJECT_CREATE_TIME: Menentukan waktu minimum saat objek dibuat agar dapat diproses. Stempel waktu ini harus dalam UTC dengan format YYYY-MM-DDThh:mm:ssZ. Misalnya, 2024-10-14T08:30:30Z.

        Tanggal apa pun, baik masa lalu maupun masa mendatang, dari 0001-01-01T00:00:00Z hingga 9999-12-31T23:59:59Z inklusif, valid.

      • MATCH_GLOB: Menentukan pola glob yang akan dicocokkan agar objek dapat di-ingest. Saat Anda menggunakan gcloud CLI, glob kecocokan dengan karakter * harus memiliki karakter * yang diformat sebagai karakter yang di-escape dalam bentuk \*\*.txt atau seluruh glob kecocokan harus berada dalam tanda kutip "**.txt" atau '**.txt'. Untuk mengetahui informasi tentang sintaksis yang didukung untuk pola glob, lihat dokumentasi Cloud Storage.

    3. C++

      Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C++ API.

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string bucket, std::string const& input_format,
         std::string text_delimiter, std::string match_glob,
         std::string const& minimum_object_create_time) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto& cloud_storage = *request.mutable_ingestion_data_source_settings()
                                   ->mutable_cloud_storage();
        cloud_storage.set_bucket(std::move(bucket));
        if (input_format == "text") {
          cloud_storage.mutable_text_format()->set_delimiter(
              std::move(text_delimiter));
        } else if (input_format == "avro") {
          cloud_storage.mutable_avro_format();
        } else if (input_format == "pubsub_avro") {
          cloud_storage.mutable_pubsub_avro_format();
        } else {
          std::cout << "input_format must be in ('text', 'avro', 'pubsub_avro'); "
                       "got value: "
                    << input_format << std::endl;
          return;
        }
      
        if (!match_glob.empty()) {
          cloud_storage.set_match_glob(std::move(match_glob));
        }
      
        if (!minimum_object_create_time.empty()) {
          google::protobuf::Timestamp timestamp;
          if (!google::protobuf::util::TimeUtil::FromString(
                  minimum_object_create_time,
                  cloud_storage.mutable_minimum_object_create_time())) {
            std::cout << "Invalid minimum object create time: "
                      << minimum_object_create_time << std::endl;
          }
        }
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      Contoh berikut menggunakan library klien Go Pub/Sub versi utama (v2). Jika Anda masih menggunakan library v1, lihat panduan migrasi ke v2. Untuk melihat daftar contoh kode v1, lihat contoh kode yang tidak digunakan lagi.

      Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

      import (
      	"context"
      	"fmt"
      	"io"
      	"time"
      
      	"cloud.google.com/go/pubsub/v2"
      	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
      	"google.golang.org/protobuf/types/known/timestamppb"
      )
      
      func createTopicWithCloudStorageIngestion(w io.Writer, projectID, topicID, bucket, matchGlob, minimumObjectCreateTime, delimiter string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      	// bucket := "my-bucket"
      	// matchGlob := "**.txt"
      	// minimumObjectCreateTime := "2006-01-02T15:04:05Z"
      	// delimiter := ","
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	minCreateTime, err := time.Parse(time.RFC3339, minimumObjectCreateTime)
      	if err != nil {
      		return err
      	}
      
      	topicpb := &pubsubpb.Topic{
      		Name: fmt.Sprintf("projects/%s/topics/%s", projectID, topicID),
      		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
      			Source: &pubsubpb.IngestionDataSourceSettings_CloudStorage_{
      				CloudStorage: &pubsubpb.IngestionDataSourceSettings_CloudStorage{
      					Bucket: bucket,
      					// Alternatively, can be Avro or PubSubAvro formats. See
      					InputFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat_{
      						TextFormat: &pubsubpb.IngestionDataSourceSettings_CloudStorage_TextFormat{
      							Delimiter: &delimiter,
      						},
      					},
      					MatchGlob:               matchGlob,
      					MinimumObjectCreateTime: timestamppb.New(minCreateTime),
      				},
      			},
      		},
      	}
      	t, err := client.TopicAdminClient.CreateTopic(ctx, topicpb)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Cloud storage topic created: %v\n", t)
      	return nil
      }
      

      Java

      Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Java API.

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.protobuf.util.Timestamps;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      import java.text.ParseException;
      
      public class CreateTopicWithCloudStorageIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Cloud Storage ingestion settings.
          // bucket and inputFormat are required arguments.
          String bucket = "your-bucket";
          String inputFormat = "text";
          String textDelimiter = "\n";
          String matchGlob = "**.txt";
          String minimumObjectCreateTime = "YYYY-MM-DDThh:mm:ssZ";
      
          createTopicWithCloudStorageIngestionExample(
              projectId, topicId, bucket, inputFormat, textDelimiter, matchGlob, minimumObjectCreateTime);
        }
      
        public static void createTopicWithCloudStorageIngestionExample(
            String projectId,
            String topicId,
            String bucket,
            String inputFormat,
            String textDelimiter,
            String matchGlob,
            String minimumObjectCreateTime)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            IngestionDataSourceSettings.CloudStorage.Builder cloudStorageBuilder =
                IngestionDataSourceSettings.CloudStorage.newBuilder().setBucket(bucket);
            switch (inputFormat) {
              case "text":
                cloudStorageBuilder.setTextFormat(
                    IngestionDataSourceSettings.CloudStorage.TextFormat.newBuilder()
                        .setDelimiter(textDelimiter)
                        .build());
                break;
              case "avro":
                cloudStorageBuilder.setAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.AvroFormat.getDefaultInstance());
                break;
              case "pubsub_avro":
                cloudStorageBuilder.setPubsubAvroFormat(
                    IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat.getDefaultInstance());
                break;
              default:
                throw new IllegalArgumentException(
                    "inputFormat must be in ('text', 'avro', 'pubsub_avro'); got value: " + inputFormat);
            }
      
            if (matchGlob != null && !matchGlob.isEmpty()) {
              cloudStorageBuilder.setMatchGlob(matchGlob);
            }
      
            if (minimumObjectCreateTime != null && !minimumObjectCreateTime.isEmpty()) {
              try {
                cloudStorageBuilder.setMinimumObjectCreateTime(Timestamps.parse(minimumObjectCreateTime));
              } catch (ParseException e) {
                System.err.println("Unable to parse timestamp: " + minimumObjectCreateTime);
              }
            }
      
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder()
                    .setCloudStorage(cloudStorageBuilder.build())
                    .build();
      
            TopicName topicName = TopicName.of(projectId, topicId);
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println(
                "Created topic with Cloud Storage ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId,
        bucket,
        inputFormat,
        textDelimiter,
        matchGlob,
        minimumObjectCreateTime,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings.cloudStorage.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Node.ts

      Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bucket = 'YOUR_BUCKET_NAME';
      // const inputFormat = 'text';
      // const textDelimiter = '\n';
      // const matchGlob = '**.txt';
      // const minimumObjectCreateTime = 'YYYY-MM-DDThh:mm:ssZ;
      
      // Imports the Google Cloud client library
      import {PubSub, TopicMetadata} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithCloudStorageIngestion(
        topicNameOrId: string,
        bucket: string,
        inputFormat: string,
        textDelimiter: string,
        matchGlob: string,
        minimumObjectCreateTime: string,
      ) {
        const minimumDate = Date.parse(minimumObjectCreateTime);
        const topicMetadata: TopicMetadata = {
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            cloudStorage: {
              bucket,
              minimumObjectCreateTime: {
                seconds: minimumDate / 1000,
                nanos: (minimumDate % 1000) * 1000,
              },
              matchGlob,
            },
          },
        };
      
        // Make a format appropriately.
        switch (inputFormat) {
          case 'text':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.textFormat = {
              delimiter: textDelimiter,
            };
            break;
          case 'avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.avroFormat = {};
            break;
          case 'pubsub_avro':
            topicMetadata.ingestionDataSourceSettings!.cloudStorage!.pubsubAvroFormat =
              {};
            break;
          default:
            console.error('inputFormat must be in ("text", "avro", "pubsub_avro")');
            return;
        }
      
        // Creates a new topic with Cloud Storage ingestion.
        await pubSubClient.createTopic(topicMetadata);
        console.log(`Topic ${topicNameOrId} created with Cloud Storage ingestion.`);
      }

      Python

      Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Python API.

      from google.cloud import pubsub_v1
      from google.protobuf import timestamp_pb2
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # bucket = "your-bucket"
      # input_format = "text"  (can be one of "text", "avro", "pubsub_avro")
      # text_delimiter = "\n"
      # match_glob = "**.txt"
      # minimum_object_create_time = "YYYY-MM-DDThh:mm:ssZ"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      cloud_storage_settings = IngestionDataSourceSettings.CloudStorage(
          bucket=bucket,
      )
      if input_format == "text":
          cloud_storage_settings.text_format = (
              IngestionDataSourceSettings.CloudStorage.TextFormat(
                  delimiter=text_delimiter
              )
          )
      elif input_format == "avro":
          cloud_storage_settings.avro_format = (
              IngestionDataSourceSettings.CloudStorage.AvroFormat()
          )
      elif input_format == "pubsub_avro":
          cloud_storage_settings.pubsub_avro_format = (
              IngestionDataSourceSettings.CloudStorage.PubSubAvroFormat()
          )
      else:
          print(
              "Invalid input_format: "
              + input_format
              + "; must be in ('text', 'avro', 'pubsub_avro')"
          )
          return
      
      if match_glob:
          cloud_storage_settings.match_glob = match_glob
      
      if minimum_object_create_time:
          try:
              minimum_object_create_time_timestamp = timestamp_pb2.Timestamp()
              minimum_object_create_time_timestamp.FromJsonString(
                  minimum_object_create_time
              )
              cloud_storage_settings.minimum_object_create_time = (
                  minimum_object_create_time_timestamp
              )
          except ValueError:
              print("Invalid minimum_object_create_time: " + minimum_object_create_time)
              return
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              cloud_storage=cloud_storage_settings,
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with Cloud Storage Ingestion Settings")

    Jika Anda mengalami masalah, lihat Topik pemecahan masalah impor Cloud Storage.

    Mengedit topik impor Cloud Storage

    Anda dapat mengedit topik impor Cloud Storage untuk memperbarui propertinya.

    Misalnya, untuk memulai ulang penyerapan, Anda dapat mengubah bucket atau memperbarui waktu pembuatan objek minimum.

    Untuk mengedit topik impor Cloud Storage, lakukan langkah-langkah berikut:

    Konsol

    1. Di konsol Trusted Cloud , buka halaman Topics.

      Buka Topik

    2. Klik topik impor Cloud Storage.

    3. Di halaman detail topik, klik Edit.

    4. Perbarui kolom yang ingin Anda ubah.

    5. Klik Perbarui.

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Agar tidak kehilangan setelan untuk topik impor, pastikan untuk menyertakan semuanya setiap kali Anda memperbarui topik. Jika Anda tidak menyertakan sesuatu, Pub/Sub akan mereset setelan ke nilai default aslinya.

      Jalankan perintah gcloud pubsub topics update dengan semua flag yang disebutkan dalam contoh berikut:

      gcloud pubsub topics update TOPIC_ID \
          --cloud-storage-ingestion-bucket=BUCKET_NAME\
          --cloud-storage-ingestion-input-format=INPUT_FORMAT\
          --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
          --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
          --cloud-storage-ingestion-match-glob=MATCH_GLOB

      Ganti kode berikut:

      • TOPIC_ID adalah ID atau nama topik. Kolom ini tidak dapat diperbarui.

      • BUCKET_NAME: Menentukan nama bucket yang ada. Contoh, prod_bucket. Nama bucket tidak boleh menyertakan ID project. Untuk membuat bucket, lihat Membuat bucket.

      • INPUT_FORMAT: Menentukan format objek yang di-ingest. Nilai ini dapat berupa text, avro, atau pubsub_avro. Lihat Format input untuk mengetahui informasi selengkapnya tentang opsi ini.

      • TEXT_DELIMITER: Menentukan pemisah yang akan digunakan untuk memisahkan objek teks menjadi pesan Pub/Sub. Ini harus berupa satu karakter dan hanya boleh ditetapkan jika INPUT_FORMAT adalah text. Nilai defaultnya adalah karakter baris baru (\n).

        Saat menggunakan gcloud CLI untuk menentukan pemisah, perhatikan dengan cermat penanganan karakter khusus seperti baris baru \n. Gunakan format '\n' untuk memastikan pemisah ditafsirkan dengan benar. Hanya menggunakan \n tanpa tanda petik atau karakter escape akan menghasilkan pembatas "n".

      • MINIMUM_OBJECT_CREATE_TIME: Menentukan waktu minimum saat objek dibuat agar dapat diproses. Stempel waktu ini harus dalam UTC dengan format YYYY-MM-DDThh:mm:ssZ. Misalnya, 2024-10-14T08:30:30Z.

        Tanggal apa pun, baik masa lalu maupun masa mendatang, dari 0001-01-01T00:00:00Z hingga 9999-12-31T23:59:59Z inklusif, valid.

      • MATCH_GLOB: Menentukan pola glob yang akan dicocokkan agar objek dapat di-ingest. Saat Anda menggunakan gcloud CLI, glob kecocokan dengan karakter * harus memiliki karakter * yang diformat sebagai karakter yang di-escape dalam bentuk \*\*.txt atau seluruh glob kecocokan harus berada dalam tanda kutip "**.txt" atau '**.txt'. Untuk mengetahui informasi tentang sintaksis yang didukung untuk pola glob, lihat dokumentasi Cloud Storage.

    Kuota dan batas untuk topik impor Cloud Storage

    Throughput penayang untuk topik impor dibatasi oleh kuota publikasi topik. Untuk mengetahui informasi selengkapnya, lihat Kuota dan batas Pub/Sub.

    Langkah berikutnya