Bestätigungszeit mit Lease-Management verlängern

Wenn eine Nachricht an einen Pull-Abonnenten gesendet wird, muss der Abonnent die Nachricht innerhalb des Bestätigungszeitlimits verarbeiten und bestätigen. Andernfalls muss der Abonnent die Frist durch einen Anruf verlängern, um die Bestätigungsfrist zu ändern.

Die Pub/Sub-Clientbibliotheken auf hoher Ebene bieten die Lease-Verwaltung als Funktion, die die Frist einer noch nicht bestätigten Nachricht automatisch verlängert. Standardmäßig können die Clientbibliotheken die Frist durch regelmäßige modifyAckDeadline-Anfragen auf eine Stunde verlängern. Die Clientbibliotheken auf hoher Ebene für Python, Go und Java verwenden das 99. Perzentil der Bestätigungsverzögerung, um die Länge jeder Verlängerung zu bestimmen.

Mit der Verwaltung von Leases haben Sie eine genauere Kontrolle über die Bestätigungsfrist für Nachrichten als bei der Konfiguration des Attributs auf Aboebene. Wenn Sie nur die Bestätigungsfrist auf Aboebene verwenden, müssen Sie einen Kompromiss zwischen einem niedrigen und einem hohen Wert eingehen. Ein niedriger Wert erhöht die Wahrscheinlichkeit von Duplikaten und ein hoher Wert verzögert die erneute Zustellung fehlgeschlagener Nachrichten. Den richtigen Wert zu ermitteln, kann schwierig sein, insbesondere wenn die erwartete Bearbeitungszeit für verschiedene Nachrichten stark variiert.

Weitere Informationen zu den Attributen eines Abos, einschließlich der Bestätigungsfrist, finden Sie unter Abo-Attribute.

Konfiguration der Leasingverwaltung

Sie können die folgenden Eigenschaften in den Clientbibliotheken auf hoher Ebene konfigurieren, um die Verwaltung von Leases zu steuern.

  • Maximale Verlängerungsfrist für die Bestätigung: Die maximale Zeitspanne, um die die Clientbibliothek die Bestätigungsfrist einer Nachricht mit der modify acknowledgment deadline-Anfrage verlängern kann. Mit dieser Eigenschaft können Sie festlegen, wie lange die Abonnentenclients Nachrichten verarbeiten sollen.

  • Maximale Dauer für jede Bestätigungserweiterung Die maximale Zeit, um die die Bestätigungsfrist für jede der modify acknowledgment deadline Anfragen verlängert werden kann. Mit dieser Eigenschaft können Sie festlegen, wie lange Pub/Sub benötigt, um eine Nachricht noch einmal zu senden. Die erneute Zustellung erfolgt, wenn der erste Abonnent, der die Nachricht verarbeitet, abstürzt oder fehlerhaft wird und die modify acknowledgment deadline-Anfrage nicht mehr senden kann.

  • Mindestdauer für jede Bestätigungserweiterung Die Mindestzeit, um die die Bestätigungsfrist für jede der modify acknowledgment deadline-Anfragen verlängert werden soll. Mit dieser Eigenschaft können Sie die Mindestzeit angeben, die vergehen muss, bevor eine Nachricht noch einmal gesendet wird.

Es kann nicht garantiert werden, dass Bestätigungsfristen eingehalten werden, wenn Sie die genau einmalige Zustellung nicht aktivieren.

Bestätigungsfristen manuell verwalten

Wenn Sie die Ablaufzeit und erneute Zustellung von Nachrichten bei Verwendung von Unary Pull oder den Clientbibliotheken auf niedriger Ebene vermeiden möchten, verwenden Sie die modify acknowledgment deadline-Anfrage, um die Bestätigungsfristen zu verlängern. Ausnahmen sind die allgemeinen Go- und C++-Clientbibliotheken, die die Freigabeverwaltung bei Verwendung von Unary Pull bieten. Hier sind einige Beispiele für das Pull-Modell mit Einzelkauf und Leasingverwaltung:

C#

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für C# in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub C# API.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Collections.Generic;

public class PullMessageWithLeaseManagementSample
{
    public int PullMessageWithLeaseManagement(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();

        var ackIds = new List<string>();
        try
        {
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);

            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                ackIds.Add(msg.AckId);
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");

                // Modify the ack deadline of each received message from the default 10 seconds to 30.
                // This prevents the server from redelivering the message after the default 10 seconds
                // have passed.
                subscriberClient.ModifyAckDeadline(subscriptionName, new List<string> { msg.AckId }, 30);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && ackIds.Count > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, ackIds);
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return ackIds.Count;
    }
}

Java

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Java API.


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncWithLeaseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncWithLeaseExample(
      String projectId, String subscriptionId, Integer numOfMessages)
      throws IOException, InterruptedException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20 MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {

      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");
        return;
      }

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        ackIds.add(message.getAckId());

        // Modify the ack deadline of each received message from the default 10 seconds to 30.
        // This prevents the server from redelivering the message after the default 10 seconds
        // have passed.
        ModifyAckDeadlineRequest modifyAckDeadlineRequest =
            ModifyAckDeadlineRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAckIds(message.getAckId())
                .setAckDeadlineSeconds(30)
                .build();

        subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für PHP in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Node.js API.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithLeaseManagement() {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const maxMessages = 1;
  const newAckDeadlineSeconds = 30;
  const request = {
    subscription: formattedSubscription,
    maxMessages: maxMessages,
    allowExcessMessages: false,
  };

  let isProcessed = false;

  // The worker function is meant to be non-blocking. It starts a long-
  // running process, such as writing the message to a table, which may
  // take longer than the default 10-sec acknowledge deadline.
  function worker(message) {
    console.log(`Processing "${message.message.data}"...`);

    setTimeout(() => {
      console.log(`Finished procesing "${message.message.data}".`);
      isProcessed = true;
    }, 30000);
  }

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Obtain the first message.
  const message = response.receivedMessages[0];

  // Send the message to the worker function.
  worker(message);

  let waiting = true;
  while (waiting) {
    await new Promise(r => setTimeout(r, 10000));
    // If the message has been processed..
    if (isProcessed) {
      const ackRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
      };

      //..acknowledges the message.
      await subClient.acknowledge(ackRequest);
      console.log(`Acknowledged: "${message.message.data}".`);
      // Exit after the message is acknowledged.
      waiting = false;
      console.log('Done.');
    } else {
      // If the message is not yet processed..
      const modifyAckRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
        ackDeadlineSeconds: newAckDeadlineSeconds,
      };

      //..reset its ack deadline.
      await subClient.modifyAckDeadline(modifyAckRequest);

      console.log(
        `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`,
      );
    }
  }
}

synchronousPullWithLeaseManagement().catch(console.error);

Python

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Python in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Python API.

import logging
import multiprocessing
import sys
import time

from google.api_core import retry
from google.cloud import pubsub_v1

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
processes = dict()

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 3},
    retry=retry.Retry(deadline=300),
)

if len(response.received_messages) == 0:
    return

# Start a process for each message based on its size modulo 10.
for message in response.received_messages:
    process = multiprocessing.Process(
        target=time.sleep, args=(sys.getsizeof(message) % 10,)
    )
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    # Take a break every second.
    if processes:
        time.sleep(1)

    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is running, reset the ack deadline.
        if process.is_alive():
            subscriber.modify_ack_deadline(
                request={
                    "subscription": subscription_path,
                    "ack_ids": [ack_id],
                    # Must be between 10 and 600.
                    "ack_deadline_seconds": 15,
                }
            )
            logger.debug(f"Reset ack deadline for {msg_data}.")

        # If the process is complete, acknowledge the message.
        else:
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": [ack_id]}
            )
            logger.debug(f"Acknowledged {msg_data}.")
            processes.pop(process)
print(
    f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)

# Close the underlying gPRC channel. Alternatively, wrap subscriber in
# a 'with' block to automatically call close() when done.
subscriber.close()

Ruby

Im folgenden Beispiel wird die Ruby-Pub/Sub-Clientbibliothek v3 verwendet. Wenn Sie noch die v2-Bibliothek verwenden, finden Sie hier die Migrationsanleitung für v3. Eine Liste der Ruby v2-Codebeispiele finden Sie unter Eingestellte Codebeispiele.

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Ruby im Schnellstart: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zu Pub/Sub Ruby API.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id
new_ack_deadline = 30
processed = false

# The subscriber pulls a specified number of messages.
received_messages = subscriber.pull immediate: false, max: 1
# Obtain the first message.
message = received_messages.first

# Send the message to a non-blocking worker that starts a long-running
# process, such as writing the message to a table, which may take longer than
# the default 10-second acknowledge deadline.
Thread.new do
  sleep 15
  processed = true
  puts "Finished processing \"#{message.data}\"."
end

loop do
  sleep 1
  if processed
    # If the message has been processed, acknowledge the message.
    message.acknowledge!
    puts "Done."
    # Exit after the message is acknowledged.
    break
  else
    # If the message has not yet been processed, reset its ack deadline.
    message.modify_ack_deadline! new_ack_deadline
    puts "Reset ack deadline for \"#{message.data}\" for " \
         "#{new_ack_deadline} seconds."
  end
end

Nächste Schritte

Weitere Auslieferungsoptionen, die Sie für ein Abo konfigurieren können: