官方文档:https://grpc.io/docs/what-is-grpc/introduction/

一 gRPC 允许你定义四种服务方法:

  • 一元 RPC,其中客户端向服务器发送单个请求并得到单个响应,就像普通函数调用一样。

    rpc SayHello(HelloRequest) returns (HelloResponse);

  • 服务器流式 RPC 中,客户端向服务器发送请求并获取流以读取一系列消息。客户端从返回的流中读取,直到没有更多消息。gRPC 保证单个RPC 调用中的消息排序。

    rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);

  • 客户端流式 RPC,客户端使用提供的流编写一系列消息并将其发送到服务器。客户端完成编写消息后,它会等待服务器读取消息并返回响应。同样,gRPC 保证了单个 RPC 调用中的消息排序。

    rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);

  • 双向流式RPC,双方使用读写流发送一系列消息。两个流独立运行,因此客户端和服务器可以按任意顺序进行读写:例如,服务器可以等待接收所有客户端消息后再写入响应,也可以交替读取消息然后写入消息,或者采用其他读写组合。每个流中的消息顺序都会保留。

    rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);

二 代码样例

1 一元 RPC

其中客户端向服务器发送单个请求并得到单个响应,就像普通函数调用一样

1.1 Greeter.proto文件

syntax = "proto3";
option java_package = "org.apache.demo.v1";
option java_multiple_files = true;
service Greeter {
  rpc SayHello(HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

1.2 client端

public class GreeterClient {

    public static void main(String[] args) {
        // 创建一个通道连接到服务端
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()  // 关闭TLS(只在开发时使用)
                .build();

        // 创建存根
        GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);

        // 创建请求
        HelloRequest request = HelloRequest.newBuilder()
                .setName("World")
                .build();

        // 调用服务
        try {
            HelloReply response = stub.sayHello(request);
            System.out.println("Response: " + response.getMessage());
        } catch (StatusRuntimeException e) {
            System.err.println("RPC failed: " + e.getStatus());
        } finally {
            // 关闭通道
            channel.shutdown();
        }
    }
}

1.3 server端

public class GreeterServer {
    public static void main(String[] args) throws Exception {
        Server server = ServerBuilder.forPort(50051)
                .addService(new GreeterImpl())
                .build()
                .start();
        System.out.println("Server started on port 50051");
        server.awaitTermination();
    }

    static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
        @Override
        public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
            String message = "Hello, " + request.getName();
            HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
        }
    }
}

1.4 结果

在这里插入图片描述

2 服务器流式 RPC

2.1 StreamingGreeter.proto文件

syntax = "proto3";
option java_package = "org.apache.demo.v1";
option java_multiple_files = true;
import "Greeter.proto";
service StreamingGreeter {
  rpc StreamHello(HelloRequest) returns (stream HelloReply);
}

2.2 client端

public class StreamingGreeterClient {

    public static void main(String[] args) {
        // 创建一个通道连接到服务端
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()  // 关闭TLS(只在开发时使用)
                .build();

        // 创建存根
        StreamingGreeterGrpc.StreamingGreeterBlockingStub stub = StreamingGreeterGrpc.newBlockingStub(channel);

        // 创建请求
        HelloRequest request = HelloRequest.newBuilder()
                .setName("World")
                .build();

        // 调用StreamHello服务
        try {
            // 服务器会返回多个消息
            stub.streamHello(request).forEachRemaining(response -> {
                long l = System.currentTimeMillis();
                System.out.println(l+":==:Received: " + response.getMessage());
            });
        } catch (StatusRuntimeException e) {
            System.err.println("RPC failed: " + e.getStatus());
        } finally {
            channel.shutdown();
        }
    }
}

2.3 server端

/**
 * 服务端流式
 */
public class StreamingGreeterServer {
    public static void main(String[] args) throws Exception {
        Server server = ServerBuilder.forPort(50051)
                .addService(new StreamingGreeterImpl())
                .build()
                .start();
        System.out.println("Server started on port 50051");
        server.awaitTermination();
    }

    static class StreamingGreeterImpl extends StreamingGreeterGrpc.StreamingGreeterImplBase {
        @Override
        public void streamHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
            for (int i = 0; i < 5; i++) {
                HelloReply reply = HelloReply.newBuilder()
                        .setMessage("Hello, " + request.getName() + " #" + i)
                        .build();
                responseObserver.onNext(reply);
            }
            responseObserver.onCompleted();
        }
    }
}

2.4 结果

在这里插入图片描述

3 客户端流式 RPC

3.1 ClientStreamingGreeter.proto文件

syntax = "proto3";
option java_package = "org.apache.demo.v1";
option java_multiple_files = true;
import "Greeter.proto";
service ClientStreamingGreeter {
  rpc StreamHello(stream HelloRequest) returns (HelloReply);
}

3.2 client端

public class ClientStreamingGreeterClient {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个通道连接到服务端
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50052)
                .usePlaintext()  // 关闭TLS(只在开发时使用)
                .build();
        final CountDownLatch latch = new CountDownLatch(1);
        // 创建存根  异步
        ClientStreamingGreeterGrpc.ClientStreamingGreeterStub asyncStub = ClientStreamingGreeterGrpc.newStub(channel);

        // 创建请求
        StreamObserver<HelloRequest> requestObserver = asyncStub.streamHello(new StreamObserver<HelloReply>() {
            @Override
            public void onNext(HelloReply value) {
                System.out.println("Received: " + value.getMessage());
            }

            @Override
            public void onError(Throwable t) {
                System.err.println("RPC failed: " + t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("Server completed the RPC.");
                latch.countDown();  // 完成时释放Latch
            }
        });

        // 客户端发送多个请求
        try {
            requestObserver.onNext(HelloRequest.newBuilder().setName("John").build());
            requestObserver.onNext(HelloRequest.newBuilder().setName("Jane").build());
            requestObserver.onNext(HelloRequest.newBuilder().setName("Doe").build());

            // 结束请求
            requestObserver.onCompleted();
        } catch (Exception e) {
            System.err.println("Error sending requests: " + e.getMessage());
            requestObserver.onError(e);
        } finally {
            latch.await();
            channel.shutdown();
        }
    }
}

3.2 server端

public class ClientStreamingGreeterServer {

    public static void main(String[] args) throws Exception {
        Server server = ServerBuilder.forPort(50052)
                .addService(new ClientStreamingGreeterImpl())
                .build()
                .start();
        System.out.println("Server started on port 50051");
        server.awaitTermination();
    }

    static class ClientStreamingGreeterImpl extends ClientStreamingGreeterGrpc.ClientStreamingGreeterImplBase {
        @Override
        public StreamObserver<HelloRequest> streamHello(StreamObserver<HelloReply> responseObserver) {
            return new StreamObserver<HelloRequest>() {
                int count = 0;

                @Override
                public void onNext(HelloRequest value) {
                    System.out.println(value.getName());
                    count++;
                }

                @Override
                public void onError(Throwable t) {
                }

                @Override
                public void onCompleted() {
                    HelloReply reply = HelloReply.newBuilder()
                            .setMessage("Received " + count + " messages")
                            .build();
                    responseObserver.onNext(reply);
                    responseObserver.onCompleted();
                    System.out.println(reply.getMessage());
                }
            };
        }
    }
}

3.4 结果

在这里插入图片描述

4 双向流式RPC

4.1 BidirectionalStreamingGreeter.proto文件

syntax = "proto3";
option java_package = "org.apache.demo.v1";
option java_multiple_files = true;
import "Greeter.proto";
service BidirectionalStreamingGreeter {
  rpc StreamHello(stream HelloRequest) returns (stream HelloReply);
}

4.1 client端

public class BidirectionalStreamingGreeterClient {
    public static void main(String[] args) throws InterruptedException {
        // 创建一个通道连接到服务端
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()  // 关闭TLS(只在开发时使用)
                .build();
        final CountDownLatch latch = new CountDownLatch(1);
        // 创建存根 异步
        BidirectionalStreamingGreeterGrpc.BidirectionalStreamingGreeterStub stub = BidirectionalStreamingGreeterGrpc.newStub(channel);

        // 创建请求
        StreamObserver<HelloRequest> requestObserver = stub.streamHello(new StreamObserver<HelloReply>() {
            @Override
            public void onNext(HelloReply helloReply) {
                System.out.println("Received from server: " + helloReply.getMessage());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                latch.countDown();
                System.out.println("Server completed the RPC.");
            }
        });
        // 客户端发送多个请求
        try {
            requestObserver.onNext(HelloRequest.newBuilder().setName("Alice").build());
            requestObserver.onNext(HelloRequest.newBuilder().setName("Bob").build());
            requestObserver.onNext(HelloRequest.newBuilder().setName("Charlie").build());

            // 结束请求
            requestObserver.onCompleted();
        } catch (Exception e) {
            System.err.println("Error sending requests: " + e.getMessage());
            requestObserver.onError(e);
        } finally {
            latch.await();
            channel.shutdown();
        }
    }


}

4.1 server端

public class BidirectionalStreamingGreeterServer {

    public static void main(String[] args) throws InterruptedException, IOException {
        Server server = ServerBuilder.forPort(50051)
                .addService(new BidirectionalStreamingGreeterImpl())
                .build()
                .start();
        System.out.println("Server started on port 50051");
        server.awaitTermination();
    }

    static class BidirectionalStreamingGreeterImpl extends BidirectionalStreamingGreeterGrpc.BidirectionalStreamingGreeterImplBase{
        @Override
        public StreamObserver<HelloRequest> streamHello(StreamObserver<HelloReply> responseObserver) {
            return new StreamObserver<HelloRequest>() {
                @Override
                public void onNext(HelloRequest helloRequest) {
                    HelloReply reply = HelloReply.newBuilder()
                            .setMessage("Hello, " + helloRequest.getName())
                            .build();
                    responseObserver.onNext(reply);
                }

                @Override
                public void onError(Throwable throwable) {

                }

                @Override
                public void onCompleted() {
                    responseObserver.onCompleted();
                }
            };
        }
    }
}

4.4 结果

在这里插入图片描述

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐