Amazon Kinesis Data Streams-Importthema erstellen

Mit einem Amazon Kinesis Data Streams-Importthema können Sie Daten kontinuierlich aus Amazon Kinesis Data Streams als externe Quelle in Pub/Sub aufnehmen. Anschließend können Sie die Daten in ein beliebiges Ziel streamen, das von Pub/Sub unterstützt wird.

Weitere Informationen zu Importthemen finden Sie unter Importthemen.

Hinweise

Erforderliche Rollen und Berechtigungen

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Pub/Sub-Bearbeiter (roles/pubsub.editor) für Ihr Thema oder Projekt zuzuweisen, damit Sie die Berechtigungen zum Erstellen und Verwalten von Importthemen für Amazon Kinesis Data Streams erhalten. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen und Verwalten von Importthemen für Amazon Kinesis Data Streams erforderlich sind. Erweitern Sie den Abschnitt Erforderliche Berechtigungen, um die erforderlichen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind erforderlich, um Amazon Kinesis Data Streams-Importthemen zu erstellen und zu verwalten:

  • Importthema erstellen: pubsub.topics.create
  • Importthema löschen: pubsub.topics.delete
  • Importthema abrufen: pubsub.topics.get
  • Importthema auflisten: pubsub.topics.list
  • In einem Importthema veröffentlichen: pubsub.topics.publish and pubsub.serviceAgent
  • Importthema aktualisieren: pubsub.topics.update
  • IAM-Richtlinie für ein Importthema abrufen: pubsub.topics.getIamPolicy
  • Konfigurieren Sie die IAM-Richtlinie für ein Importthema: pubsub.topics.setIamPolicy

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Sie können die Zugriffssteuerung auf Projekt- und auf der Ebene einzelner Ressourcen konfigurieren.

Föderierte Identität für den Zugriff auf Amazon Kinesis Data Streams einrichten

Mit der Workload Identity-Föderation können Trusted Cloud Dienste auf Arbeitslasten zugreifen, die außerhalb von Trusted Cloudausgeführt werden. Bei der Identitätsföderation müssen Sie keine Anmeldedaten für Trusted Cloud verwalten oder übergeben, um auf Ihre Ressourcen in anderen Clouds zuzugreifen. Stattdessen können Sie die Identitäten der Arbeitslasten selbst verwenden, um sich bei Trusted Cloud zu authentifizieren und auf Ressourcen zuzugreifen.

Dienstkonto in Trusted Clouderstellen

Dieser Schritt ist optional. Wenn Sie bereits ein Dienstkonto haben, können Sie es in diesem Verfahren verwenden, anstatt ein neues Dienstkonto zu erstellen. Wenn Sie ein vorhandenes Dienstkonto verwenden, fahren Sie mit dem nächsten Schritt fort: Eindeutige ID des Dienstkontos notieren.

Bei Importthemen für Amazon Kinesis Data Streams verwendet Pub/Sub das Dienstkonto als Identität für den Zugriff auf Ressourcen von AWS.

Weitere Informationen zum Erstellen eines Dienstkontos, einschließlich Voraussetzungen, erforderlicher Rollen und Berechtigungen sowie Namensrichtlinien, finden Sie unter Dienstkonten erstellen. Nachdem Sie ein Dienstkonto erstellt haben, müssen Sie möglicherweise 60 Sekunden oder länger warten, bis Sie das Dienstkonto verwenden können. Dieses Verhalten tritt auf, weil Lesevorgänge Eventual Consistency haben. Es kann einige Zeit dauern, bis das neue Dienstkonto sichtbar ist.

Eindeutige ID des Dienstkontos notieren

Sie benötigen eine eindeutige Dienstkonto-ID, um eine Rolle in AWS einzurichten.

  1. Rufen Sie in der Trusted Cloud Console die Detailseite Dienstkonto auf.

    Zum Dienstkonto

  2. Klicken Sie auf das Dienstkonto, das Sie gerade erstellt haben oder das Sie verwenden möchten.

  3. Notieren Sie sich auf der Seite Dienstkontodetails die eindeutige ID-Nummer.

    Sie benötigen die ID als Teil des Workflows, um eine Rolle in AWS einzurichten.

Rolle „Ersteller von Dienstkonto-Tokens“ dem Pub/Sub-Dienstkonto hinzufügen

Mit der Rolle „Ersteller von Dienstkonto-Tokens“ (roles/iam.serviceAccountTokenCreator) können Hauptkonten kurzlebige Anmeldedaten für ein Dienstkonto erstellen. Diese Tokens oder Anmeldedaten werden verwendet, um die Identität des Dienstkontos zu übernehmen.

Weitere Informationen zu Identitätswechsel für Dienstkonten.

Sie können auch die Rolle „Pub/Sub-Publisher“ (roles/pubsub.publisher) hinzufügen. Weitere Informationen zur Rolle und dazu, warum Sie sie hinzufügen, finden Sie unter Pub/Sub-Publisher-Rolle dem Pub/Sub-Dienstkonto hinzufügen.

  1. Rufen Sie in der Trusted Cloud Console die Seite IAM auf.

    IAM aufrufen

  2. Klicken Sie auf das Kästchen Von S3NSbereitgestellte Rollenzuweisungen einschließen.

  3. Suchen Sie nach dem Dienstkonto im Format service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. Klicken Sie für dieses Dienstkonto auf die Schaltfläche Hauptkonto bearbeiten.

  5. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  6. Suchen Sie nach der Rolle „Ersteller von Dienstkonto-Tokens“ (roles/iam.serviceAccountTokenCreator) und klicken Sie darauf.

  7. Klicken Sie auf Speichern.

Richtlinie in AWS erstellen

Sie benötigen eine Richtlinie in AWS, damit Pub/Sub sich bei AWS authentifizieren kann und Daten aus Amazon Kinesis Data Streams aufnehmen kann.

So erstellen Sie eine Richtlinie in AWS:

  1. Melden Sie sich in der AWS Management Console an und öffnen Sie die IAM-Konsole.

  2. Klicken Sie in der Console im Navigationsbereich für IAM auf Zugriffsverwaltung > Richtlinien.

  3. Klicken Sie auf Richtlinie erstellen.

  4. Klicken Sie unter Select a service (Dienst auswählen) auf Kinesis.

  5. Klicken Sie unter Zulässige Aktion auf Folgendes:

    • Liste > ListShards.

      Mit dieser Aktion wird die Berechtigung zum Auflisten der Shards in einem Stream gewährt und es werden Informationen zu den einzelnen Shards bereitgestellt.

    • Lesen > SubscribeToShard.

      Mit dieser Aktion wird die Berechtigung erteilt, einen bestimmten Shard mit erweitertem Fan-out zu überwachen.

    • Lesen > DescribeStreamConsumer.

      Mit dieser Aktion wird die Berechtigung zum Abrufen der Beschreibung eines registrierten Stream-Consumers erteilt.

    Diese Berechtigungen umfassen das Lesen aus dem Stream. Pub/Sub unterstützt das Lesen aus einem Kinesis-Stream mit Enhanced Fan-Out nur über die Streaming-API „SubscribeToShard“.

  6. Wenn Sie die Richtlinie für Ressourcen auf einen bestimmten Stream oder Nutzer beschränken möchten (empfohlen), geben Sie den Nutzer-ARN und den Stream-ARN an.

  7. Klicken Sie auf Weitere Berechtigungen hinzufügen.

  8. Klicken Sie unter Dienst auswählen auf STS.

  9. Klicken Sie für Action allowed (Aktion zulässig) auf Write (Schreiben) > AssumeRoleWithWebIdentity.

    Mit dieser Aktion wird die Berechtigung erteilt, eine Reihe temporärer Sicherheitsanmeldedaten für Pub/Sub abzurufen, um sich mithilfe der Identitätsföderation bei Amazon Kinesis Data Streams zu authentifizieren.

  10. Klicken Sie auf Weiter.

  11. Geben Sie einen Richtliniennamen und eine Beschreibung ein.

  12. Klicken Sie auf Richtlinie erstellen.

Rolle in AWS mit einer benutzerdefinierten Trust-Richtlinie erstellen

Sie müssen eine Rolle in AWS erstellen, damit Pub/Sub sich bei AWS authentifizieren kann, um Daten aus Amazon Kinesis Data Streams aufzunehmen.

  1. Melden Sie sich in der AWS Management Console an und öffnen Sie die IAM-Konsole.

  2. Klicken Sie im Navigationsbereich der Console für IAM auf Rollen.

  3. Klicken Sie auf Rolle erstellen.

  4. Klicken Sie unter Vertrauenswürdige Entität auswählen auf Benutzerdefinierte Vertrauensrichtlinie.

  5. Geben Sie im Abschnitt Benutzerdefinierte Vertrauensrichtlinie Folgendes ein oder fügen Sie es ein:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
         "Effect": "Allow",
         "Principal": {
            "Federated": "accounts.google.com"
         },
         "Action": "sts:AssumeRoleWithWebIdentity",
         "Condition": {
             "StringEquals": {
               "accounts.google.com:sub": "<SERVICE_ACCOUNT_UNIQUE_ID>"
             }
          }
        }
      ]
    }
    

    Ersetzen Sie <SERVICE_ACCOUNT_UNIQUE_ID> durch die eindeutige ID des Dienstkontos, die Sie unter Eindeutige ID des Dienstkontos notieren notiert haben.

  6. Klicken Sie auf Weiter.

  7. Suchen Sie unter Berechtigungen hinzufügen nach der benutzerdefinierten Richtlinie, die Sie gerade erstellt haben, und klicken Sie darauf.

  8. Klicken Sie auf Weiter.

  9. Geben Sie einen Rollennamen und eine Beschreibung ein.

  10. Klicken Sie auf Rolle erstellen.

Pub/Sub-Publisher-Rolle dem Pub/Sub-Principal hinzufügen

Damit die Veröffentlichung möglich ist, müssen Sie dem Pub/Sub-Dienstkonto eine Publisher-Rolle zuweisen, damit Pub/Sub im Amazon Kinesis Data Streams-Importthema veröffentlichen kann.

Pub/Sub-Dienst-Agent-Rolle dem Pub/Sub-Dienstkonto hinzufügen

Damit Pub/Sub das Veröffentlichungskontingent des Projekts Ihres Importthemas verwenden kann, benötigt der Pub/Sub-Dienst-Agent die Berechtigung serviceusage.services.use für das Projekt Ihres Importthemas.

Um diese Berechtigung zu erteilen, empfehlen wir, dem Pub/Sub-Dienstkonto die Rolle „Pub/Sub-Dienst-Agent“ hinzuzufügen.

Wenn das Pub/Sub-Dienstkonto nicht über die Rolle „Pub/Sub-Dienst-Agent“ verfügt, kann sie so zugewiesen werden:

  1. Rufen Sie in der Trusted Cloud Console die Seite IAM auf.

    IAM aufrufen

  2. Klicken Sie auf das Kästchen Von S3NSbereitgestellte Rollenzuweisungen einschließen.

  3. Suchen Sie nach dem Dienstkonto im Format service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. Klicken Sie für dieses Dienstkonto auf die Schaltfläche Hauptkonto bearbeiten.

  5. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  6. Suchen Sie nach der Rolle „Pub/Sub-Dienst-Agent“ (roles/pubsub.serviceAgent) und klicken Sie darauf.

  7. Klicken Sie auf Speichern.

Veröffentlichung von allen Themen aus aktivieren

Verwenden Sie diese Methode, wenn Sie noch keine Importthemen für Amazon Kinesis Data Streams erstellt haben.

  1. Rufen Sie in der Trusted Cloud Console die Seite IAM auf.

    IAM aufrufen

  2. Klicken Sie auf das Kästchen Von S3NSbereitgestellte Rollenzuweisungen einschließen.

  3. Suchen Sie nach dem Dienstkonto im Format service-{PROJECT_NUMBER}@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com.

  4. Klicken Sie für dieses Dienstkonto auf die Schaltfläche Hauptkonto bearbeiten.

  5. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

  6. Suchen Sie nach der Rolle „Pub/Sub-Publisher“ (roles/pubsub.publisher) und klicken Sie darauf.

  7. Klicken Sie auf Speichern.

Veröffentlichung aus einem einzelnen Thema aktivieren

Verwenden Sie diese Methode nur, wenn das Amazon Kinesis Data Streams-Importthema bereits vorhanden ist.

  1. In the Trusted Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud pubsub topics add-iam-policy-binding aus:

    gcloud pubsub topics add-iam-policy-binding TOPIC_ID \
       --member="serviceAccount:service-PROJECT_NUMBER@gcp-sa-pubsub.s3ns-system.iam.gserviceaccount.com" \
       --role="roles/pubsub.publisher"

    Ersetzen Sie Folgendes:

    • TOPIC_ID: die Themen-ID des Amazon Kinesis Data Streams-Importthemas.

    • PROJECT_NUMBER: die Projektnummer Informationen zum Aufrufen der Projektnummer finden Sie unter Projekte identifizieren.

  3. Rolle „Dienstkontonutzer“ dem Dienstkonto hinzufügen

    Die Rolle „Dienstkontonutzer“ (roles/iam.serviceAccountUser) umfasst die Berechtigung iam.serviceAccounts.actAs, mit der ein Hauptkonto ein Dienstkonto an die Aufnahmeeinstellungen des Importthemas für Amazon Kinesis Data Streams anhängen und dieses Dienstkonto für die Identitätsföderation verwenden kann.

    1. Rufen Sie in der Trusted Cloud Console die Seite IAM auf.

      IAM aufrufen

    2. Klicken Sie für das Hauptkonto, das die Aufrufe zum Erstellen oder Aktualisieren von Themen ausgibt, auf die Schaltfläche Hauptkonto bearbeiten.

    3. Klicken Sie bei Bedarf auf Weitere Rolle hinzufügen.

    4. Suchen Sie nach der Rolle „Dienstkontonutzer“ (roles/iam.serviceAccountUser) und klicken Sie darauf.

    5. Klicken Sie auf Speichern.

    Amazon Kinesis Data Streams-Themen verwenden

    Sie können ein neues Importthema erstellen oder ein vorhandenes Thema bearbeiten.

    Hinweise

    • Wenn Sie das Thema und das Abo separat erstellen, kann es zu Datenverlusten kommen, auch wenn Sie die Schritte schnell nacheinander ausführen. Es gibt einen kurzen Zeitraum, in dem das Thema ohne Abo vorhanden ist. Wenn während dieser Zeit Daten an das Thema gesendet werden, gehen sie verloren. Wenn Sie zuerst das Thema und dann das Abo erstellen und das Thema anschließend in ein Importthema umwandeln, gehen während des Importvorgangs keine Nachrichten verloren.

    Amazon Kinesis Data Streams-Importthema erstellen

    Weitere Informationen zu den Attributen eines Themas

    Achten Sie darauf, dass Sie die folgenden Schritte ausgeführt haben:

    So erstellen Sie ein Amazon Kinesis Data Streams-Importthema:

    Console

    1. Rufen Sie in der Trusted Cloud Console die Seite Themen auf.

      Themen aufrufen

    2. Klicken Sie auf Thema erstellen.

    3. Geben Sie im Feld Themen-ID eine ID für das Importthema für Amazon Kinesis Data Streams ein.

      Weitere Informationen zur Benennung von Themen finden Sie in den Benennungsrichtlinien.

    4. Wählen Sie Standardabo hinzufügen aus.

    5. Wählen Sie Aufnahme aktivieren aus.

    6. Wählen Sie als Quelle für die Aufnahme Amazon Kinesis Data Streams aus.

    7. Geben Sie die folgenden Informationen ein:

      • ARN des Kinesis-Streams: Der ARN für den Kinesis Data Stream, den Sie in Pub/Sub aufnehmen möchten. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

      • ARN des Kinesis-Nutzers: Der ARN der Consumer-Ressource, die im AWS Kinesis Data Stream registriert ist. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.

      • ARN der AWS-Rolle: Der ARN der AWS-Rolle. Der ARN der Rolle hat folgendes Format: arn:aws:iam::${Account}:role/${RoleName}

      • Dienstkonto: Das Dienstkonto, das Sie in Dienstkonto in Trusted Clouderstellen erstellt haben.

    8. Klicken Sie auf Thema erstellen.

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Führen Sie den Befehl gcloud pubsub topics create aus:

      gcloud pubsub topics create TOPIC_ID \
          --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN \
          --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN \
          --kinesis-ingestion-role-arn KINESIS_ROLE_ARN \
          --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

      Ersetzen Sie Folgendes:

      • TOPIC_ID: Die Themen-ID.
      • KINESIS_STREAM_ARN: Der ARN für die Kinesis Data Streams, die Sie in Pub/Sub aufnehmen möchten. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.
      • KINESIS_CONSUMER_ARN: Der ARN der Consumer-Ressource, die bei AWS Kinesis Data Streams registriert ist. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.
      • KINESIS_ROLE_ARN: Der ARN der AWS-Rolle. Der ARN der Rolle hat folgendes Format: arn:aws:iam::${Account}:role/${RoleName}.
      • PUBSUB_SERVICE_ACCOUNT: Das Dienstkonto, das Sie unter Dienstkonto in Google Cloud erstellen erstellt haben.
    3. C++

      Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

      namespace pubsub = ::google::cloud::pubsub;
      namespace pubsub_admin = ::google::cloud::pubsub_admin;
      [](pubsub_admin::TopicAdminClient client, std::string project_id,
         std::string topic_id, std::string stream_arn, std::string consumer_arn,
         std::string aws_role_arn, std::string gcp_service_account) {
        google::pubsub::v1::Topic request;
        request.set_name(
            pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
        auto* aws_kinesis =
            request.mutable_ingestion_data_source_settings()->mutable_aws_kinesis();
        aws_kinesis->set_stream_arn(stream_arn);
        aws_kinesis->set_consumer_arn(consumer_arn);
        aws_kinesis->set_aws_role_arn(aws_role_arn);
        aws_kinesis->set_gcp_service_account(gcp_service_account);
      
        auto topic = client.CreateTopic(request);
        // Note that kAlreadyExists is a possible error when the library retries.
        if (topic.status().code() == google::cloud::StatusCode::kAlreadyExists) {
          std::cout << "The topic already exists\n";
          return;
        }
        if (!topic) throw std::move(topic).status();
      
        std::cout << "The topic was successfully created: " << topic->DebugString()
                  << "\n";
      }

      Go

      Im folgenden Beispiel wird die Hauptversion der Go Pub/Sub-Clientbibliothek (v2) verwendet. Wenn Sie noch die v1-Bibliothek verwenden, finden Sie hier den Migrationsleitfaden für v2. Eine Liste der Codebeispiele für Version 1 finden Sie unter Eingestellte Codebeispiele.

      Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Go API.

      import (
      	"context"
      	"fmt"
      	"io"
      
      	pubsub "cloud.google.com/go/pubsub/v2"
      	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
      )
      
      func createTopicWithKinesisIngestion(w io.Writer, projectID, topic string) error {
      	// projectID := "my-project-id"
      	// topicID := "projects/my-project-id/topics/my-topic"
      	streamARN := "stream-arn"
      	consumerARN := "consumer-arn"
      	awsRoleARN := "aws-role-arn"
      	gcpServiceAccount := "gcp-service-account"
      
      	ctx := context.Background()
      	client, err := pubsub.NewClient(ctx, projectID)
      	if err != nil {
      		return fmt.Errorf("pubsub.NewClient: %w", err)
      	}
      	defer client.Close()
      
      	topicpb := &pubsubpb.Topic{
      		Name: topic,
      		IngestionDataSourceSettings: &pubsubpb.IngestionDataSourceSettings{
      			Source: &pubsubpb.IngestionDataSourceSettings_AwsKinesis_{
      				AwsKinesis: &pubsubpb.IngestionDataSourceSettings_AwsKinesis{
      					StreamArn:         streamARN,
      					ConsumerArn:       consumerARN,
      					AwsRoleArn:        awsRoleARN,
      					GcpServiceAccount: gcpServiceAccount,
      				},
      			},
      		},
      	}
      	topicpb, err = client.TopicAdminClient.CreateTopic(ctx, topicpb)
      	if err != nil {
      		return fmt.Errorf("failed to create topic with kinesis: %w", err)
      	}
      	fmt.Fprintf(w, "Kinesis topic created: %v\n", topicpb)
      	return nil
      }
      

      Java

      Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.

      
      import com.google.cloud.pubsub.v1.TopicAdminClient;
      import com.google.pubsub.v1.IngestionDataSourceSettings;
      import com.google.pubsub.v1.Topic;
      import com.google.pubsub.v1.TopicName;
      import java.io.IOException;
      
      public class CreateTopicWithKinesisIngestionExample {
        public static void main(String... args) throws Exception {
          // TODO(developer): Replace these variables before running the sample.
          String projectId = "your-project-id";
          String topicId = "your-topic-id";
          // Kinesis ingestion settings.
          String streamArn = "stream-arn";
          String consumerArn = "consumer-arn";
          String awsRoleArn = "aws-role-arn";
          String gcpServiceAccount = "gcp-service-account";
      
          createTopicWithKinesisIngestionExample(
              projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
        }
      
        public static void createTopicWithKinesisIngestionExample(
            String projectId,
            String topicId,
            String streamArn,
            String consumerArn,
            String awsRoleArn,
            String gcpServiceAccount)
            throws IOException {
          try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            TopicName topicName = TopicName.of(projectId, topicId);
      
            IngestionDataSourceSettings.AwsKinesis awsKinesis =
                IngestionDataSourceSettings.AwsKinesis.newBuilder()
                    .setStreamArn(streamArn)
                    .setConsumerArn(consumerArn)
                    .setAwsRoleArn(awsRoleArn)
                    .setGcpServiceAccount(gcpServiceAccount)
                    .build();
            IngestionDataSourceSettings ingestionDataSourceSettings =
                IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();
      
            Topic topic =
                topicAdminClient.createTopic(
                    Topic.newBuilder()
                        .setName(topicName.toString())
                        .setIngestionDataSourceSettings(ingestionDataSourceSettings)
                        .build());
      
            System.out.println("Created topic with Kinesis ingestion settings: " + topic.getAllFields());
          }
        }
      }

      Node.js

      Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      // const streamArn = 'arn:aws:kinesis:...';
      // const consumerArn = 'arn:aws:kinesis:...';
      
      // Imports the Google Cloud client library
      const {PubSub} = require('@google-cloud/pubsub');
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithKinesisIngestion(
        topicNameOrId,
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      ) {
        // Creates a new topic with Kinesis ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsKinesis: {
              awsRoleArn,
              gcpServiceAccount,
              streamArn,
              consumerArn,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
      }

      Node.ts

      Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

      /**
       * TODO(developer): Uncomment these variables before running the sample.
       */
      // const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
      // const roleArn = 'arn:aws:iam:...';
      // const gcpServiceAccount = 'ingestion-account@...';
      // const streamArn = 'arn:aws:kinesis:...';
      // const consumerArn = 'arn:aws:kinesis:...';
      
      // Imports the Google Cloud client library
      import {PubSub} from '@google-cloud/pubsub';
      
      // Creates a client; cache this for further use
      const pubSubClient = new PubSub();
      
      async function createTopicWithKinesisIngestion(
        topicNameOrId: string,
        awsRoleArn: string,
        gcpServiceAccount: string,
        streamArn: string,
        consumerArn: string,
      ) {
        // Creates a new topic with Kinesis ingestion.
        await pubSubClient.createTopic({
          name: topicNameOrId,
          ingestionDataSourceSettings: {
            awsKinesis: {
              awsRoleArn,
              gcpServiceAccount,
              streamArn,
              consumerArn,
            },
          },
        });
        console.log(`Topic ${topicNameOrId} created with AWS Kinesis ingestion.`);
      }

      Python

      Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

      from google.cloud import pubsub_v1
      from google.pubsub_v1.types import Topic
      from google.pubsub_v1.types import IngestionDataSourceSettings
      
      # TODO(developer)
      # project_id = "your-project-id"
      # topic_id = "your-topic-id"
      # stream_arn = "your-stream-arn"
      # consumer_arn = "your-consumer-arn"
      # aws_role_arn = "your-aws-role-arn"
      # gcp_service_account = "your-gcp-service-account"
      
      publisher = pubsub_v1.PublisherClient()
      topic_path = publisher.topic_path(project_id, topic_id)
      
      request = Topic(
          name=topic_path,
          ingestion_data_source_settings=IngestionDataSourceSettings(
              aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                  stream_arn=stream_arn,
                  consumer_arn=consumer_arn,
                  aws_role_arn=aws_role_arn,
                  gcp_service_account=gcp_service_account,
              )
          ),
      )
      
      topic = publisher.create_topic(request=request)
      
      print(f"Created topic: {topic.name} with AWS Kinesis Ingestion Settings")

    Weitere Informationen zu ARNs finden Sie unter Amazon-Ressourcennamen (ARNs) und IAM-Kennungen.

    Falls Probleme auftreten, lesen Sie den Abschnitt Fehlerbehebung bei einem Amazon Kinesis Data Streams-Importthema.

    Amazon Kinesis Data Streams-Importthema bearbeiten

    Sie können die Einstellungen der Datenquelle für die Aufnahme eines Amazon Kinesis Data Streams-Importthemas bearbeiten. Führen Sie diese Schritte aus:

    Console

    1. Rufen Sie in der Trusted Cloud Console die Seite Themen auf.

      Themen aufrufen

    2. Klicken Sie auf das Importthema für Amazon Kinesis Data Streams.

    3. Klicken Sie auf der Seite mit den Themendetails auf Bearbeiten.

    4. Aktualisieren Sie die Felder, die Sie ändern möchten.

    5. Klicken Sie auf Aktualisieren.

    gcloud

    1. In the Trusted Cloud console, activate Cloud Shell.

      Activate Cloud Shell

      At the bottom of the Trusted Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

    2. Damit die Einstellungen für das Importthema nicht verloren gehen, müssen Sie sie bei jeder Aktualisierung des Themas angeben. Wenn Sie etwas weglassen, wird die Einstellung von Pub/Sub auf den ursprünglichen Standardwert zurückgesetzt.

      Führen Sie den Befehl gcloud pubsub topics update mit allen im folgenden Beispiel genannten Flags aus:

      gcloud pubsub topics update TOPIC_ID 
      --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN
      --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN
      --kinesis-ingestion-role-arn KINESIS_ROLE_ARN
      --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT

      Ersetzen Sie Folgendes:

      • TOPIC_ID ist die Themen-ID. Dieses Feld kann nicht aktualisiert werden.

      • KINESIS_STREAM_ARN ist der ARN für die Kinesis Data Streams, die Sie in Pub/Sub aufnehmen möchten. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}.

      • KINESIS_CONSUMER_ARN ist der ARN der Consumer-Ressource, die bei AWS Kinesis Data Streams registriert ist. Das ARN-Format ist wie folgt: arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimestamp}.

      • KINESIS_ROLE_ARN ist der ARN der AWS-Rolle. Der ARN der Rolle hat folgendes Format: arn:aws:iam::${Account}:role/${RoleName}.

      • PUBSUB_SERVICE_ACCOUNT ist das Dienstkonto, das Sie in Dienstkonto in Trusted Clouderstellen erstellt haben.

    Kontingente und Grenzwerte für Amazon Kinesis Data Streams-Importthemen

    Der Publisher-Durchsatz für Importthemen ist an das Veröffentlichungskontingent des Themas gebunden. Weitere Informationen finden Sie unter Pub/Sub-Kontingente und ‑Limits.

    Nächste Schritte

    Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder ihrer Tochtergesellschaften in den USA und/oder anderen Ländern.