与简单的请求/响应相比,流式调用支持更复杂的互动模式,允许通过单个连接发送或接收多条消息。
Java 版 Cloud 客户端库支持三种类型的流式调用:
- 服务器流式传输 :服务器向您发送一系列响应。
- 客户端流式传输 :您向服务器发送一系列请求。
- 双向流式传输 :您可以向服务器发送一系列请求,服务器可以向您发送一系列响应。
流式传输实现以 gRPC-Java 实现为模型,用于服务器、客户端和双向流式传输。
跨传输方式的流式传输支持
使用 gRPC 时,流式传输完全受支持,但对于 HttpJson,流式传输仅部分受支持。如需了解流式传输支持,请参阅下表。
| 流式传输类型 | gRPC | HttpJson |
|---|---|---|
| 服务器流式传输 | 支持 | 支持 |
| 客户端流式传输 | 支持 | 不支持 |
| 双向流式传输 | 支持 | 不支持 |
gRPC 和 HttpJson 都支持一元调用(非流式传输)。
确定流式传输的类型
如需确定调用的流式传输类型,请检查返回的 Callable 类型:
ServerStreamingCallable:服务器流式传输。ClientStreamingCallable:客户端流式传输。BidiStreamingCallable:双向流式传输。
例如,使用 Java-Aiplatform 和 Java-Speech:
// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();
// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
发出流式调用
发出流式调用会有所不同,具体取决于您使用的是服务器 流式传输还是双向流式传输。
服务器流式传输
服务器流式传输不需要额外的实现。ServerStream 类可让您迭代响应流。以 Java-Maps-Routing 为例,以下展示了如何调用服务器流式传输 API:
try (RoutesClient routesClient = RoutesClient.create()) {
ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
routesClient.computeRouteMatrixCallable();
ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
ComputeRouteMatrixRequest.newBuilder().build());
for (RouteMatrixElement element : stream) {
// Do something with response
}
}
在此示例中,客户端发送单个 ComputeRouteMatrixRequest 并接收一系列响应。
双向流式传输
双向流式传输需要额外的实现才能进行调用。 以 Java-Speech 为例,以下步骤提供了一个示例实现,用于发出双向流式传输调用。
首先,使用以下代码作为指南来实现 ResponseObserver 接口:
class BidiResponseObserver<T> implements ResponseObserver<T> {
private final List<T> responses = new ArrayList<>();
private final SettableApiFuture<List<T>> future = SettableApiFuture.create();
@Override
public void onStart(StreamController controller) {
// no-op
}
@Override
public void onResponse(T response) {
responses.add(response);
}
@Override
public void onError(Throwable t) {
future.setException(t);
}
@Override
public void onComplete() {
future.set(responses);
}
public SettableApiFuture<List<T>> getFuture() {
return future;
}
}
然后按照以下步骤操作:
创建观察器实例:
BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();将观察器传入可调用对象:
ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);将请求发送到服务器,并在完成后关闭流:
clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.send(StreamingRecognizeRequest.newBuilder().build()); // ... other requests ... clientStream.send(StreamingRecognizeRequest.newBuilder().build()); clientStream.closeSend();迭代响应:
List<StreamingRecognizeResponse> responses = responseObserver.getFuture().get(); for (StreamingRecognizeResponse response : responses) { // Do something with response }
不支持的流式传输错误
对于同时支持 gRPC 和 HTTP/JSON 传输方式的客户端库,可能会意外地将客户端库配置为调用不支持的流式传输调用。例如,以下配置显示了 Java-Speech 的 HttpJson 客户端发出双向流式传输调用:
// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
// Bidi Callable is not supported in HttpJson
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
...
}
这不会导致编译错误,但会显示为运行时错误:
Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.