ストリーミング呼び出しを使用すると、単純なリクエスト/レスポンスよりも複雑なインタラクション パターンが可能になり、単一の接続で複数のメッセージを送受信できます。
Java 用 Cloud クライアント ライブラリは、次の 3 種類のストリーミング呼び出しをサポートしています。
- サーバー ストリーミング: サーバーからレスポンスのストリームが返されます。
- クライアント ストリーミング: クライアントからサーバーにリクエストのストリームが送信されます。
- 双方向ストリーミング: クライアントからサーバーにリクエストのストリームを送信でき、サーバーからクライアントにレスポンスのストリームを返信できます。
ストリーミング実装は、サーバー、クライアント、双方向ストリーミングの gRPC-Java 実装をモデルにしています。
トランスポート間のストリーミングのサポート
ストリーミングは gRPC を使用する場合は完全にサポートされますが、HttpJson の場合は部分的にのみサポートされます。ストリーミングのサポートについては、次の表をご覧ください。
| ストリーミングのタイプ | gRPC | HttpJson |
|---|---|---|
| サーバー ストリーミング | サポート対象 | サポート対象 |
| クライアント ストリーミング | サポート対象 | サポート対象外 |
| 双方向ストリーミング | サポート対象 | サポート対象外 |
単項呼び出し(ストリーミング以外)は、gRPC と HttpJson の両方でサポートされています。
ストリーミングのタイプを決定する
呼び出しのストリーミング タイプを判別するには、返された Callable タイプを確認します。
ServerStreamingCallable: サーバー ストリーミング。ClientStreamingCallable: クライアント ストリーミング。BidiStreamingCallable: 双方向ストリーミング。
たとえば、Java-Aiplatform と Java-Speech を使用する場合は次のようにします。
// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();
// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
ストリーミング呼び出しを行う
ストリーミング呼び出しを行う方法は、サーバー ストリーミングと双方向ストリーミングのどちらを使用するかによって異なります。
サーバー ストリーミング
サーバー ストリーミングでは、追加の実装は必要ありません。ServerStream クラスを使用すると、レスポンスのストリームを反復処理できます。Java-Maps-Routing を例として、サーバー ストリーミング API を呼び出す方法を次に示します。
try (RoutesClient routesClient = RoutesClient.create()) {
ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
routesClient.computeRouteMatrixCallable();
ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
ComputeRouteMatrixRequest.newBuilder().build());
for (RouteMatrixElement element : stream) {
// Do something with response
}
}
この例では、クライアントは単一の ComputeRouteMatrixRequest を送信し、レスポンスのストリームを受信します。
双方向ストリーミング
双方向ストリーミングでは、呼び出しを行うために追加の実装が必要です。 Java-Speech を例として、双方向ストリーミング呼び出しを行う実装例を次の手順で示します。
まず、次のコードをガイドラインとして使用して、ResponseObserver インターフェースを実装します。
class BidiResponseObserver<T> implements ResponseObserver<T> {
private final List<T> responses = new ArrayList<>();
private final SettableApiFuture<List<T>> future = SettableApiFuture.create();
@Override
public void onStart(StreamController controller) {
// no-op
}
@Override
public void onResponse(T response) {
responses.add(response);
}
@Override
public void onError(Throwable t) {
future.setException(t);
}
@Override
public void onComplete() {
future.set(responses);
}
public SettableApiFuture<List<T>> getFuture() {
return future;
}
}
次に、以下の手順を行います。
オブザーバーのインスタンスを作成します。
BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();オブザーバーを呼び出し可能オブジェクトに渡します。
ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);リクエストをサーバーに送信し、完了したらストリームを閉じます。
clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.send(StreamingRecognizeRequest.newBuilder().build()); // ... other requests ... clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.closeSend();レスポンスを反復処理します。
List<StreamingRecognizeResponse> responses = responseObserver.getFuture().get(); for (StreamingRecognizeResponse response : responses) { // Do something with response }
サポートされていないストリーミング エラー
gRPC と HTTP/JSON トランスポートの両方をサポートするクライアント ライブラリの場合、サポートされていないストリーミング呼び出しを呼び出すようにクライアント ライブラリを誤って構成する可能性があります。たとえば、次の構成は、双方向ストリーミング呼び出しを行う Java-Speech の HttpJson クライアントを示しています。
// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
// Bidi Callable is not supported in HttpJson
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
...
}
コンパイル エラーは発生しませんが、実行時エラーとして表示されます。
Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.