Gestire i picchi temporanei con il controllo del flusso

A volte le Data pipelines registrano picchi nel traffico pubblicato. I picchi di traffico possono sopraffare gli abbonati se non ti prepari in anticipo. Una soluzione semplice per evitare picchi di traffico consiste nell'aumentare dinamicamente le risorse dei sottoscrittori Pub/Sub per elaborare più messaggi. Tuttavia, questa soluzione potrebbe aumentare i costi o non funzionare immediatamente. Ad esempio, potresti aver bisogno di molte VM.

Il controllo del flusso sul lato del sottoscrittore consente a quest'ultimo di regolare la velocità di inserimento dei messaggi. Il controllo del flusso gestisce quindi i picchi di traffico senza aumentare i costi o fino a quando l'abbonato non viene scalato.

Il controllo del flusso è una funzionalità disponibile nella libreria client di alto livello Pub/Sub. Puoi anche implementare la tua programmazione di controllo del flusso quando utilizzi una libreria client di basso livello.

La necessità di un controllo del flusso indica che i messaggi vengono pubblicati a una velocità superiore a quella di consumo. Se questo scenario è uno stato persistente anziché un picco transitorio nel volume dei messaggi, valuta la possibilità di aumentare il numero di istanze client del sottoscrittore.

Configurazione del controllo del flusso

Il controllo del flusso consente di configurare il numero massimo di byte allocati per le richieste in sospeso e il numero massimo di messaggi in sospeso consentiti. Imposta questi limiti in base alla capacità di throughput delle macchine client.

I valori predefiniti per le variabili di controllo del flusso e i nomi delle variabili potrebbero variare a seconda delle librerie client. Ad esempio, nella libreria client Java, le seguenti variabili configurano il controllo del flusso:

  • setMaxOutstandingElementCount(): definisce il numero massimo di messaggi per i quali Pub/Sub non ha ricevuto conferme o conferme negative.

  • setMaxOutstandingRequestBytes(): definisce la dimensione massima dei messaggi per i quali Pub/Sub non ha ricevuto conferme o conferme negative.

Se viene superato il limite per setMaxOutstandingElementCount() o setMaxOutstandingRequestBytes(), il client abbonato non recupera altri messaggi. Questo comportamento continua finché i messaggi già recuperati non vengono confermati o confermati negativamente. In questo modo possiamo allineare il throughput al costo associato all'esecuzione di più sottoscrizioni.

Esempi di codice per il controllo del flusso

Per controllare la velocità con cui il client sottoscrittore riceve i messaggi, utilizza le funzionalità di controllo del flusso del sottoscrittore. Queste funzionalità di controllo del flusso sono illustrate negli esempi seguenti:

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Change the flow control watermarks, by default the client library uses
  // 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
  // size watermarks. Recall that the library stops requesting messages if
  // any of the high watermarks are reached, and the library resumes
  // requesting messages when *both* low watermarks are reached.
  auto constexpr kMiB = 1024 * 1024L;
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxOutstandingMessagesOption>(1000)
          .set<pubsub::MaxOutstandingBytesOption>(8 * kMiB)));

  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        std::move(h).ack();
        std::cout << "Received message " << m << "\n";
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C#.


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesWithFlowControlAsyncSample
{
    public async Task<int> PullMessagesWithFlowControlAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Vai

L'esempio seguente utilizza la versione principale della libreria client Go Pub/Sub (v2). Se utilizzi ancora la libreria v1, consulta la guida alla migrazione alla v2. Per visualizzare un elenco di esempi di codice della versione 1, consulta gli esempi di codice ritirati.

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsFlowControlSettings(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	// MaxOutstandingMessages is the maximum number of unprocessed messages the
	// subscriber client will pull from the server before pausing. This also configures
	// the maximum number of concurrent handlers for received messages.
	//
	// For more information, see https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
	sub.ReceiveSettings.MaxOutstandingMessages = 100
	// MaxOutstandingBytes is the maximum size of unprocessed messages,
	// that the subscriber client will pull from the server before pausing.
	sub.ReceiveSettings.MaxOutstandingBytes = 1e8
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Pub/Sub.


import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithFlowControlSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithFlowControlSettingsExample(projectId, subscriptionId);
  }

  public static void subscribeWithFlowControlSettingsExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;

    // The subscriber will pause the message stream and stop receiving more messsages from the
    // server if any one of the conditions is met.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.newBuilder()
            // 1,000 outstanding messages. Must be >0. It controls the maximum number of messages
            // the subscriber receives before pausing the message stream.
            .setMaxOutstandingElementCount(1000L)
            // 100 MiB. Must be >0. It controls the maximum size of messages the subscriber
            // receives before pausing the message stream.
            .setMaxOutstandingRequestBytes(100L * 1024L * 1024L)
            .build();

    try {
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setFlowControlSettings(flowControlSettings)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella Guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const maxInProgress = 5;
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function subscribeWithFlowControlSettings(
  subscriptionNameOrId,
  maxInProgress,
  timeout,
) {
  const subscriberOptions = {
    flowControl: {
      maxMessages: maxInProgress,
    },
  };

  // References an existing subscription.
  // Note that flow control settings are not persistent across subscribers.
  const subscription = pubSubClient.subscription(
    subscriptionNameOrId,
    subscriberOptions,
  );

  console.log(
    `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`,
  );

  const messageHandler = message => {
    console.log(`Received message: ${message.id}`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(async () => {
    await subscription.close();
  }, timeout * 1000);
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella Guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const maxInProgress = 5;
// const timeout = 10;

// Imports the Google Cloud client library
import {Message, PubSub, SubscriberOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function subscribeWithFlowControlSettings(
  subscriptionNameOrId: string,
  maxInProgress: number,
  timeout: number,
) {
  const subscriberOptions: SubscriberOptions = {
    flowControl: {
      maxMessages: maxInProgress,
    },
  };

  // References an existing subscription.
  // Note that flow control settings are not persistent across subscribers.
  const subscription = pubSubClient.subscription(
    subscriptionNameOrId,
    subscriberOptions,
  );

  console.log(
    `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`,
  );

  const messageHandler = (message: Message) => {
    console.log(`Received message: ${message.id}`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(async () => {
    await subscription.close();
  }, timeout * 1000);
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Python.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data!r}.")
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Il seguente esempio utilizza la libreria client Ruby Pub/Sub v3. Se utilizzi ancora la libreria v2, consulta la guida alla migrazione alla v3. Per visualizzare un elenco di esempi di codice Ruby v2, consulta gli esempi di codice ritirati.

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Ruby.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

listener = subscriber.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

Passaggi successivi

Scopri le altre opzioni di pubblicazione che puoi configurare per un abbonamento: