Java 用 Cloud クライアント ライブラリのストリーミング呼び出し

ストリーミング呼び出しを使用すると、単純なリクエスト/レスポンスよりも複雑なインタラクション パターンが可能になり、単一の接続で複数のメッセージを送受信できます。

Java 用 Cloud クライアント ライブラリは、次の 3 種類のストリーミング呼び出しをサポートしています。

  • サーバー ストリーミング: サーバーからレスポンスのストリームが返されます。
  • クライアント ストリーミング: クライアントからサーバーにリクエストのストリームが送信されます。
  • 双方向ストリーミング: クライアントからサーバーにリクエストのストリームを送信でき、サーバーからクライアントにレスポンスのストリームを返信できます。

ストリーミング実装は、サーバー、クライアント、双方向ストリーミングの gRPC-Java 実装をモデルにしています。

トランスポート間のストリーミングのサポート

ストリーミングは gRPC を使用する場合は完全にサポートされますが、HttpJson の場合は部分的にのみサポートされます。ストリーミングのサポートについては、次の表をご覧ください。

ストリーミングのタイプ gRPC HttpJson
サーバー ストリーミング サポート対象 サポート対象
クライアント ストリーミング サポート対象 サポート対象外
双方向ストリーミング サポート対象 サポート対象外

単項呼び出し(ストリーミング以外)は、gRPC と HttpJson の両方でサポートされています。

ストリーミングのタイプを決定する

呼び出しのストリーミング タイプを判別するには、返された Callable タイプを確認します。

  • ServerStreamingCallable: サーバー ストリーミング。
  • ClientStreamingCallable: クライアント ストリーミング。
  • BidiStreamingCallable: 双方向ストリーミング。

たとえば、Java-Aiplatform と Java-Speech を使用する場合は次のようにします。

// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();

// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();

ストリーミング呼び出しを行う

ストリーミング呼び出しを行う方法は、サーバー ストリーミングと双方向ストリーミングのどちらを使用するかによって異なります。

サーバー ストリーミング

サーバー ストリーミングでは、追加の実装は必要ありません。ServerStream クラスを使用すると、レスポンスのストリームを反復処理できます。Java-Maps-Routing を例として、サーバー ストリーミング API を呼び出す方法を次に示します。

try (RoutesClient routesClient = RoutesClient.create()) {
  ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
    routesClient.computeRouteMatrixCallable();  
  ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
    ComputeRouteMatrixRequest.newBuilder().build());
  for (RouteMatrixElement element : stream) {
    // Do something with response
  }
}

この例では、クライアントは単一の ComputeRouteMatrixRequest を送信し、レスポンスのストリームを受信します。

双方向ストリーミング

双方向ストリーミングでは、呼び出しを行うために追加の実装が必要です。 Java-Speech を例として、双方向ストリーミング呼び出しを行う実装例を次の手順で示します。

まず、次のコードをガイドラインとして使用して、ResponseObserver インターフェースを実装します。

class BidiResponseObserver<T> implements ResponseObserver<T> {
  private final List<T> responses = new ArrayList<>();
  private final SettableApiFuture<List<T>> future = SettableApiFuture.create();

  @Override
  public void onStart(StreamController controller) {
    // no-op
  }

  @Override
  public void onResponse(T response) {
    responses.add(response);
  }

  @Override
  public void onError(Throwable t) {
    future.setException(t);
  }

  @Override
  public void onComplete() {
    future.set(responses);
  }

  public SettableApiFuture<List<T>> getFuture() {
    return future;
  }
}

次に、以下の手順を行います。

  1. オブザーバーのインスタンスを作成します。

    BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();
    
  2. オブザーバーを呼び出し可能オブジェクトに渡します。

    ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);
    
  3. リクエストをサーバーに送信し、完了したらストリームを閉じます。

    clientStream.send(StreamingRecognizeRequest.newBuilder().build());
    clientStream.send(StreamingRecognizeRequest.newBuilder().build());
    // ... other requests ...
    clientStream.send(StreamingRecognizeRequest.newBuilder().build());
    clientStream.closeSend();
    
  4. レスポンスを反復処理します。

    List<StreamingRecognizeResponse> responses = responseObserver.getFuture().get();
    
    for (StreamingRecognizeResponse response : responses) {
      // Do something with response
    }
    

サポートされていないストリーミング エラー

gRPC と HTTP/JSON トランスポートの両方をサポートするクライアント ライブラリの場合、サポートされていないストリーミング呼び出しを呼び出すようにクライアント ライブラリを誤って構成する可能性があります。たとえば、次の構成は、双方向ストリーミング呼び出しを行う Java-Speech の HttpJson クライアントを示しています。

// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
  // Bidi Callable is not supported in HttpJson
  BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
  ...
}

コンパイル エラーは発生しませんが、実行時エラーとして表示されます。

Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.