Crea un tema de importación de Amazon Kinesis Data Streams

Un tema de importación de Amazon Kinesis Data Streams te permite transferir datos de forma continua desde Amazon Kinesis Data Streams como fuente externa a Pub/Sub. Luego, puedes transmitir los datos a cualquiera de los destinos que admite Pub/Sub.

Para obtener más información sobre los temas de importación, consulta Acerca de los temas de importación.

Antes de comenzar

Roles y permisos requeridos

Para obtener los permisos que necesitas para crear y administrar temas de importación de Amazon Kinesis Data Streams, pídele a tu administrador que te otorgue el rol de IAM de Editor de Pub/Sub (roles/pubsub.editor) en tu tema o proyecto. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

Este rol predefinido contiene los permisos necesarios para crear y administrar temas de importación de Amazon Kinesis Data Streams. Para ver los permisos exactos que son necesarios, expande la sección Permisos requeridos:

Permisos necesarios

Se requieren los siguientes permisos para crear y administrar temas de importación de Amazon Kinesis Data Streams:

  • Crea un tema de importación: pubsub.topics.create
  • Borra un tema de importación: pubsub.topics.delete
  • Obtén un tema de importación: pubsub.topics.get
  • Enumera un tema de importación: pubsub.topics.list
  • Publica en un tema de importación: pubsub.topics.publish and pubsub.serviceAgent
  • Actualiza un tema de importación: pubsub.topics.update
  • Obtén la política de IAM para un tema de importación: pubsub.topics.getIamPolicy
  • Configura la política de IAM para un tema de importación: pubsub.topics.setIamPolicy

También puedes obtener estos permisos con roles personalizados o con otros roles predefinidos.

Puedes configurar el control de acceso a nivel del proyecto y de los recursos individuales.

Configura la identidad federada para acceder a Amazon Kinesis Data Streams

La federación de identidades para cargas de trabajo permite que los Trusted Cloud servicios accedan a las cargas de trabajo que se ejecutan fuera de Trusted Cloud. Con la federación de identidades, no es necesario que mantengas ni pases credenciales a Trusted Cloud para acceder a tus recursos en otras nubes. En su lugar, puedes usar las identidades de las cargas de trabajo para autenticarte en Trusted Cloud y acceder a los recursos.

Crea una cuenta de servicio en Trusted Cloud

Este paso es opcional. Si ya tienes una cuenta de servicio, puedes usarla en este procedimiento en lugar de crear una nueva. Si usas una cuenta de servicio existente, ve a Registra el ID único de la cuenta de servicio para el siguiente paso.

En el caso de los temas de importación de Amazon Kinesis Data Streams, Pub/Sub usa la cuenta de servicio como identidad para acceder a los recursos de AWS.

Para obtener más información sobre cómo crear una cuenta de servicio, incluidos los requisitos previos, los roles y permisos necesarios, y los lineamientos de nomenclatura, consulta Crea cuentas de servicio. Después de crear una cuenta de servicio, es posible que debas esperar 60 segundos o más antes de usarla. Este comportamiento sucede porque las operaciones de lectura tienen coherencia eventual. Es posible que la nueva cuenta de servicio tarde un poco más en ser visible.

Registra el ID único de la cuenta de servicio

Necesitas un ID único de la cuenta de servicio para configurar un rol en AWS.

  1. En la Trusted Cloud consola, ve a la página de detalles de la cuenta de servicio.

    Ir a la cuenta de servicio

  2. Haz clic en la cuenta de servicio que acabas de crear o en la que planeas usar.

  3. En la página Detalles de la cuenta de servicio, registra el número de ID único.

    Necesitas el ID como parte del flujo de trabajo para configurar un rol en AWS.

Agrega el rol de creador de tokens de cuenta de servicio a la cuenta de servicio de Pub/Sub

El rol de creador de tokens de cuentas de servicio (roles/iam.serviceAccountTokenCreator) permite que las principales creen credenciales de corta duración para una cuenta de servicio. Estos tokens o credenciales se usan para suplantar la identidad de la cuenta de servicio.

Para obtener más información sobre el uso de la identidad de cuentas de servicio, consulta Uso de la identidad de cuentas de servicio.

También puedes agregar el rol de publicador de Pub/Sub (roles/pubsub.publisher) durante este procedimiento. Para obtener más información sobre el rol y por qué lo agregas, consulta Cómo agregar el rol de publicador de Pub/Sub a la cuenta de servicio de Pub/Sub.

  1. En la consola de Trusted Cloud , ve a la página IAM.

    Ir a IAM

  2. Haz clic en la casilla de verificación Incluir asignaciones de roles proporcionadas por S3NS.

  3. Busca la cuenta de servicio que tiene el formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. En esta cuenta de servicio, haz clic en el botón Editar principal.

  5. Si es necesario, haz clic en Agregar otro rol.

  6. Busca y haz clic en el rol de creador de tokens de cuenta de servicio (roles/iam.serviceAccountTokenCreator).

  7. Haz clic en Guardar.

Crea una política en AWS

Necesitas una política en AWS para permitir que Pub/Sub se autentique en AWS, de modo que Pub/Sub pueda transferir datos desde Amazon Kinesis Data Streams.

  • Para obtener más métodos e información sobre cómo crear una política en AWS, consulta Crea políticas de IAM.

Para crear una política en AWS, sigue estos pasos:

  1. Accede a la consola de administración de AWS y abre la consola de IAM.

  2. En el panel de navegación de la consola de IAM, haz clic en Administración de acceso > Políticas.

  3. Haz clic en Crear política.

  4. En Selecciona un servicio, haz clic en Kinesis.

  5. En Acción permitida, haz clic en lo siguiente:

    • List > ListShards.

      Esta acción otorga permiso para enumerar los fragmentos en una transmisión y proporciona información sobre cada fragmento.

    • Read > SubscribeToShard.

      Esta acción otorga permiso para escuchar un fragmento específico con una mayor difusión.

    • Leer > DescribeStreamConsumer.

      Esta acción otorga permiso para obtener la descripción de un consumidor de transmisión registrado.

    Estos permisos abarcan la lectura del flujo. Pub/Sub solo admite la lectura de una transmisión de Kinesis con Enhanced Fan-Out a través de la API de Streaming SubscribeToShard.

  6. En el caso de los recursos, si deseas restringir la política a un flujo o consumidor específico (recomendado), especifica el ARN del consumidor y el ARN del flujo.

  7. Haz clic en Agregar más permisos.

  8. En Selecciona un servicio, haz clic en STS.

  9. En Acción permitida, haz clic en Write > AssumeRoleWithWebIdentity.

    Esta acción otorga permiso para obtener un conjunto de credenciales de seguridad temporales para que Pub/Sub se autentique en Amazon Kinesis Data Streams con la federación de identidades.

  10. Haz clic en Siguiente.

  11. Ingresa un nombre y una descripción para la política.

  12. Haz clic en Crear política.

Crea un rol en AWS con una política de confianza personalizada

Debes crear un rol en AWS para que Pub/Sub pueda autenticarse en AWS y transferir datos de Amazon Kinesis Data Streams.

  1. Accede a la consola de administración de AWS y abre la consola de IAM.

  2. En el panel de navegación de la consola de IAM, haz clic en Roles.

  3. Haz clic en Crear rol.

  4. En Seleccionar entidad de confianza, haz clic en Política de confianza personalizada.

  5. En la sección Política de confianza personalizada, ingresa o pega lo siguiente:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    Reemplaza <SERVICE_ACCOUNT_UNIQUE_ID> por el ID único de la cuenta de servicio que registraste en Registra el ID único de la cuenta de servicio.

  6. Haz clic en Siguiente.

  7. En Agregar permisos, busca y haz clic en la política personalizada que acabas de crear.

  8. Haz clic en Siguiente.

  9. Ingresa un nombre y una descripción para el rol.

  10. Haz clic en Crear rol.

Agrega el rol de publicador de Pub/Sub a la principal de Pub/Sub

Para habilitar la publicación, debes asignar un rol de publicador a la cuenta de servicio de Pub/Sub para que Pub/Sub pueda publicar en el tema de importación de Amazon Kinesis Data Streams.

Agrega el rol de agente de servicio de Pub/Sub a la cuenta de servicio de Pub/Sub

Para permitir que Pub/Sub use la cuota de publicación del proyecto del tema de importación, el agente de servicio de Pub/Sub requiere el permiso serviceusage.services.use en el proyecto del tema de importación.

Para otorgar este permiso, te recomendamos que agregues el rol de agente de servicio de Pub/Sub a la cuenta de servicio de Pub/Sub.

Si la cuenta de servicio de Pub/Sub no tiene el rol de agente de servicio de Pub/Sub, se puede otorgar de la siguiente manera:

  1. En la consola de Trusted Cloud , ve a la página IAM.

    Ir a IAM

  2. Haz clic en la casilla de verificación Incluir asignaciones de roles proporcionadas por S3NS.

  3. Busca la cuenta de servicio que tiene el formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. En esta cuenta de servicio, haz clic en el botón Editar principal.

  5. Si es necesario, haz clic en Agregar otro rol.

  6. Busca y haz clic en el rol de agente de servicio de Pub/Sub (roles/pubsub.serviceAgent).

  7. Haz clic en Guardar.

Habilita la publicación desde todos los temas

Usa este método si no creaste ningún tema de importación de Amazon Kinesis Data Streams.

  1. En la consola de Trusted Cloud , ve a la página IAM.

    Ir a IAM

  2. Haz clic en la casilla de verificación Incluir asignaciones de roles proporcionadas por S3NS.

  3. Busca la cuenta de servicio que tiene el formato service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. En esta cuenta de servicio, haz clic en el botón Editar principal.

  5. Si es necesario, haz clic en Agregar otro rol.

  6. Busca y haz clic en el rol de publicador de Pub/Sub (roles/pubsub.publisher).

  7. Haz clic en Guardar.

Habilita la publicación desde un solo tema

Usa este método solo si ya existe el tema de importación de Amazon Kinesis Data Streams.

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Ejecuta el comando gcloud pubsub topics add-iam-policy-binding:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    Reemplaza lo siguiente:

    • TOPIC_ID: Es el ID del tema de importación de Amazon Kinesis Data Streams.

    • PROJECT_NUMBER: Número del proyecto Para ver el número de proyecto, consulta Identifica proyectos.

  3. Agrega el rol de usuario de cuenta de servicio a la cuenta de servicio

    El rol de usuario de cuenta de servicio (roles/iam.serviceAccountUser) incluye el permiso iam.serviceAccounts.actAs que permite que una principal adjunte una cuenta de servicio a la configuración de transferencia del tema de importación de Amazon Kinesis Data Streams y use esa cuenta de servicio para la identidad federada.

    1. En la consola de Trusted Cloud , ve a la página IAM.

      Ir a IAM

    2. En el principal que emite las llamadas para crear o actualizar temas, haz clic en el botón Editar principal.

    3. Si es necesario, haz clic en Agregar otro rol.

    4. Busca y haz clic en el rol de usuario de cuenta de servicio (roles/iam.serviceAccountUser).

    5. Haz clic en Guardar.

    Usa temas de Amazon Kinesis Data Streams

    Puedes crear un tema de importación nuevo o editar uno existente.

    Consideraciones

    • Crear el tema y la suscripción por separado, incluso si se hace de forma rápida y sucesiva, puede provocar la pérdida de datos. Hay un período breve en el que el tema existe sin una suscripción. Si se envían datos al tema durante este período, se perderán. Si primero creas el tema, luego la suscripción y, por último, conviertes el tema en un tema de importación, garantizas que no se pierda ningún mensaje durante el proceso de importación.

    Crea un tema de importación de Amazon Kinesis Data Streams

    Para obtener más información sobre las propiedades asociadas a un tema, consulta Propiedades de un tema.

    Asegúrate de haber completado los siguientes procedimientos:

    Para crear un tema de importación de Amazon Kinesis Data Streams, sigue estos pasos:

    Console

    1. En la consola de Trusted Cloud , ve a la página Temas.

      Ir a temas

    2. Haz clic en Crear un tema.

    3. En el campo ID del tema, ingresa un ID para el tema de importación de Amazon Kinesis Data Streams.

      Para obtener más información sobre cómo nombrar temas, consulta los lineamientos para asignar nombres.

    4. Selecciona Agregar una suscripción predeterminada.

    5. Selecciona Habilitar transferencia.

    6. En la fuente de transferencia, selecciona Amazon Kinesis Data Streams.

    7. Ingresa los siguientes detalles:

      • ARN de transmisión de Kinesis: Es el ARN de la transmisión de datos de Kinesis que planeas transferir a Pub/Sub. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

      • ARN del consumidor de Kinesis: Es el ARN del recurso de consumidor que está registrado en la transmisión de datos de AWS Kinesis. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.

      • ARN de rol de AWS: Es el ARN del rol de AWS. El formato del ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}

      • Cuenta de servicio: La cuenta de servicio que creaste en Crea una cuenta de servicio en Trusted Cloud.

    8. Haz clic en Crear un tema.

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Ejecuta el comando gcloud pubsub topics create:

      gcloud pubsub topics create TOPIC_ID \
          --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN \
          --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN \
          --kinesis-ingestion-role-arn KINESIS_ROLE_ARN \
          --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

      Reemplaza lo siguiente:

      • TOPIC_ID: Es el ID del tema.
      • KINESIS_STREAM_ARN: Es el ARN de los flujos de datos de Kinesis que planeas transferir a Pub/Sub. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.
      • KINESIS_CONSUMER_ARN: ARN del recurso de consumidor registrado en AWS Kinesis Data Streams. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.
      • KINESIS_ROLE_ARN: ARN del rol de AWS. El formato del ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}.
      • PUBSUB_SERVICE_ACCOUNT: Es la cuenta de servicio que creaste en Crea una cuenta de servicio en Google Cloud.
    3. C++

      Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string stream_arn, std::string consumer_arn,
         std::string aws_role_arn, std::string gcp_service_account) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto* aws_kinesis =
            request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
        aws_kinesis->set_stream_arn(stream_arn);
        aws_kinesis->set_consumer_arn(consumer_arn);
        aws_kinesis->set_aws_role_arn(aws_role_arn);
        aws_kinesis->set_gcp_service_account(gcp_service_account);
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      En el siguiente ejemplo, se usa la versión principal de la biblioteca cliente de Pub/Sub de Go (v2). Si aún usas la biblioteca de la versión 1, consulta la guía de migración a la versión 2. Para ver una lista de muestras de código de la versión 1, consulta las muestras de código obsoletas.

      Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

      import (
      	"context"
      	"fmt"
      	"io"
      
      	pubsub "cloud.google.com/go/pubsub/v2"
      	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
      )
      
      func createTopicWithKinesisIngestion(w io.Writer, projectID, topic string) error {
      	// projectID := "my-project-id"
      	// topicID := "projects/my-project-id/topics/my-topic"
      	streamARN := "stream-arn"
      	consumerARN := "consumer-arn"
      	awsRoleARN := "aws-role-arn"
      	gcpServiceAccount := "gcp-service-account"
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	topicpb := &pubsubpb.Topic{
      		Name: topic,
      		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
      			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
      				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
      					StreamArn:         streamARN,
      					ConsumerArn:       consumerARN,
      					AwsRoleArn:        awsRoleARN,
      					GcpServiceAccount: gcpServiceAccount,
      				},
      			},
      		},
      	}
      	topicpb, err = client.TopicAdminClient.CreateTopic(ctx, topicpb)
      	if err != nil {
      		return fmt.Errorf("failed to create topic with kinesis: %w", err)
      	}
      	fmt.Fprintf(w, "Kinesis topic created: %v\n", topicpb)
      	return nil
      }
      

      Java

      Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      
      public class CreateTopicWithKinesisIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Kinesis ingestion settings.
          String streamArn = "stream-arn";
          String consumerArn = "consumer-arn";
          String awsRoleArn = "aws-role-arn";
          String gcpServiceAccount = "gcp-service-account";
      
          createTopicWithKinesisIngestionExample(
              projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
        }
      
        public static void createTopicWithKinesisIngestionExample(
            String projectId,
            String topicId,
            String streamArn,
            String consumerArn,
            String awsRoleArn,
            String gcpServiceAccount)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
      
            IngestionDataSourceSettings.AwsKinesis awsKinesis =
                IngestionDataSourceSettings.AwsKinesis.newBuilder()
                    .setStreamArn(streamArn)
                    .setConsumerArn(consumerArn)
                    .setAwsRoleArn(awsRoleArn)
                    .setGcpServiceAccount(gcpServiceAccount)
                    .build();
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      // const streamArn = 'arn:aws:kinesis:...';
      // const consumerArn = 'arn:aws:kinesis:...';
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithKinesisIngestion(
        topicNameOrId,
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      ) {
        // Creates a new topic with Kinesis ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsKinesis: {
              awsRoleArn,
              gcpServiceAccount,
              streamArn,
              consumerArn,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
      }

      Node.ts

      Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      // const streamArn = 'arn:aws:kinesis:...';
      // const consumerArn = 'arn:aws:kinesis:...';
      
      // Imports the Google Cloud client library
      import {PubSub} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithKinesisIngestion(
        topicNameOrId: string,
        awsRoleArn: string,
        gcpServiceAccount: string,
        streamArn: string,
        consumerArn: string,
      ) {
        // Creates a new topic with Kinesis ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsKinesis: {
              awsRoleArn,
              gcpServiceAccount,
              streamArn,
              consumerArn,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
      }

      Python

      Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

      from google.cloud import pubsub_v1
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # stream_arn = "your-stream-arn"
      # consumer_arn = "your-consumer-arn"
      # aws_role_arn = "your-aws-role-arn"
      # gcp_service_account = "your-gcp-service-account"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                  stream_arn=stream_arn,
                  consumer_arn=consumer_arn,
                  aws_role_arn=aws_role_arn,
                  gcp_service_account=gcp_service_account,
              )
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

    Para obtener más información sobre los ARN, consulta Amazon Resource Names (ARNs) y IAM Identifiers.

    Si tienes problemas, consulta Soluciona problemas relacionados con un tema de importación de Amazon Kinesis Data Streams.

    Cómo editar un tema de importación de Amazon Kinesis Data Streams

    Puedes editar la configuración de la fuente de datos de transferencia de un tema de importación de Amazon Kinesis Data Streams. Sigue los siguientes pasos:

    Console

    1. En la consola de Trusted Cloud , ve a la página Temas.

      Ir a temas

    2. Haz clic en el tema de importación de Amazon Kinesis Data Streams.

    3. En la página de detalles del tema, haz clic en Editar.

    4. Actualiza los campos que deseas cambiar.

    5. Haz clic en Actualizar.

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Para no perder la configuración del tema de importación, asegúrate de incluirla toda cada vez que actualices el tema. Si omites algo, Pub/Sub restablece el parámetro de configuración a su valor predeterminado original.

      Ejecuta el comando gcloud pubsub topics update con todas las marcas mencionadas en el siguiente ejemplo:

      gcloud pubsub topics update TOPIC_ID 
      --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
      --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
      --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
      --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

      Reemplaza lo siguiente:

      • TOPIC_ID es el ID del tema. Este campo no se puede actualizar.

      • KINESIS_STREAM_ARN es el ARN de Kinesis Data Streams que planeas transferir a Pub/Sub. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

      • KINESIS_CONSUMER_ARN es el ARN del recurso de consumidor que está registrado en AWS Kinesis Data Streams. El formato del ARN es el siguiente: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.

      • KINESIS_ROLE_ARN es el ARN del rol de AWS. El formato del ARN del rol es el siguiente: arn:aws:iam::${Account}:role/${RoleName}.

      • PUBSUB_SERVICE_ACCOUNT es la cuenta de servicio que creaste en Crea una cuenta de servicio en Trusted Cloud.

    Cuotas y límites para los temas de importación de Amazon Kinesis Data Streams

    El rendimiento del publicador para los temas de importación está limitado por la cuota de publicación del tema. Para obtener más información, consulta Cuotas y límites de Pub/Sub.

    ¿Qué sigue?

    Apache Kafka® es una marca registrada de The Apache Software Foundation o sus afiliados en Estados Unidos y otros países.