Java 版 Cloud 客户端库上的流式调用

与简单的请求/响应相比,流式调用支持更复杂的互动模式,允许通过单个连接发送或接收多条消息。

Java 版 Cloud 客户端库支持三种类型的流式调用:

  • 服务器流式传输 :服务器向您发送一系列响应。
  • 客户端流式传输 :您向服务器发送一系列请求。
  • 双向流式传输 :您可以向服务器发送一系列请求,服务器可以向您发送一系列响应。

流式传输实现以 gRPC-Java 实现为模型,用于服务器、客户端和双向流式传输。

跨传输方式的流式传输支持

使用 gRPC 时,流式传输完全受支持,但对于 HttpJson,流式传输仅部分受支持。如需了解流式传输支持,请参阅下表。

流式传输类型 gRPC HttpJson
服务器流式传输 支持 支持
客户端流式传输 支持 不支持
双向流式传输 支持 不支持

gRPC 和 HttpJson 都支持一元调用(非流式传输)。

确定流式传输的类型

如需确定调用的流式传输类型,请检查返回的 Callable 类型:

  • ServerStreamingCallable :服务器流式传输。
  • ClientStreamingCallable :客户端流式传输。
  • BidiStreamingCallable :双向流式传输。

例如,使用 Java-Aiplatform 和 Java-Speech:

// Server Streaming
ServerStreamingCallable<ReadTensorboardBlobDataRequest, ReadTensorboardBlobDataResponse> callable = aiplatformClient.readTensorboardBlobDataCallable();

// Bidirectional Streaming
BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();

发出流式调用

发出流式调用会有所不同,具体取决于您使用的是服务器 流式传输还是双向流式传输。

服务器流式传输

服务器流式传输不需要额外的实现。ServerStream 类可让您迭代响应流。以 Java-Maps-Routing 为例,以下展示了如何调用服务器流式传输 API:

try (RoutesClient routesClient = RoutesClient.create()) {
  ServerStreamingCallable<ComputeRouteMatrixRequest, RouteMatrixElement> computeRouteMatrix =
    routesClient.computeRouteMatrixCallable();  
  ServerStream<RouteMatrixElement> stream = computeRouteMatrix.call(
    ComputeRouteMatrixRequest.newBuilder().build());
  for (RouteMatrixElement element : stream) {
    // Do something with response
  }
}

在此示例中,客户端发送单个 ComputeRouteMatrixRequest 并接收一系列响应。

双向流式传输

双向流式传输需要额外的实现才能进行调用。 以 Java-Speech 为例,以下步骤提供了一个示例实现,用于发出双向流式传输调用。

首先,使用以下代码作为指南来实现 ResponseObserver 接口:

class BidiResponseObserver<T> implements ResponseObserver<T> {
  private final List<T> responses = new ArrayList<>();
  private final SettableApiFuture<List<T>> future = SettableApiFuture.create();

  @Override
  public void onStart(StreamController controller) {
    // no-op
  }

  @Override
  public void onResponse(T response) {
    responses.add(response);
  }

  @Override
  public void onError(Throwable t) {
    future.setException(t);
  }

  @Override
  public void onComplete() {
    future.set(responses);
  }

  public SettableApiFuture<List<T>> getFuture() {
    return future;
  }
}

然后按照以下步骤操作:

  1. 创建观察器实例:

    BidiResponseObserver<StreamingRecognizeResponse> responseObserver = new BidiResponseObserver<>();
    
  2. 将观察器传入可调用对象:

    ClientStream<EchoRequest> clientStream = speechClient.streamingRecognizeCallable().splitCall(responseObserver);
    
  3. 将请求发送到服务器,并在完成后关闭流:

    clientStream.send(StreamingRecognizeRequest.newBuilder().build());
    clientStream.send(StreamingRecognizeRequest.newBuilder().build());
    // ... other requests ...
    clientStream.send(StreamingRecognizeRequest.newBuilder().build());
    clientStream.closeSend();
    
  4. 迭代响应:

    List<StreamingRecognizeResponse> responses = responseObserver.getFuture().get();
    
    for (StreamingRecognizeResponse response : responses) {
      // Do something with response
    }
    

不支持的流式传输错误

对于同时支持 gRPC 和 HTTP/JSON 传输方式的客户端库,可能会意外地将客户端库配置为调用不支持的流式传输调用。例如,以下配置显示了 Java-Speech 的 HttpJson 客户端发出双向流式传输调用:

// SpeechClient is configured to use HttpJson
try (SpeechClient speechClient = SpeechClient.create(SpeechSettings.newHttpJsonBuilder().build())) {
  // Bidi Callable is not supported in HttpJson
  BidiStreamingCallable<StreamingRecognizeRequest, StreamingRecognizeResponse> callable = speechClient.streamingRecognizeCallable();
  ...
}

这不会导致编译错误,但会显示为运行时错误:

Not implemented: streamingRecognizeCallable(). REST transport is not implemented for this method yet.
Important: The client library MUST be configured with gRPC to use client or bidirectional streaming.