Ordinare i messaggi

L'ordinamento dei messaggi è una funzionalità di Pub/Sub che consente di ricevere i messaggi nei client sottoscrittori nell'ordine in cui sono stati pubblicati dai client publisher.

Ad esempio, supponiamo che un client publisher in una regione pubblichi i messaggi 1, 2 e 3 in ordine. Con l'ordinamento dei messaggi, il client sottoscrittore riceve i messaggi pubblicati nello stesso ordine. Per essere recapitati in ordine, il client publisher deve pubblicare i messaggi nella stessa regione. Tuttavia, gli abbonati possono connettersi a qualsiasi regione e la garanzia di ordinazione viene comunque mantenuta.

L'ordinamento dei messaggi è una funzionalità utile per scenari come l'acquisizione delle modifiche al database, il monitoraggio delle sessioni utente e le applicazioni di streaming in cui è importante preservare la cronologia degli eventi.

Questa pagina spiega il concetto di ordinamento dei messaggi e come configurare i client abbonati per ricevere i messaggi in ordine. Per configurare i client publisher per l'ordinamento dei messaggi, consulta Utilizzare le chiavi di ordinamento per pubblicare un messaggio.

Panoramica dell'ordinamento dei messaggi

L'ordinamento in Pub/Sub è determinato da quanto segue:

  • Chiave di ordinamento: è una stringa utilizzata nei metadati dei messaggi Pub/Sub e rappresenta l'entità per la quale i messaggi devono essere ordinati. La chiave di ordinamento può contenere fino a 1 KB. Per ricevere un insieme di messaggi ordinati in una regione, devi pubblicare tutti i messaggi con la stessa chiave di ordinamento nella stessa regione. Alcuni esempi di chiavi di ordinamento sono gli ID cliente e la chiave primaria di una riga in un database.

    La velocità effettiva di pubblicazione su ogni chiave di ordinamento è limitata a 1 MBps. Il throughput per tutte le chiavi di ordinamento di un argomento è limitato alla quota disponibile in una regione di pubblicazione. Questo limite può essere aumentato fino a molte unità di GBps.

    Una chiave di ordinamento non equivale a una partizione in un sistema di messaggistica basato su partizioni, in quanto le chiavi di ordinamento dovrebbero avere una cardinalità molto più elevata rispetto alle partizioni.

  • Attiva ordinamento messaggi: si tratta di un'impostazione dell'abbonamento. Quando l'ordinamento dei messaggi è abilitato per un abbonamento, i client abbonati ricevono i messaggi pubblicati nella stessa regione con la stessa chiave di ordinamento nell'ordine in cui sono stati ricevuti dal servizio. Devi abilitare questa impostazione nell'abbonamento.

    Supponiamo di avere due abbonamenti A e B collegati allo stesso argomento T. La sottoscrizione A è configurata con l'ordinamento dei messaggi abilitato e la sottoscrizione B è configurata senza l'ordinamento dei messaggi abilitato. In questa architettura, entrambi gli abbonamenti A e B ricevono lo stesso insieme di messaggi dall'argomento T. Se pubblichi messaggi con chiavi di ordinamento nella stessa regione, l'abbonamento A riceve i messaggi nell'ordine in cui sono stati pubblicati. Mentre, la sottoscrizione B riceve i messaggi senza alcun ordine previsto.

In generale, se la tua soluzione richiede ai client publisher di inviare messaggi ordinati e non ordinati, crea argomenti separati, uno per i messaggi ordinati e l'altro per i messaggi non ordinati.

Considerazioni sull'utilizzo della messaggistica ordinata

Il seguente elenco contiene informazioni importanti sul comportamento della messaggistica ordinata in Pub/Sub:

  • Ordinamento all'interno della chiave: i messaggi pubblicati con la stessa chiave di ordinamento devono essere ricevuti in ordine. Supponiamo che per la chiave di ordinamento A tu pubblichi i messaggi 1, 2 e 3. Se l'ordinamento è attivato, 1 deve essere consegnato prima di 2 e 2 deve essere consegnato prima di 3.

  • Ordinamento tra chiavi: non è previsto che i messaggi pubblicati con chiavi di ordinamento diverse vengano ricevuti in ordine. Supponiamo di avere le chiavi di ordinamento A e B. Per la chiave di ordinamento A, i messaggi 1 e 2 vengono pubblicati in ordine. Per la chiave B, i messaggi 3 e 4 vengono pubblicati in ordine. Tuttavia, il messaggio 1 potrebbe arrivare prima o dopo il messaggio 4.

  • Riconsegna dei messaggi: Pub/Sub consegna ogni messaggio almeno una volta, quindi il servizio Pub/Sub potrebbe riconsegnare i messaggi. Le riconsegne di un messaggio attivano la riconsegna di tutti i messaggi successivi per quella chiave, anche di quelli confermati. Supponiamo che un client sottoscrittore riceva i messaggi 1, 2 e 3 per una chiave di ordinamento specifica. Se il messaggio 2 viene recapitato nuovamente (perché la scadenza della conferma è scaduta o la conferma con il criterio del "best effort" non è rimasta in Pub/Sub), viene recapitato nuovamente anche il messaggio 3. Se in una sottoscrizione sono abilitati sia l'ordinamento dei messaggi che un argomento messaggi non recapitabili, questo comportamento potrebbe non essere vero perché Pub/Sub inoltra i messaggi agli argomenti messaggi non recapitabili secondo il criterio del "best effort".

  • Ritardi di riconoscimento e argomenti messaggi non recapitabili: i messaggi non riconosciuti per una determinata chiave di ordinamento possono potenzialmente ritardare la consegna dei messaggi per altre chiavi di ordinamento, soprattutto durante i riavvii del server o le modifiche al traffico. Per mantenere l'ordine durante questi eventi, assicurati di rispondere tempestivamente a tutti i messaggi. Se non è possibile confermare la ricezione in modo tempestivo, valuta la possibilità di utilizzare un argomento messaggi non recapitabili per evitare la conservazione indefinita dei messaggi. Tieni presente che l'ordine potrebbe non essere mantenuto quando i messaggi vengono scritti in un argomento messaggi non recapitabili.

  • Affinità dei messaggi (client StreamingPull): i messaggi per la stessa chiave vengono di solito inviati allo stesso client sottoscrittore StreamingPull. L'affinità è prevista quando i messaggi sono in attesa per una chiave di ordinamento per un client abbonato specifico. Se non ci sono messaggi in sospeso, l'affinità potrebbe cambiare per il bilanciamento del carico o le disconnessioni dei client.

    Per garantire un'elaborazione fluida anche in caso di potenziali modifiche all'affinità, è fondamentale progettare l'applicazione streamingPull in modo che possa gestire i messaggi in qualsiasi client per una determinata chiave di ordinamento.

  • Integrazione con Dataflow: non attivare l'ordinamento dei messaggi per gli abbonamenti quando configuri Dataflow con Pub/Sub. Dataflow ha un proprio meccanismo per l'ordinamento totale dei messaggi, che garantisce l'ordine cronologico di tutti i messaggi nell'ambito delle operazioni di raggruppamento in finestre. Questo metodo di ordinamento è diverso dall'approccio basato sulla chiave di ordinamento di Pub/Sub. L'utilizzo di chiavi di ordinamento con Dataflow può potenzialmente ridurre le prestazioni della pipeline.

  • Scalabilità automatica: la pubblicazione ordinata di Pub/Sub viene scalata a miliardi di chiavi di ordinamento. Un numero maggiore di chiavi di ordinamento consente una distribuzione più parallela agli abbonati, poiché l'ordinamento si applica a tutti i messaggi con la stessa chiave di ordinamento.

  • Compromessi in termini di prestazioni: la pubblicazione ordinata comporta alcuni compromessi. Rispetto alla pubblicazione non ordinata, la pubblicazione ordinata riduce la disponibilità di pubblicazione e aumenta la latenza di pubblicazione end-to-end dei messaggi. Nel caso di consegna ordinata, il failover richiede il coordinamento per garantire che i messaggi vengano scritti e letti nell'ordine corretto.

  • Tasto di scelta rapida: quando utilizzi l'ordinamento dei messaggi, tutti i messaggi con la stessa chiave di ordinamento vengono inviati al client abbonato nell'ordine in cui vengono ricevuti dal servizio. Il callback dell'utente non viene eseguito finché il callback non viene completato per il messaggio precedente. La velocità effettiva massima per i messaggi che condividono la stessa chiave di ordinamento durante la consegna ai sottoscrittori non è limitata da Pub/Sub , ma dalla velocità di elaborazione del client sottoscrittore. Una hot key si verifica quando si accumula un backlog su una singola chiave di ordinamento perché il numero di messaggi prodotti al secondo supera il numero di messaggi che il sottoscrittore può elaborare al secondo. Per ridurre i tasti di scelta rapida, utilizza le chiavi più granulari possibili e riduci al minimo il tempo di elaborazione per messaggio. Puoi anche monitorare la metrica subscription/oldest_unacked_message_age per un valore in aumento, che potrebbe indicare un tasto di scelta rapida.

Per ulteriori informazioni su come utilizzare l'ordinamento dei messaggi, consulta i seguenti argomenti sulle best practice:

Comportamento del client sottoscrittore per l'ordinamento dei messaggi

I client sottoscrittori ricevono i messaggi nell'ordine in cui sono stati pubblicati in una regione specifica. Pub/Sub supporta diversi modi per ricevere messaggi, ad esempio i client sottoscrittori connessi alle sottoscrizioni pull e push. Le librerie client utilizzano streamingPull (ad eccezione di PHP).

Per saperne di più su questi tipi di abbonamento, consulta Scegliere un tipo di abbonamento.

Le sezioni seguenti descrivono cosa significa ricevere i messaggi in ordine per ogni tipo di client abbonato.

Client sottoscrittori StreamingPull

Quando utilizzi le librerie client con streamingPull, devi specificare un callback utente che viene eseguito ogni volta che un messaggio viene ricevuto da un client abbonato. Con le librerie client, per una determinata chiave di ordinamento, il callback viene eseguito completamente sui messaggi nell'ordine corretto. Se i messaggi vengono riconosciuti all'interno di questo callback, tutti i calcoli su un messaggio vengono eseguiti in ordine. Tuttavia, se il callback utente pianifica altro lavoro asincrono sui messaggi, il client abbonato deve assicurarsi che il lavoro asincrono venga eseguito in ordine. Una possibilità è aggiungere i messaggi a una coda di lavoro locale che viene elaborata in ordine.

Client pull sottoscrittore

Per i client sottoscrittori connessi alle sottoscrizioni pull, l'ordinamento dei messaggi Pub/Sub supporta quanto segue:

  • Tutti i messaggi per una chiave di ordinamento in PullResponse sono nell'ordine corretto nell'elenco.

  • È possibile avere in sospeso un solo batch di messaggi per una chiave di ordinamento alla volta.

Il requisito che prevede che possa essere in attesa un solo batch di messaggi alla volta è necessario per mantenere la consegna ordinata, poiché il servizio Pub/Sub non può garantire la riuscita o la latenza della risposta che invia per una richiesta pull di un abbonato.

Client sottoscrittori push

Le limitazioni al push sono ancora più rigide di quelle al pull. Per un abbonamento push, Pub/Sub supporta un solo messaggio in sospeso per ogni chiave di ordinamento alla volta. Ogni messaggio viene inviato a un endpoint push come richiesta separata. Pertanto, l'invio delle richieste in parallelo avrebbe lo stesso problema della distribuzione di più batch di messaggi per la stessa chiave di ordinamento per estrarre i sottoscrittori contemporaneamente. Le sottoscrizioni push potrebbero non essere una buona scelta per gli argomenti in cui i messaggi vengono pubblicati di frequente con la stessa chiave di ordinamento o in cui la latenza è estremamente importante.

Esportare i client abbonati

L'esportazione delle iscrizioni supporta i messaggi ordinati. Per le sottoscrizioni BigQuery, i messaggi con la stessa chiave di ordinamento vengono scritti nella tabella BigQuery in ordine. Per gli abbonamenti Cloud Storage, i messaggi con la stessa chiave di ordinamento potrebbero non essere tutti scritti nello stesso file. All'interno dello stesso file, i messaggi per una chiave di ordinamento sono in ordine. Quando distribuiti su più file, i messaggi successivi per una chiave di ordinamento possono essere visualizzati in un file con un nome che ha un timestamp precedente rispetto al timestamp nel nome del file con i messaggi precedenti.

Attivare l'ordinamento dei messaggi

Per ricevere i messaggi in ordine, imposta la proprietà di ordinamento nella sottoscrizione dalla quale ricevi i messaggi. La ricezione dei messaggi in ordine potrebbe aumentare la latenza. Non puoi modificare la proprietà di ordinamento dei messaggi dopo aver creato un abbonamento.

Puoi impostare la proprietà di ordinamento nella sottoscrizione al momento di creare una sottoscrizione utilizzando la console Trusted Cloud , Google Cloud CLI o l'API Pub/Sub.

Console

Per creare una sottoscrizione con la proprietà di ordinamento dei messaggi:

  1. Nella console Trusted Cloud , vai alla pagina Abbonamenti.

Vai agli abbonamenti

  1. Fai clic su Crea sottoscrizione.

  2. Inserisci un ID abbonamento.

  3. Scegli un argomento da cui vuoi ricevere messaggi.

  4. Nella sezione Ordinamento dei messaggi, seleziona Ordina i messaggi con una chiave di ordinamento.

  5. Fai clic su Crea.

gcloud

Per creare una sottoscrizione con la proprietà di ordinamento dei messaggi, utilizza il comando gcloud pubsub subscriptions create e il flag --enable-message-ordering:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --enable-message-ordering

Sostituisci SUBSCRIPTION_ID con l'ID dell'abbonamento.

Se la richiesta va a buon fine, la riga di comando visualizza una conferma:

Created subscription [SUBSCRIPTION_ID].

REST

Per creare una sottoscrizione con la proprietà di ordinamento dei messaggi, invia una richiesta PUT come la seguente:

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth application-default print-access-token)

Sostituisci quanto segue:

  • PROJECT_ID: l'ID progetto del progetto con l'argomento
  • SUBSCRIPTION_ID: l'ID dell'abbonamento

Nel corpo della richiesta, specifica quanto segue:

{
  "topic": TOPIC_ID,
  "enableMessageOrdering": true,
}

Sostituisci TOPIC_ID con l'ID dell'argomento da allegare all'abbonamento.

Se la richiesta riesce, la risposta è l'abbonamento in formato JSON:

{
  "name": projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,
  "topic": projects/PROJECT_ID/topics/TOPIC_ID,
  "enableMessageOrdering": true,
}

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_message_ordering(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithOrderingSample
{
    public Subscription CreateSubscriptionWithOrdering(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableMessageOrdering = true
        };

        Subscription subscription = null;
        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Vai

L'esempio seguente utilizza la versione principale della libreria client Go Pub/Sub (v2). Se utilizzi ancora la libreria v1, consulta la guida alla migrazione alla v2. Per visualizzare un elenco di esempi di codice della versione 1, consulta gli esempi di codice ritirati.

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

func createWithOrdering(w io.Writer, projectID, topic, subscription string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Message ordering can only be set when creating a subscription.
	sub, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:                  subscription,
		Topic:                 topic,
		EnableMessageOrdering: true,
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription: %v\n", sub)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Java di Pub/Sub.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithOrdering {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithOrderingExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithOrderingExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Set message ordering to true for ordered messages in the subscription.
                  .setEnableMessageOrdering(true)
                  .build());

      System.out.println("Created a subscription with ordering: " + subscription.getAllFields());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella Guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId,
  subscriptionNameOrId,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella Guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithOrdering(
  topicNameOrId: string,
  subscriptionNameOrId: string,
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableMessageOrdering: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with ordering enabled.`,
  );
  console.log(
    'To process messages in order, remember to add an ordering key to your messages.',
  );
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_message_ordering": True,
        }
    )
    print(f"Created subscription with ordering: {subscription}")

Ruby

Il seguente esempio utilizza la libreria client Ruby Pub/Sub v3. Se utilizzi ancora la libreria v2, consulta la guida alla migrazione alla v3. Per visualizzare un elenco di esempi di codice Ruby v2, consulta gli esempi di codice ritirati.

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby nella guida rapida all'utilizzo delle librerie client. Per saperne di più, consulta la documentazione di riferimento dell'API Pub/Sub Ruby.

# topic_id        = "your-topic-id"
# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  enable_message_ordering: true

puts "Pull subscription #{subscription_id} created with message ordering."

Passaggi successivi