Streaming-Aufrufe in Cloud-Clientbibliotheken für Java

Streaming-Aufrufe ermöglichen komplexere Interaktionsmuster als einfache Anfragen/Antworten, da mehrere Nachrichten über eine einzige Verbindung gesendet oder empfangen werden können.

Cloud-Clientbibliotheken für Java unterstützen drei Arten von Streaming-Aufrufen:

  • Server-Streaming:Der Server sendet einen Stream von Antworten an Sie zurück.
  • Client-Streaming:Sie senden einen Stream von Anfragen an den Server.
  • Bidirektionales Streaming:Sie können einen Stream von Anfragen an den Server senden und der Server kann einen Stream von Antworten an Sie zurücksenden.

Die Streaming-Implementierungen sind nach gRPC-Java-Implementierungen für Server-, Client- und bidirektionales Streaming modelliert.

Streaming-Unterstützung für verschiedene Transporte

Streaming wird bei Verwendung von gRPC vollständig unterstützt, bei HttpJson jedoch nur teilweise. In der folgenden Tabelle finden Sie Informationen zur Streaming-Unterstützung.

Streamingtyp gRPC HttpJson
Server-Streaming Unterstützt Unterstützt
Client-Streaming Unterstützt Nicht unterstützt
Bidirektionales Streaming Unterstützt Nicht unterstützt

Unäre Aufrufe (ohne Streaming) werden sowohl für gRPC als auch für HttpJson unterstützt.

Streamingtyp bestimmen

Prüfen Sie den zurückgegebenen Callable-Typ, um den Streamingtyp des Aufrufs zu bestimmen:

  • ServerStreamingCallable:Server-Streaming
  • ClientStreamingCallable:Client-Streaming
  • BidiStreamingCallable:Bidirektionales Streaming

Beispiel mit Java-Aiplatform und Java-Speech:

// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();

// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();

Streaming-Aufrufe durchführen

Die Durchführung eines Streaming-Aufrufs hängt davon ab, ob Sie Server- Streaming oder bidirektionales Streaming verwenden.

Server-Streaming

Für das Server-Streaming ist keine zusätzliche Implementierung erforderlich. Mit der Klasse ServerStream können Sie den Stream von Antworten durchlaufen. Am Beispiel von Java-Maps-Routing wird im Folgenden gezeigt, wie die Server-Streaming-API aufgerufen wird:

try (RoutesClient routesClient = RoutesClient.create()) {
  ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
    routesClient.computeRouteMatrixCallable();  
  ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
    ComputeRouteMatrixRequest.newBuilder().build());
  for (RouteMatrixElement element : stream) {
    // Do something with response
  }
}

In diesem Beispiel sendet der Client eine einzelne ComputeRouteMatrixRequest und empfängt einen Stream von Antworten.

Bidirektionales Streaming

Für das bidirektionale Streaming ist eine zusätzliche Implementierung erforderlich, um den Aufruf durchzuführen. Am Beispiel von Java-Speech wird in den folgenden Schritten eine Beispielimplementierung für einen bidirektionalen Streaming-Aufruf gezeigt.

Implementieren Sie zuerst die Schnittstelle ResponseObserver. Verwenden Sie dazu den folgenden Code als Richtlinie:

class BidiResponseObserver<T> implements ResponseObserver<T> {
  private final List<T> responses = new ArrayList<>();
  private final SettableApiFuture<List<T>> future = SettableApiFuture.create();

  @Override
  public void onStart(StreamController controller) {
    // no-op
  }

  @Override
  public void onResponse(T response) {
    responses.add(response);
  }

  @Override
  public void onError(Throwable t) {
    future.setException(t);
  }

  @Override
  public void onComplete() {
    future.set(responses);
  }

  public SettableApiFuture<List<T>> getFuture() {
    return future;
  }
}

Führen Sie dann folgende Schritte aus:

  1. Erstellen Sie eine Instanz des Beobachters:

    BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();
    
  2. Übergeben Sie den Beobachter an das aufrufbare Objekt:

    ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);
    
  3. Senden Sie die Anfragen an den Server und schließen Sie den Stream nach Abschluss:

    clientStream.send(StreamingRecognizeRequest.newBuilder().build());
    clientStream.send(StreamingRecognizeRequest.newBuilder().build());
    // ... other requests ...
    clientStream.send(StreamingRecognizeRequest.newBuilder().build());
    clientStream.closeSend();
    
  4. Durchlaufen Sie die Antworten:

    List<StreamingRecognizeResponse> responses = responseObserver.getFuture().get();
    
    for (StreamingRecognizeResponse response : responses) {
      // Do something with response
    }
    

Nicht unterstützte Streamingfehler

Bei Clientbibliotheken, die sowohl gRPC- als auch HTTP/JSON-Transporte unterstützen, kann es vorkommen, dass die Clientbibliothek versehentlich so konfiguriert wird, dass ein nicht unterstützter Streaming-Aufruf aufgerufen wird. Die folgende Konfiguration zeigt beispielsweise, dass der HttpJson-Client von Java-Speech einen bidirektionalen Streaming-Aufruf durchführt:

// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
  // Bidi Callable is not supported in HttpJson
  BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
  ...
}

Dies führt nicht zu einem Kompilierungsfehler, sondern zu einem Laufzeitfehler:

Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.