Crie um tópico de importação do Confluent Cloud

Um tópico de importação do Confluent Cloud permite-lhe carregar continuamente dados do Confluent Cloud como uma origem externa e para o Pub/Sub. Em seguida, pode transmitir os dados para qualquer um dos destinos suportados pelo Pub/Sub.

Este documento mostra como criar e gerir tópicos de importação do Confluent Cloud. Para criar um tópico padrão, consulte o artigo Crie um tópico padrão.

Para mais informações sobre tópicos de importação, consulte o artigo Acerca dos tópicos de importação.

Antes de começar

Funções e autorizações necessárias

Para receber as autorizações de que precisa para criar e gerir tópicos de importação do Confluent Cloud, peça ao seu administrador para lhe conceder a função IAM de Editor do Pub/Sub (roles/pubsub.editor) no seu tópico ou projeto. Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Esta função predefinida contém as autorizações necessárias para criar e gerir tópicos de importação do Confluent Cloud. Para ver as autorizações exatas que são necessárias, expanda a secção Autorizações necessárias:

Autorizações necessárias

São necessárias as seguintes autorizações para criar e gerir tópicos de importação do Confluent Cloud:

  • Crie um tópico de importação: pubsub.topics.create
  • Elimine um tópico de importação: pubsub.topics.delete
  • Obtenha um tópico de importação: pubsub.topics.get
  • Indique um tópico de importação: pubsub.topics.list
  • Publicar num tópico de importação: pubsub.topics.publish and pubsub.serviceAgent
  • Atualize um tópico de importação: pubsub.topics.update
  • Obtenha a Política IAM para um tópico de importação: pubsub.topics.getIamPolicy
  • Configure a política IAM para um tópico de importação: pubsub.topics.setIamPolicy

Também pode conseguir estas autorizações com funções personalizadas ou outras funções predefinidas.

Pode configurar o controlo de acesso ao nível do projeto e ao nível do recurso individual.

Configure a identidade federada para aceder ao Confluent Cloud

A federação de identidades da carga de trabalho permite que os Trusted Cloud serviços acedam a cargas de trabalho executadas fora do Trusted Cloud. Com a federação de identidades, não precisa de manter nem transmitir credenciais para Trusted Cloud aceder aos seus recursos noutras nuvens. Em alternativa, pode usar as identidades das próprias cargas de trabalho para fazer a autenticação no Trusted Cloud e aceder aos recursos.

Crie uma conta de serviço em Trusted Cloud

Este passo é opcional. Se já tiver uma conta de serviço, pode usá-la neste procedimento em vez de criar uma nova conta de serviço. Se estiver a usar uma conta de serviço existente, aceda a Registe o ID exclusivo da conta de serviço para o passo seguinte.

Para tópicos de importação do Confluent Cloud, o Pub/Sub usa a conta de serviço como a identidade para aceder aos recursos do Confluent Cloud.

Para mais informações sobre como criar uma conta de serviço, incluindo pré-requisitos, funções e autorizações necessárias, e diretrizes de nomenclatura, consulte o artigo Crie contas de serviço. Depois de criar uma conta de serviço, pode ter de aguardar 60 segundos ou mais antes de usar a conta de serviço. Este comportamento ocorre porque as operações de leitura são eventualmente consistentes. Pode demorar algum tempo até que a nova conta de serviço fique visível.

Registe o ID exclusivo da conta de serviço

Precisa de um ID exclusivo da conta de serviço para configurar o fornecedor de identidade e o conjunto na consola do Confluent Cloud.

  1. Na Trusted Cloud consola, aceda à página de detalhes da conta de serviço.

    Aceder à conta de serviço

  2. Clique na conta de serviço que acabou de criar ou na que planeia usar.

  3. Na página Detalhes da conta de serviço, registe o número do ID exclusivo.

    Precisa do ID como parte do fluxo de trabalho para configurar o fornecedor de identidade e o conjunto na consola do Confluent Cloud.

Adicione a função de criador de tokens da conta de serviço à conta de serviço do Pub/Sub

A função de criador de tokens de contas de serviço (roles/iam.serviceAccountTokenCreator) permite que os principais criem credenciais de curta duração para uma conta de serviço. Estes tokens ou credenciais são usados para se fazer passar pela conta de serviço.

Para mais informações sobre a simulação da conta de serviço, consulte o artigo Simulação da conta de serviço.

Também pode adicionar a função de publicador do Pub/Sub (roles/pubsub.publisher) durante este procedimento. Para mais informações sobre a função e o motivo pelo qual a está a adicionar, consulte o artigo Adicione a função de publicador do Pub/Sub à conta de serviço do Pub/Sub.

  1. Na Trusted Cloud consola, aceda à página IAM.

    Aceda ao IAM

  2. Clique na caixa de verificação Incluir concessões de funções fornecidas pela Google. S3NS

  3. Procure a conta de serviço com o formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. Para esta conta de serviço, clique no botão Editar principal.

  5. Se necessário, clique em Adicionar outra função.

  6. Pesquise e clique na função Criador de tokens de conta de serviço (roles/iam.serviceAccountTokenCreator).

  7. Clique em Guardar.

Crie um Fornecedor de identidade no Confluent Cloud

Para fazer a autenticação no Confluent Cloud, a conta de serviço do Google Cloud precisa de um conjunto de identidades. Primeiro, tem de criar um fornecedor de identidade no Confluent Cloud.

Para mais informações sobre como criar um fornecedor de identidade no Confluent Cloud, visite a página Adicione um fornecedor de identidade OAuth/OIDC.

  1. Inicie sessão na consola do Confluent Cloud.

  2. No menu, clique em Contas e acesso.

  3. Clique em Identidades de carga de trabalho.

  4. Clique em Adicionar fornecedor.

  5. Clique em OAuth/OIDC e, de seguida, em Seguinte.

  6. Clique em Outro fornecedor OIDC e, de seguida, em Seguinte.

  7. Indique um nome e uma descrição da finalidade do fornecedor de identidade.

  8. Clique em Mostrar configuração avançada.

  9. No campo URI do emissor, introduza https://accounts.google.com.

  10. No campo URI JWKS, introduza https://www.googleapis.com/oauth2/v3/certs.

  11. Clique em Validar e guardar.

Crie um Identity Pool e conceda as funções adequadas no Confluent Cloud

Tem de criar um conjunto de identidades no seu perfil de identidade e conceder as funções necessárias para permitir que a conta de serviço do Pub/Sub se autentique e leia a partir de tópicos do Confluent Cloud Kafka.

Certifique-se de que o cluster é criado no Confluent Cloud antes de prosseguir com a criação de um conjunto de identidades.

Para mais informações sobre como criar um conjunto de identidades, visite a página Use conjuntos de identidades com o seu fornecedor de identidade OAuth/OIDC.

  1. Inicie sessão na consola do Confluent Cloud.

  2. No menu, clique em Contas e acesso.

  3. Clique em Identidades de carga de trabalho.

  4. Clique no Fornecedor de identidade que criou em Crie um Fornecedor de identidade no Confluent Cloud.

  5. Clique em Adicionar conjunto.

  6. Indique um nome e uma descrição para o pool de identidades.

  7. Defina a reivindicação de identidade como claims.

  8. Em Definir filtros, clique no separador Avançadas. Introduza o seguinte código:

    claims.iss=='https://accounts.google.com' && claims.sub=='<SERVICE_ACCOUNT_UNIQUE_ID>'
    

    Substitua <SERVICE_ACCOUNT_UNIQUE_ID> pelo ID exclusivo da sua conta de serviço que encontra em Registe o ID exclusivo da conta de serviço.

  9. Clicar em Seguinte.

  10. Clique em Adicionar nova autorização. Em seguida, clique em Seguinte.

  11. No cluster relevante, clique em Adicionar atribuição de função.

  12. Clique na função Operador e, de seguida, em Adicionar.

    Esta função concede acesso ao Pub/Sub. Acesso da conta de serviço ao cluster que contém o tópico do Confluent Kafka que quer carregar para o Pub/Sub.

  13. Abaixo do cluster, clique em Tópicos. Em seguida, clique em Adicionar atribuição de função.

  14. Selecione a função DeveloperRead.

  15. Clique na opção adequada e especifique o tópico ou o prefixo. Por exemplo, Tópico específico, Regra de prefixo ou Todos os tópicos.

  16. Clique em Adicionar.

  17. Clicar em Seguinte.

  18. Clique em Validar e guardar.

Adicione a função de publicador do Pub/Sub ao principal do Pub/Sub

Para ativar a publicação, tem de atribuir uma função de publicador à conta de serviço do Pub/Sub para que o Pub/Sub possa publicar no tópico de importação do Confluent Cloud.

Adicione a função de agente de serviço do Pub/Sub à conta de serviço do Pub/Sub

Para permitir que o Pub/Sub use a quota de publicação do projeto do tópico de importação, o agente de serviço do Pub/Sub requer a autorização serviceusage.services.use no projeto do tópico de importação.

Para conceder esta autorização, recomendamos que adicione a função de agente do serviço Pub/Sub à conta de serviço Pub/Sub.

Se a conta de serviço do Pub/Sub não tiver a função de agente de serviço do Pub/Sub, pode ser concedida da seguinte forma:

  1. Na Trusted Cloud consola, aceda à página IAM.

    Aceda ao IAM

  2. Clique na caixa de verificação Incluir concessões de funções fornecidas pela Google. S3NS

  3. Procure a conta de serviço com o formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. Para esta conta de serviço, clique no botão Editar principal.

  5. Se necessário, clique em Adicionar outra função.

  6. Pesquise e clique na função de agente do serviço Pub/Sub (roles/pubsub.serviceAgent).

  7. Clique em Guardar.

Ative a publicação de todos os tópicos

Use este método se não tiver criado tópicos de importação do Confluent Cloud.

  1. Na Trusted Cloud consola, aceda à página IAM.

    Aceda ao IAM

  2. Clique na caixa de verificação Incluir concessões de funções fornecidas pela Google. S3NS

  3. Procure a conta de serviço com o formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. Para esta conta de serviço, clique no botão Editar principal.

  5. Se necessário, clique em Adicionar outra função.

  6. Pesquise e clique na função de publicador do Pub/Sub (roles/pubsub.publisher).

  7. Clique em Guardar.

Ative a publicação a partir de um único tópico

Use este método apenas se o tópico de importação do Confluent Cloud já existir.

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    Substitua o seguinte:

    • TOPIC_ID: o ID do tópico do tópico de importação do Confluent Cloud.

    • PROJECT_NUMBER: o número do projeto. Para ver o número do projeto, consulte o artigo Identificar projetos.

  3. Adicione a função de utilizador da conta de serviço à conta de serviço

    A função Utilizador da conta de serviço (roles/iam.serviceAccountUser) inclui a autorização iam.serviceAccounts.actAs que permite a um principal anexar uma conta de serviço às definições de carregamento do tópico de importação do Confluent Cloud e usar essa conta de serviço para a identidade federada.

    1. Na Trusted Cloud consola, aceda à página IAM.

      Aceda ao IAM

    2. Para o principal que está a emitir as chamadas de criação ou atualização de tópicos, clique no botão Editar principal.

    3. Se necessário, clique em Adicionar outra função.

    4. Pesquise e clique na função Utilizador da conta de serviço (roles/iam.serviceAccountUser).

    5. Clique em Guardar.

    Use tópicos de importação do Confluent Cloud

    Pode criar um novo tópico de importação ou editar um tópico existente.

    Considerações

    • A criação do tópico e da subscrição em separado, mesmo que seja feita em rápida sucessão, pode levar à perda de dados. Existe um curto período durante o qual o tópico existe sem uma subscrição. Se forem enviados dados para o tópico durante este período, estes são perdidos. Se criar primeiro o tópico, criar a subscrição e, em seguida, converter o tópico num tópico de importação, garante que não perde nenhuma mensagem durante o processo de importação.

    • Se precisar de recriar o tópico do Kafka de um tópico de importação existente com o mesmo nome, não pode apenas eliminar o tópico do Kafka e recriá-lo. Esta ação pode invalidar a gestão de deslocamentos do Pub/Sub, o que pode levar à perda de dados. Para mitigar esta situação, siga estes passos:

      • Elimine o tópico de importação do Pub/Sub.
      • Elimine o tópico do Kafka.
      • Crie o tópico do Kafka.
      • Crie o tópico de importação do Pub/Sub.
    • Os dados de um tópico do Confluent Cloud Kafka são sempre lidos a partir do desvio mais antigo.

    Crie um tópico de importação do Confluent Cloud

    Para saber mais sobre as propriedades associadas a um tópico, consulte o artigo Propriedades de um tópico.

    Certifique-se de que concluiu os seguintes procedimentos:

    Para criar um tópico de importação do Confluent Cloud, siga estes passos:

    Consola

    1. Na Google Cloud Console, aceda à página Tópicos.

      Aceda a Tópicos

    2. Clique em Criar tópico.
    3. No campo ID do tópico, introduza um ID para o tópico de importação. Para mais informações sobre a atribuição de nomes a tópicos, consulte as diretrizes de nomenclatura.
    4. Selecione Adicionar uma subscrição predefinida.
    5. Selecione Ativar carregamento.
    6. Para a origem da carregamento, selecione Confluent Cloud.
    7. Introduza os seguintes detalhes:
      1. Servidor de arranque: o servidor de arranque do cluster que contém o tópico do Kafka que está a carregar para o Pub/Sub. O formato é o seguinte: hostname:port.
      2. ID do cluster: o ID do cluster que contém o tópico do Kafka que está a carregar para o Pub/Sub.
      3. Tópico: o nome do tópico do Kafka que está a carregar para o Pub/Sub.
      4. ID do conjunto de identidades: o ID do conjunto de identidades usado para autenticar com o Confluent Cloud.
      5. Conta de serviço: a conta de serviço que criou em Crie uma conta de serviço no Google Cloud.
    8. Clique em Criar tópico.

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Execute o comando gcloud pubsub topics create:
      gcloud pubsub topics create TOPIC_ID 
      --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER
      --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID
      --confluent-cloud-ingestion-topic CONFLUENT_TOPIC
      --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID
      --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

      Substitua o seguinte:

      • TOPIC_ID: o nome ou o ID do seu tópico do Pub/Sub.
      • CONFLUENT_BOOTSTRAP_SERVER: o servidor de arranque do seu cluster que contém o tópico do Kafka que está a carregar para o Pub/Sub. O formato é o seguinte: hostname:port.
      • CONFLUENT_CLUSTER_ID: o ID do cluster que contém o tópico do Kafka que está a carregar para o Pub/Sub.
      • CONFLUENT_TOPIC: o nome do tópico do Kafka que está a carregar para o Pub/Sub.
      • CONFLUENT_IDENTITY_POOL_ID: o ID do conjunto de identidades usado para autenticar com o Confluent Cloud.
      • PUBSUB_SERVICE_ACCOUNT: a conta de serviço que criou em Crie uma conta de serviço no Google Cloud.
    3. C++

      Antes de experimentar este exemplo, siga as instruções de configuração do C++ no artigo Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API C++ do Pub/Sub.

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string const& bootstrap_server,
         std::string const& cluster_id, std::string const& confluent_topic,
         std::string const& identity_pool_id,
         std::string const& gcp_service_account) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto* confluent_cloud = request.mutable_ingestion_data_source_settings()
                                    ->mutable_confluent_cloud();
        confluent_cloud->set_bootstrap_server(bootstrap_server);
        confluent_cloud->set_cluster_id(cluster_id);
        confluent_cloud->set_topic(confluent_topic);
        confluent_cloud->set_identity_pool_id(identity_pool_id);
        confluent_cloud->set_gcp_service_account(gcp_service_account);
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Ir

      O exemplo seguinte usa a versão principal da biblioteca de cliente Go Pub/Sub (v2). Se ainda estiver a usar a biblioteca v1, consulte o guia de migração para a v2. Para ver uma lista de exemplos de código da v1, consulte os exemplos de código descontinuados.

      Antes de experimentar este exemplo, siga as instruções de configuração do Go em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go do Pub/Sub.

      import (
      	"context"
      	"fmt"
      	"io"
      
      	"cloud.google.com/go/pubsub"
      )
      
      func createTopicWithConfluentCloudIngestion(w io.Writer, projectID, topicID, bootstrapServer, clusterID, confluentTopic, poolID, gcpSA string) error {
      	// projectID := "my-project-id"
      	// topicID := "my-topic"
      
      	// // Confluent Cloud ingestion settings.
      	// bootstrapServer := "bootstrap-server"
      	// clusterID := "cluster-id"
      	// confluentTopic := "confluent-topic"
      	// poolID := "identity-pool-id"
      	// gcpSA := "gcp-service-account"
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	cfg := &pubsub.TopicConfig{
      		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
      			Source: &pubsub.IngestionDataSourceConfluentCloud{
      				BootstrapServer:   bootstrapServer,
      				ClusterID:         clusterID,
      				Topic:             confluentTopic,
      				IdentityPoolID:    poolID,
      				GCPServiceAccount: gcpSA,
      			},
      		},
      	}
      	t, err := client.CreateTopicWithConfig(ctx, topicID, cfg)
      	if err != nil {
      		return fmt.Errorf("CreateTopic: %w", err)
      	}
      	fmt.Fprintf(w, "Created topic with Confluent Cloud ingestion: %v\n", t)
      	return nil
      }
      

      Java

      Antes de experimentar este exemplo, siga as instruções de configuração do Java no artigo Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Java do Pub/Sub.

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      
      public class CreateTopicWithConfluentCloudIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Confluent Cloud ingestion settings.
          String bootstrapServer = "bootstrap-server";
          String clusterId = "cluster-id";
          String confluentTopic = "confluent-topic";
          String identityPoolId = "identity-pool-id";
          String gcpServiceAccount = "gcp-service-account";
      
          createTopicWithConfluentCloudIngestionExample(
              projectId,
              topicId,
              bootstrapServer,
              clusterId,
              confluentTopic,
              identityPoolId,
              gcpServiceAccount);
        }
      
        public static void createTopicWithConfluentCloudIngestionExample(
            String projectId,
            String topicId,
            String bootstrapServer,
            String clusterId,
            String confluentTopic,
            String identityPoolId,
            String gcpServiceAccount)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
      
            IngestionDataSourceSettings.ConfluentCloud confluentCloud =
                IngestionDataSourceSettings.ConfluentCloud.newBuilder()
                    .setBootstrapServer(bootstrapServer)
                    .setClusterId(clusterId)
                    .setTopic(confluentTopic)
                    .setIdentityPoolId(identityPoolId)
                    .setGcpServiceAccount(gcpServiceAccount)
                    .build();
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println(
                "Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      Antes de experimentar este exemplo, siga as instruções de configuração do Node.js em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Node.js do Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bootstrapServer = 'url:port';
      // const clusterId = 'YOUR_CLUSTER_ID';
      // const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
      // const identityPoolId = 'pool-ID';
      // const gcpServiceAccount = 'ingestion-account@...';
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithConfluentCloudIngestion(
        topicNameOrId,
        bootstrapServer,
        clusterId,
        confluentTopic,
        identityPoolId,
        gcpServiceAccount,
      ) {
        // Creates a new topic with Confluent Cloud ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            confluentCloud: {
              bootstrapServer,
              clusterId,
              topic: confluentTopic,
              identityPoolId,
              gcpServiceAccount,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
      }

      Node.ts

      Antes de experimentar este exemplo, siga as instruções de configuração do Node.js em Início rápido: usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Node.js do Pub/Sub.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const bootstrapServer = 'url:port';
      // const clusterId = 'YOUR_CLUSTER_ID';
      // const confluentTopic = 'YOUR_CONFLUENT_TOPIC';
      // const identityPoolId = 'pool-ID';
      // const gcpServiceAccount = 'ingestion-account@...';
      
      // Imports the Google Cloud client library
      import {PubSub} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithConfluentCloudIngestion(
        topicNameOrId: string,
        bootstrapServer: string,
        clusterId: string,
        confluentTopic: string,
        identityPoolId: string,
        gcpServiceAccount: string,
      ) {
        // Creates a new topic with Confluent Cloud ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            confluentCloud: {
              bootstrapServer,
              clusterId,
              topic: confluentTopic,
              identityPoolId,
              gcpServiceAccount,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with Confluent Cloud ingestion.`);
      }

      Python

      Antes de experimentar este exemplo, siga as instruções de configuração do Python em Início rápido: usar bibliotecas cliente. Para mais informações, consulte a documentação de referência da API Python Pub/Sub.

      from google.cloud import pubsub_v1
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # bootstrap_server = "your-bootstrap-server"
      # cluster_id = "your-cluster-id"
      # confluent_topic = "your-confluent-topic"
      # identity_pool_id = "your-identity-pool-id"
      # gcp_service_account = "your-gcp-service-account"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              confluent_cloud=IngestionDataSourceSettings.ConfluentCloud(
                  bootstrap_server=bootstrap_server,
                  cluster_id=cluster_id,
                  topic=confluent_topic,
                  identity_pool_id=identity_pool_id,
                  gcp_service_account=gcp_service_account,
              )
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with Confluent Cloud Ingestion Settings")

    Se tiver problemas, consulte o artigo Resolução de problemas de um tópico de importação do Confluent Cloud.

    Edite um tópico de importação do Confluent Cloud Hubs

    Para editar as definições da origem de dados de carregamento de um tópico de importação do Confluent Cloud, siga estes passos:

    Consola

    1. Na Trusted Cloud consola, aceda à página Tópicos.

      Aceda a Tópicos

    2. Clique no tópico de importação do Confluent Cloud.

    3. Na página de detalhes do tópico, clique em Editar.

    4. Atualize os campos que quer alterar.

    5. Clique em Atualizar.

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

      Para evitar perder as definições do tópico de importação, certifique-se de que inclui todas as definições sempre que atualizar o tópico. Se omitir algo, o Pub/Sub repõe a definição para o valor predefinido original.

    2. Execute o comando gcloud pubsub topics update com todos os sinalizadores mencionados no seguinte exemplo:

      gcloud pubsub topics update TOPIC_ID \
         --confluent-cloud-ingestion-bootstrap-server CONFLUENT_BOOTSTRAP_SERVER \
         --confluent-cloud-ingestion-cluster-id CONFLUENT_CLUSTER_ID \
         --confluent-cloud-ingestion-topic CONFLUENT_TOPIC \
         --confluent-cloud-ingestion-identity-pool-id CONFLUENT_IDENTITY_POOL_ID \
         --confluent-cloud-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

      Substitua o seguinte:

      • TOPIC_ID: o nome ou o ID do seu tópico do Pub/Sub.
      • CONFLUENT_BOOTSTRAP_SERVER: o servidor de arranque do cluster que contém o tópico do Kafka que está a carregar para o Pub/Sub. O formato é o seguinte: hostname:port.
      • CONFLUENT_CLUSTER_ID: o ID do cluster que contém o tópico do Kafka que está a carregar para o Pub/Sub
      • CONFLUENT_TOPIC: o nome do tópico do Kafka que está a carregar para o Pub/Sub.
      • CONFLUENT_IDENTITY_POOL_ID: O ID do conjunto do conjunto de identidades usado para autenticar com o Confluent Cloud.
      • CONFLUENT_IDENTITY_POOL_ID: a conta de serviço que criou em Crie uma conta de serviço no Google Cloud.

    Quotas e limites

    O débito do publicador para tópicos de importação está limitado pela quota de publicação do tópico. Para mais informações, consulte o artigo Quotas e limites do Pub/Sub.

    O que se segue?

    Apache Kafka® é uma marca comercial registada da The Apache Software Foundation ou das respetivas afiliadas nos Estados Unidos e/ou noutros países.