RPC Based on Google Protobuf and Mina


## RPC Based on Google Protobuf and Mina ##

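RPC (Remote Procedure Call): In computer science, a remote procedure call (RPC) is a form of inter-process communication (IPC) that allows a computer program to cause a subroutine or procedure to execute in another address space (commonly on another computer on a shared network) without the programmer explicitly coding the details for this remote interaction. That is, the programmer writes essentially the same code whether the subroutine is local to the executing program, or remote.

The basic idea: the RPC framework packs the [service name, method name, method arguments] of the client's call and sends them over the network to the server. The server unpacks the message, looks up the requested service locally, invokes the requested method on it with the supplied arguments, then packs the result and sends it back to the client over the network. This resembles a RESTful request, which locates the requested service by URI and returns JSON over HTTP. An RPC framework usually encapsulates the underlying network layer and transmits byte-stream packets over sockets, although it can also simply carry JSON requests and responses over HTTP. The underlying philosophy of REST is somewhat different.

Sequence of events in an RPC: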

  1. The client calls the client stub. The call is a local procedure call, with parameters pushed on to the stack in the normal way.
  2. The client stub packs the parameters into a message and makes a system call to send the message. Packing the parameters is called marshalling.
  3. The client's local operating system sends the message from the client machine to the server machine.
  4. The local operating system on the server machine passes the incoming packets to the server stub.
  5. The server stub unpacks the parameters from the message. Unpacking the parameters is called unmarshalling.
  6. Finally, the server stub calls the server procedure. The reply traces the same steps in the reverse direction.
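The six steps above can be sketched in a single process with no network. This is a minimal sketch under stated assumptions: the `add` procedure, the `RpcFlowSketch` class, and the hand-rolled wire format (a length-prefixed method name followed by two ints) are all illustrative and stand in for Protobuf and a real socket.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RpcFlowSketch {
    // The "remote" procedure that lives on the server side (hypothetical).
    static int add(int a, int b) {
        return a + b;
    }

    // Step 2: the client stub marshals the method name and parameters into a message.
    static byte[] marshalRequest(String method, int a, int b) {
        byte[] name = method.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + name.length + 8);
        buf.putInt(name.length).put(name).putInt(a).putInt(b);
        return buf.array();
    }

    // Steps 4-6: the server stub unmarshals the message, calls the procedure,
    // and marshals the reply.
    static byte[] serverStub(byte[] request) {
        ByteBuffer buf = ByteBuffer.wrap(request);
        byte[] name = new byte[buf.getInt()];
        buf.get(name);
        String method = new String(name, StandardCharsets.UTF_8);
        if (!method.equals("add")) {
            throw new IllegalArgumentException("method not found: " + method);
        }
        int a = buf.getInt();
        int b = buf.getInt();
        return ByteBuffer.allocate(4).putInt(add(a, b)).array();
    }

    // Step 1: to the caller this looks like an ordinary local call.
    static int clientStub(int a, int b) {
        byte[] request = marshalRequest("add", a, b);
        // Step 3 stand-in: a real stub would send the bytes over a socket here.
        byte[] reply = serverStub(request);
        return ByteBuffer.wrap(reply).getInt();
    }

    public static void main(String[] args) {
        System.out.println(clientStub(2, 3));
    }
}
```

The caller only ever sees `clientStub`; everything about the message layout is hidden behind it, which is the property an RPC framework provides.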

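Why use Protobuf?

So that many different clients (whose methods, objects, and parameters differ from language to language) can access a common server, an IDL (Interface Description Language) is normally used to describe the RPC service across platforms. Every client calls the server's methods through the interface that the server defines in the IDL, which gives all parties a common language.

What is Protobuf?

Protobuf is a cross-language object serialization scheme from Google that also serves as an IDL; it is widely used inside Google. Facebook's Thrift is similar, but Thrift ships an RPC implementation, whereas Protobuf only provides the mechanism without a concrete implementation. Many comparisons of the two are available online. Official Protobuf documentation: https://developers.google.com/protocol-buffers/docs/overview

With a common language in place, the remaining problem is data transport: the client's requests and data must reach the server, which calls for network communication. In Java, either Mina or Netty can do this, and many comparisons of the two are also available online.

The Protobuf project lists some third-party RPC implementations: https://code.google.com/p/protobuf/wiki/ThirdPartyAddOns#RPC_Implementations

Below is a brief study of the implementation at https://github.com/jcai/fepss-rpc (Protobuf no longer seems to follow this model and uses code generator plugins instead).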

### Defining the IDL for RPC communication ###

The .proto file defines the error codes, the RPC request object, and the RPC response object:

package com.aquar.rpc;

option java_package = "com.aquar.rpc";
option java_outer_classname = "RpcProtobuf";
option optimize_for = SPEED;

// Possible error reasons
enum ErrorReason {
BAD_REQUEST_DATA = 0;
BAD_REQUEST_PROTO = 1;
SERVICE_NOT_FOUND = 2;
METHOD_NOT_FOUND = 3;
RPC_ERROR = 4;
RPC_FAILED = 5;
CLIENT_FAILED=6;
}

message Request {
// RPC request id, used to identify different request
optional uint32 id = 1;

// RPC service full name
required string service_name = 2;

// RPC method name
required string method_name = 3;

// RPC request proto
required bytes request_proto = 4;
}

message Response {
// RPC request id
optional uint32 id = 1;

// RPC response proto
optional bytes response_proto = 2;

// Error, if any
optional string error = 3;

// Was callback invoked
optional bool callback = 4 [default = false];

// Error Reason
optional ErrorReason error_reason = 5;
}

Compile it to generate the RpcProtobuf class file:

protoc -I=./ --java_out=./src ./rpc.proto

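### Server side ###

Define the encoder and decoder objects used by Mina.

Each frame in the byte stream is an integer followed by a Message object: the integer gives the serialized size of the message, and the Message is an RPC request or response object.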
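The framing described here (a varint size followed by the serialized message) can be sketched with the standard library alone. This is an illustration under stated assumptions, not the project's `SizeContext` implementation: `VarintFramingSketch`, `frame`, and `tryReadFrame` are hypothetical names, and the payload is an opaque byte array rather than a Protobuf message. The reader returns `null` and leaves the buffer position untouched when a frame is incomplete, which mirrors how a cumulative decoder waits for more data.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

final class VarintFramingSketch {
    // Writes value as a base-128 varint: 7 bits per byte,
    // with the high bit set on every byte except the last.
    static void writeVarint32(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Prefixes the payload with its length encoded as a varint.
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint32(out, payload.length);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Returns the next payload, or null if the buffer does not yet hold a
    // complete frame; in that case the buffer position is restored.
    static byte[] tryReadFrame(ByteBuffer in) {
        in.mark();
        int size = 0;
        int shift = 0;
        while (true) {
            if (!in.hasRemaining()) {
                in.reset();
                return null; // the size prefix itself is incomplete
            }
            byte b = in.get();
            size |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 28) {
                throw new IllegalStateException("malformed varint");
            }
        }
        if (size < 0) {
            throw new IllegalStateException("negative frame size");
        }
        if (in.remaining() < size) {
            in.reset();
            return null; // the body is incomplete
        }
        byte[] payload = new byte[size];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[300]);
        System.out.println("frame length: " + framed.length);
    }
}
```

A varint keeps the prefix to one byte for messages under 128 bytes, at the cost of a variable-length header that the reader has to parse before it knows how much data to wait for.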

Encoder:

<!-- lang: java -->

Message msg = (Message) message;
int size = msg.getSerializedSize();
IoBuffer buffer = IoBuffer.allocate(SizeContext.computeTotal(size));
CodedOutputStream cos = CodedOutputStream.newInstance(buffer.asOutputStream());
cos.writeRawVarint32(size);
msg.writeTo(cos);
cos.flush();
buffer.flip();
out.write(buffer);

Decoder: extends CumulativeProtocolDecoder

<!-- lang: java -->

@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
	SizeContext ctx = SizeContext.get(session, in);
	if(ctx.hasEnoughData(in)) {
		try	{
			decodeMessage(in, out, ctx);
			return true;
		} finally {
			ctx.shiftPositionAndReset(session, in);
		}
	}
	return false;
}

private void decodeMessage(IoBuffer in, ProtocolDecoderOutput out,
		SizeContext ctx) throws IOException {
	Message.Builder builder = newBuilder();
	ctx.getInputStream(in).readMessage(builder, ExtensionRegistry.getEmptyRegistry());
	out.write(builder.build());
}
protected abstract Message.Builder newBuilder();

The SizeContext class helps track byte counts and perform the related buffer calculations.

Initialize the server like any ordinary Mina server:

<!-- lang: java -->

int processorCount = Runtime.getRuntime().availableProcessors();
acceptor = new NioSocketAcceptor(processorCount);
acceptor.setReuseAddress(true);
acceptor.getSessionConfig().setReuseAddress(true);
acceptor.getSessionConfig().setReceiveBufferSize(1024);
acceptor.getSessionConfig().setSendBufferSize(1024);
acceptor.getSessionConfig().setTcpNoDelay(true);
acceptor.getSessionConfig().setSoLinger(-1);
acceptor.setBacklog(1024);

acceptor.setDefaultLocalAddress(new InetSocketAddress(port));
DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
chain.addLast("protobuf", new ProtocolCodecFilter(
		new ProtobufEncoder(), new ProtobufDecoder() {
			@Override
			protected Builder newBuilder() {
				return RpcProtobuf.Request.newBuilder();
			}
		}));

acceptor.setHandler(ioHandler);
acceptor.bind(new InetSocketAddress(port));

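The server-side IoHandler:

  • In messageReceived, use the requested service name to look up the service in the service list
  • Find the method with findMethodByName(rpcRequest.getMethodName()) on the service's descriptor
  • Parse the request object: request = service.getRequestPrototype(method).newBuilderForType().mergeFrom(rpcRequest.getRequestProto());
  • Invoke the method on the service: service.callMethod(method, controller, request, callback);
  • Build the RpcResponse object and write it to the session: responseBuilder.setCallback(true).setResponseProto(callback.response.toByteString());

The Callback class mainly holds the object that the concrete Service method needs to return; it can be viewed as a wrapper. The method implementation places the object destined for the client into the callback, and the server then takes it out of the callback and stores it in the RpcResponse. By parsing the ResponseProto of the RpcResponse, the client obtains the value returned by the server method.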

<!-- lang: java -->

private Map<String, Service> services;

@Override
public void messageReceived(IoSession session, Object message)
        throws Exception {
    Request rpcRequest = (Request) message;
    if (rpcRequest == null) {
        throw new RpcException(ErrorReason.BAD_REQUEST_DATA,
                "request data is null!");
    }
    // Get the service/method
    Service service = services.get(rpcRequest.getServiceName());
    if (service == null) {
        throw new RpcException(ErrorReason.SERVICE_NOT_FOUND,
                "could not find service: " + rpcRequest.getServiceName());
    }
    MethodDescriptor method = service.getDescriptorForType()
            .findMethodByName(rpcRequest.getMethodName());
    if (method == null) {
        throw new RpcException(ErrorReason.METHOD_NOT_FOUND, String.format(
                "Could not find method %s in service %s", rpcRequest
                        .getMethodName(), service.getDescriptorForType()
                        .getFullName()));
    }

    // Parse request
    Message.Builder builder = null;
    try {
        builder = service.getRequestPrototype(method).newBuilderForType()
                .mergeFrom(rpcRequest.getRequestProto());
        if (!builder.isInitialized()) {
            throw new RpcException(ErrorReason.BAD_REQUEST_PROTO,
                    "Invalid request proto");
        }
    } catch (InvalidProtocolBufferException e) {
        throw new RpcException(ErrorReason.BAD_REQUEST_PROTO, e);
    }
    Message request = builder.build();

    // Call method
    RpcControllerImpl controller = new RpcControllerImpl();
    Callback callback = new Callback();
    try {
        service.callMethod(method, controller, request, callback);
    } catch (RuntimeException e) {
        throw new RpcException(ErrorReason.RPC_ERROR, e);
    }

    // Build and return response (callback is optional)
    Builder responseBuilder = Response.newBuilder();
    if (callback.response != null) {
        responseBuilder.setCallback(true).setResponseProto(
                callback.response.toByteString());
    } else {
        // Set whether callback was called
        responseBuilder.setCallback(callback.invoked);
    }
    if (controller.failed()) {
        responseBuilder.setError(controller.errorText());
        responseBuilder.setErrorReason(ErrorReason.RPC_FAILED);
    }
    Response rpcResponse = responseBuilder.build();
    outputResponse(session, rpcResponse);
}
/**
 * Callback that just saves the response and the fact that it was invoked.
 */
private class Callback implements RpcCallback<Message> {

    private Message response;
    private boolean invoked = false;

    public void run(Message response) {
        this.response = response;
        invoked = true;
    }
}

RpcChannel implementation

RpcChannel is an abstract interface for an RPC channel. An RpcChannel represents a communication line to a Service which can be used to call that Service's methods. The Service may be running on another machine. Normally, you should not call an RpcChannel directly, but instead construct a stub Service wrapping it. Starting with version 2.3.0, RPC implementations should not try to build on this, but should instead provide code generator plugins which generate code specific to the particular RPC implementation. This way the generated code can be more appropriate for the implementation in use and can avoid unnecessary layers of indirection.

RpcChannel is implemented mainly on the client side, which may use a completely different language or communication framework from the server.

The callMethod implementation mainly does the following:

  • Initializes the Mina client
  • Implements the client-side IoHandler:

a. In sessionOpened(), create the RpcRequest object from the arguments passed to callMethod and write it to the session
b. In messageReceived(), parse the RpcResponse sent by the server and convert it into the return value of the method the client called, that is, the value carried in the response_proto field of the RpcResponse
c. Finally, pass that value to the RpcCallback object the client supplied when it invoked the service method, which delivers the actual return value to the client
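Step c delivers the result through a callback rather than a return value. For callers that prefer a plain return value, one option is to bridge the callback to a future. The sketch below is an assumption-laden illustration, not part of fepss-rpc: `callAsync` is a hypothetical stand-in for a call that reports its result through a callback on another thread, and `Consumer<String>` stands in for `RpcCallback<Message>`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class CallbackBridgeSketch {
    // Hypothetical asynchronous call: the result arrives through the
    // callback on a different thread, as it would from an I/O handler.
    static void callAsync(String request, Consumer<String> done) {
        new Thread(() -> done.accept("echo:" + request)).start();
    }

    // Bridges the callback style to a blocking call by completing a future
    // from inside the callback and waiting on it in the caller.
    static String callBlocking(String request) {
        CompletableFuture<String> future = new CompletableFuture<>();
        callAsync(request, future::complete);
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(callBlocking("ping"));
    }
}
```

A production version would also complete the future exceptionally when the call fails, so that the caller does not block forever.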

<!-- lang: java -->

public void callMethod(final MethodDescriptor method,
        final RpcController controller, final Message request,
        final Message responsePrototype, final RpcCallback<Message> done) {
    // check rpc request
    if (!request.isInitialized()) {
        throw new RpcException(ErrorReason.BAD_REQUEST_DATA,
                "request uninitialized!");
    }

    // using MINA IoConnector
    IoConnector connector = new NioSocketConnector();

    // add protocol buffer codec
    DefaultIoFilterChainBuilder chain = connector.getFilterChain();
    chain.addLast("protobuf", new ProtocolCodecFilter(
            new ProtobufEncoder(), new ProtobufDecoder() {
                @Override
                protected Message.Builder newBuilder() {
                    return RpcProtobuf.Response.newBuilder();
                }
            }));

    // connector handler
    connector.setHandler(new IoHandlerAdapter() {
        @Override
        public void messageReceived(IoSession session, Object message)
                throws Exception {
            Response rpcResponse = (Response) message;
            handleResponse(responsePrototype, rpcResponse, controller, done);
            session.close(true);
        }

        /**
         * @see org.apache.mina.core.service.IoHandlerAdapter#sessionOpened(org.apache.mina.core.session.IoSession)
         */
        @Override
        public void sessionOpened(IoSession session) throws Exception {
            ((SocketSessionConfig) session.getConfig()).setKeepAlive(true);

            // Create request protocol buffer
            Request rpcRequest = Request.newBuilder()
                    .setRequestProto(request.toByteString())
                    .setServiceName(method.getService().getFullName())
                    .setMethodName(method.getName()).build();
            // Write request
            session.write(rpcRequest);
        }

        /**
         * @see org.apache.mina.core.service.IoHandlerAdapter#exceptionCaught(org.apache.mina.core.session.IoSession,
         *      java.lang.Throwable)
         */
        @Override
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            StringBuilder errorBuilder = new StringBuilder();
            errorBuilder.append("client has runtime exception!\n");
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            cause.printStackTrace(new PrintStream(out));
            errorBuilder.append(out.toString());
            controller.setFailed(errorBuilder.toString());
        }

    });

    // connect remote server
    ConnectFuture cf = connector.connect(new InetSocketAddress(host, port));
    try {
        cf.awaitUninterruptibly();// wait to connect remote server
        cf.getSession().getCloseFuture().awaitUninterruptibly();
    } finally {
        connector.dispose();
    }
}

private void handleResponse(Message responsePrototype,
        Response rpcResponse, RpcController controller,
        RpcCallback<Message> callback) {
    // Check for error
    if (rpcResponse.hasError()) {
        ErrorReason reason = rpcResponse.getErrorReason();
        controller
                .setFailed(reason.name() + " : " + rpcResponse.getError());
        return;
    }

    if ((callback == null) || !rpcResponse.getCallback()) {
        // No callback needed
        return;
    }

    if (!rpcResponse.hasResponseProto()) {
        // Callback was called with null on server side
        callback.run(null);
        return;
    }
    try {
        Message.Builder builder = responsePrototype.newBuilderForType()
                .mergeFrom(rpcResponse.getResponseProto());
        Message response = builder.build();
        // Invoke the callback the client supplied with its request, passing it the actual return value
        callback.run(response);
    } catch (InvalidProtocolBufferException e) {
        throw new RuntimeException(e);
    }
}

### Defining a Service ###

Define a custom game-lookup service. The proto file:

package com.aquar.rpc.services;

option java_package = "com.aquar.rpc.services";
option java_outer_classname = "GameServiceProto";
option java_generic_services = true;

message Game{
optional string gameName = 1;
}

message Result{
optional string result=1;
optional bool success=2;
}

service GameService {
rpc findGame(Game) returns(Result);
}

protoc -I=./ --java_out=./src ./GameService.proto

### Starting the server ###

This mainly initializes the service list and binds the server port so the server starts running.

<!-- lang: java -->

public class ServerDemo {
public static String host = "127.0.0.1";
public static int port = 5566;
public static void main(String[] args) {
    Map<String, Service> services = new HashMap<String, Service>();
    services.put(GameService.getDescriptor().getFullName(), new GameServiceImpl());
    ServerIoHandler ioHandler = new ServerIoHandler(services);
    RpcServer server = new RpcServer(host, port, ioHandler);
    try {
        server.start();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

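}

### Client request ###

  • Create the RpcChannel object
  • Create the service Stub
  • Create the argument for the requested method and the RpcCallback<Result> callback object that is used once the server has finished executing
  • Call the method defined by the service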

<!-- lang: java -->

public static void main(String[] args) {
    RpcChannelImpl channel = new RpcChannelImpl(ServerDemo.host, ServerDemo.port);
    RpcControllerImpl controller = channel.newRpcController();
    Stub service = GameService.newStub(channel); 
    Game request = Game.newBuilder().setGameName("FIFA").build();
    RpcCallback<Result> done = new RpcCallback<GameServiceProto.Result>() {
        
        @Override
        public void run(Result result) {
            if (result.getSuccess()) {
                System.out.println("Client get " + result.getResult());
            }
        }
    };
    System.out.println("Client request Gameservice for findGame: " + request.getGameName());
    service.findGame(controller, request, done);
}

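C++ Google Protocol Buffers: How to do RPC?

I have followed three tutorials on Google RPC from three different sources, and I always hit the same problem: I cannot get it to work. I am very close now, using protoserv. My current problem is that their tutorial says: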

int main(int argc,char* argv[]) {
   EchoServiceImpl echo_service;
   RpcManager rpcman;
   ...

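I cannot find how to access RpcManager or where it lives. I grepped the protobuf and protoserv directories to no avail. RpcManager appears only in the .java package. Perhaps it is simply an unsupported version, even though it has examples in C++.

Any help would be appreciated. Even a link to a stable, simple C++ protobuf implementation would be great.

Answer

My answer to this question comes late, but it should be useful for understanding or implementing a Protobuf RPC system.

For how to build an HTTP server that handles raw requests and responses, see pbrpc.

For how to process requests and generate responses (building the RPC system), see libpbrpc.

Each project contains only a few source files, which do not take long to read.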

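Implementing File Transfer in C# with gRPC and protobuf (Google Protocol Buffers)

  I first encountered gRPC when a colleague who works in Java used it in a project so that a C# client program could communicate and exchange data with a Java server program. At that time, the colleague compiled it to C# and I simply called the result.

  Later I wrote a C# gRPC version on my own. I searched through a lot of material, but much of it starts from the basics, calling the official "Say Hi!" introductory example, and then runs into all kinds of problems.

  I will not introduce gRPC and Protobuf here. Introductions are easy to find online, and most repeat the official material, so I start directly with usage.

  gRPC source code: https://github.com/grpc/grpc;

  protobuf code repository:

GitHub repository: https://github.com/google/protobuf;

Google download page for protobuf: https://developers.google.com/protocol-buffers/docs/downloads.

1. Create the solution

  In Visual Studio, create a solution named GrpcTest, then add three projects to it: GrpcClient, GrpcServer, and GrpcService. These are, respectively, the client (a WPF application), the server (a console application), and the gRPC service provider (a console application). Add a reference to GrpcService in both the GrpcClient and GrpcServer projects.

  Add the tool packages to all three projects in Visual Studio: right-click "Solution GrpcTest", click "Manage NuGet Packages for Solution", search the Browse tab for "Grpc", "Grpc.Tools", and "Google.Protobuf" in turn, then select all projects in the right-hand pane and click Install. (This step can also be done with "Install-Package Grpc" in View -> Other Windows -> Package Manager Console; that approach is not covered here.)

2. Syntax of the proto file

  A gRPC-based communication framework needs a matching interface definition file. gRPC uses files in the proto format, which naturally have their own syntax. This article does not describe that syntax in detail; the official site has the standard reference, and a fairly complete Chinese translation is available at https://www.codercto.com/a/45372.html. Section 1.3 of that article needs the following additions:

  • required: a well-formed message must contain exactly one of this field. The value must be set.
  • optional: a message may contain zero or one of this field (but no more than one).
  • repeated: the field can be repeated any number of times (including zero) in a well-formed message, and the order of the repeated values is preserved. This is the equivalent of a List in Java.

  This sample project implements file transfer, so add a FileTransfer.proto file to the GrpcService project with the following content: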

syntax = "proto3";
package GrpcService;

service FileTransfer{
 rpc FileDownload (FileRequest) returns (stream FileReply);
 rpc FileUpload (stream FileReply) returns(stream FileReturn);
}

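// Request for a download: the names of the files to download
message FileRequest{
 repeated string FileNames=1;// list of file names
 // repeated: a repeated field, similar to a list; optional: a field that may be omitted; required: a field that must be set
 string Mark = 2;// tag carried with the request
}

// Reply data used for both download and upload
message FileReply{
 string FileName=1;// file name
 int32 Block = 2;// marker: the sequence number of this chunk
 bytes Content = 3;// data
 string Mark = 4;// tag carried with the message
 }

// Return value for an upload
message FileReturn{
 string FileName=1;// file name
 string Mark = 2;// tag carried with the message
}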

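3. Compile the proto file to C# code

  The proto file only defines the data involved. To use that format in code, it must be compiled into C# source files.

    PS: The compilation approaches found online require downloading related code (see the blog post), and the others are also fairly tedious, so I describe it here the way I understand it. Note that my project sits in the root of drive D.

  First open a cmd window, then enter: D:\GrpcTest\packages\Grpc.Tools.2.32.0\tools\windows_x86\protoc.exe -ID:\GrpcTest\GrpcService --csharp_out D:\GrpcTest\GrpcService D:\GrpcTest\GrpcService\FileTransfer.proto --grpc_out D:\GrpcTest\GrpcService --plugin=protoc-gen-grpc=D:\GrpcTest\packages\Grpc.Tools.2.32.0\tools\windows_x86\grpc_csharp_plugin.exe

  Press Enter to run the compilation.

  What the command means:

  • D:\GrpcTest\packages\Grpc.Tools.2.32.0\tools\windows_x86\protoc.exe: the path of the compiler being invoked. The path differs slightly between versions.
  • -ID:\GrpcTest\GrpcService: -I specifies one or more directories to search for .proto files. The D:\GrpcTest\GrpcService\FileTransfer.proto argument in the command above can therefore be shortened to FileTransfer.proto, because -I already gives the directory. Note: if -I is omitted, the current directory is used.
  • --csharp_out D:\GrpcTest\GrpcService D:\GrpcTest\GrpcService\FileTransfer.proto: --csharp_out generates C# code and is followed by the output path and the input file. Other options include cpp_out, java_out, javanano_out, js_out, objc_out, php_out, python_out, and ruby_out, which is how multiple languages are supported: the generated files are then used on each language platform. Argument 1 (D:\GrpcTest\GrpcService) is the output path; argument 2 (D:\GrpcTest\GrpcService\FileTransfer.proto) is the proto file name or path.
  • --grpc_out D:\GrpcTest\GrpcService: grpc_out covers the service side: creating, calling, binding, and implementing the service. The generated file is named xxxGrpc.cs. By contrast, csharp_out produces the equivalent of the entity classes, interfaces, and definitions one usually writes by hand, in a file named xxx.cs.
  • --plugin=protoc-gen-grpc=D:\GrpcTest\packages\Grpc.Tools.2.32.0\tools\windows_x86\grpc_csharp_plugin.exe: the C# plugin. Python has its own plugin, as does Java.

  Compilation produces two new files (their location depends on the output path given). Add both files to the GrpcService project.

4. Write the server-side file transfer service

  In the GrpcServer project, create a class FileImpl that inherits from GrpcService.FileTransfer.FileTransferBase, and override its FileDownload and FileUpload methods so that clients can call them.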

/// <summary>
/// File transfer service
/// </summary>
class FileImpl:GrpcService.FileTransfer.FileTransferBase
{
 /// <summary>
 /// File download
 /// </summary>
 /// <param name="request">Download request</param>
 /// <param name="responseStream">Stream the file data is written to</param>
 /// <param name="context">Server call context</param>
 /// <returns></returns>
 public override async Task FileDownload(FileRequest request, global::Grpc.Core.IServerStreamWriter<FileReply> responseStream, global::Grpc.Core.ServerCallContext context)
 {
 List<string> lstSuccFiles = new List<string>();// files sent successfully
 DateTime startTime = DateTime.Now;// start time of the transfer
 int chunkSize = 1024 * 1024;// bytes read per chunk
 var buffer = new byte[chunkSize];// data buffer
 FileStream fs = null;// file stream
 try
 {
  // The meaning of the reply.Block values is a convention agreed between server and client
  for (int i = 0; i < request.FileNames.Count; i++)
  {
  string fileName = request.FileNames[i];// file name
  string filePath = Path.GetFullPath($".//Files\\{fileName}");// file path
  FileReply reply = new FileReply
  {
   FileName = fileName,
   Mark = request.Mark
  };// reply message
  Console.WriteLine($"{request.Mark}, downloading file: {filePath}");// log the download
  if (File.Exists(filePath))
  {
   fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, useAsync: true);

   // fs.Length could be used to tell the client the size of the file
   int readTimes = 0;// number of reads so far
   while (true)
   {
   int readSize = await fs.ReadAsync(buffer, 0, buffer.Length);// read a chunk
   if (readSize > 0)// data was read and needs to be sent
   {
    reply.Block = ++readTimes;
    reply.Content = Google.Protobuf.ByteString.CopyFrom(buffer, 0, readSize);
    await responseStream.WriteAsync(reply);
   }
   else// no more data: tell the client this file is finished
   {
    reply.Block = 0;
    reply.Content = Google.Protobuf.ByteString.Empty;
    await responseStream.WriteAsync(reply);
    lstSuccFiles.Add(fileName);
    Console.WriteLine($"{request.Mark}, finished sending file: {filePath}");// log the successful send
    break;// move on to the next file
   }
   }
   fs?.Close();
  }
  else
  {
   Console.WriteLine($"File [{filePath}] does not exist.");// log the missing file
   reply.Block = -1;// -1 marks a file that does not exist
   await responseStream.WriteAsync(reply);// tell the client the file status
  }
  }
  // Tell the client that the whole transfer is complete
  await responseStream.WriteAsync(new FileReply
  {
  FileName = string.Empty,
  Block = -2,// -2 marks the end of the whole transfer
  Content = Google.Protobuf.ByteString.Empty,
  Mark = request.Mark
  });
 }
 catch(Exception ex)
 {
  Console.WriteLine($"{request.Mark}, exception ({ex.GetType()}): {ex.Message}");
 }
 finally
 {
  fs?.Dispose();
 }
 // Print the counts as "sent/requested"; dividing them inside one placeholder would perform integer division
 Console.WriteLine($"{request.Mark}, file transfer complete. Total [{lstSuccFiles.Count}/{request.FileNames.Count}], elapsed: {DateTime.Now - startTime}");
 }


 /// <summary>
 /// File upload
 /// </summary>
 /// <param name="requestStream">Request stream</param>
 /// <param name="responseStream">Response stream</param>
 /// <param name="context">Server call context</param>
 /// <returns></returns>
 public override async Task FileUpload(global::Grpc.Core.IAsyncStreamReader<FileReply> requestStream, global::Grpc.Core.IServerStreamWriter<FileReturn> responseStream, global::Grpc.Core.ServerCallContext context)
 {
 List<string> lstFilesName = new List<string>();// files received
 List<FileReply> lstContents = new List<FileReply>();// buffered chunks

 FileStream fs = null;
 DateTime startTime = DateTime.Now;// start time
 string mark = string.Empty;
 string savePath = string.Empty;
 try
 {
  // The meaning of the reply.Block values is a convention agreed between server and client
  while (await requestStream.MoveNext())// read incoming messages
  {
  var reply = requestStream.Current;
  mark = reply.Mark;
  if (reply.Block == -2)// whole transfer complete
  {
   Console.WriteLine($"{mark}, upload complete. Total [{lstFilesName.Count}] files, elapsed: {DateTime.Now-startTime}");
   break;
  }
  else if (reply.Block == -1)// transfer cancelled
  {
   Console.WriteLine($"File [{reply.FileName}] transfer cancelled!");// log the cancellation
   lstContents.Clear();
   fs?.Close();// release the file stream
   if (!string.IsNullOrEmpty(savePath) && File.Exists(savePath))// delete the partial file if the transfer failed
   {
   File.Delete(savePath);
   }
   savePath = string.Empty;
   break;
  }
  else if(reply.Block==0)// current file complete
  {
   if (lstContents.Any())// write any remaining buffered data to the file
   {
   lstContents.OrderBy(c => c.Block).ToList().ForEach(c => c.Content.WriteTo(fs));
   lstContents.Clear();
   }
   lstFilesName.Add(savePath);// record the file as received
   fs?.Close();// release the file stream
   savePath = string.Empty;

   // Tell the client that this file has been received
   await responseStream.WriteAsync(new FileReturn
   {
   FileName= reply.FileName,
   Mark=mark
   });
  }
  else
  {
   if(string.IsNullOrEmpty(savePath))// first chunk of a new file
   {
   savePath = Path.GetFullPath($".//Files\\{reply.FileName}");// file path
   fs = new FileStream(savePath, FileMode.Create, FileAccess.ReadWrite);
   Console.WriteLine($"{mark}, uploading file: {savePath}, {DateTime.UtcNow.ToString("HH:mm:ss:ffff")}");
   }
   lstContents.Add(reply);// add to the buffer
   if (lstContents.Count() >= 20)// each chunk is 1 MB; write to disk in batches of 20 MB
   {
   lstContents.OrderBy(c => c.Block).ToList().ForEach(c => c.Content.WriteTo(fs));
   lstContents.Clear();
   }
  }
  }
 }
 catch(Exception ex)
 {
  Console.WriteLine($"{mark}, exception ({ex.GetType()}): {ex.Message}");
 }
 finally
 {
  fs?.Dispose();
 }
 }
}
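The Block convention used by FileDownload and FileUpload (positive values number the data chunks of a file, 0 ends the current file, -1 reports a failed or cancelled file, -2 ends the whole transfer) can be exercised without gRPC. The sketch below is a simplified in-memory illustration in Java, assuming hypothetical names (`ChunkProtocolSketch`, `Chunk`, `split`, `receive`); it models the message stream as a list and skips the 20-chunk write batching.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ChunkProtocolSketch {
    static final int END_OF_FILE = 0;      // current file is complete
    static final int FILE_ERROR = -1;      // current file failed or was cancelled
    static final int END_OF_TRANSFER = -2; // no more files follow

    // Mirrors the FileName, Block, and Content fields of FileReply.
    static final class Chunk {
        final String fileName;
        final int block;
        final byte[] content;

        Chunk(String fileName, int block, byte[] content) {
            this.fileName = fileName;
            this.block = block;
            this.content = content;
        }
    }

    // Sender side: numbered data chunks (1, 2, ...) followed by a block-0 marker.
    static List<Chunk> split(String fileName, byte[] data, int chunkSize) {
        List<Chunk> chunks = new ArrayList<>();
        int block = 0;
        for (int offset = 0; offset < data.length; offset += chunkSize) {
            int end = Math.min(offset + chunkSize, data.length);
            chunks.add(new Chunk(fileName, ++block, Arrays.copyOfRange(data, offset, end)));
        }
        chunks.add(new Chunk(fileName, END_OF_FILE, new byte[0]));
        return chunks;
    }

    // Receiver side: reassembles files from the stream, keyed by file name.
    static Map<String, byte[]> receive(List<Chunk> stream) {
        Map<String, byte[]> files = new LinkedHashMap<>();
        ByteArrayOutputStream current = new ByteArrayOutputStream();
        for (Chunk chunk : stream) {
            if (chunk.block == END_OF_TRANSFER) {
                break;
            } else if (chunk.block == FILE_ERROR) {
                current.reset(); // discard the partial file
            } else if (chunk.block == END_OF_FILE) {
                files.put(chunk.fileName, current.toByteArray());
                current.reset();
            } else {
                current.write(chunk.content, 0, chunk.content.length);
            }
        }
        return files;
    }

    public static void main(String[] args) {
        List<Chunk> stream = new ArrayList<>(split("a.bin", new byte[10], 4));
        stream.add(new Chunk("", END_OF_TRANSFER, new byte[0]));
        System.out.println(receive(stream).keySet());
    }
}
```

Because gRPC delivers the messages of a single stream in the order they were written, the receiver here appends chunks as they arrive instead of sorting them by block number.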

  Register the service in the main function:

class Program
{
 static void Main(string[] args)
 {
 // Provide the service
 Server server = new Server()
 {
  Services = {GrpcService.FileTransfer.BindService(new FileImpl())},
  Ports = {new ServerPort("127.0.0.1",50000,ServerCredentials.Insecure)}
 };
 // Start the server
 server.Start();

 // Keep running until the user types "exit"
 while(Console.ReadLine().Trim().ToLower()!="exit")
 {

 }
 // Stop the server and wait for the shutdown to finish
 server.ShutdownAsync().Wait();
 }
}

5. Write the client-side file transfer functions

  First define a result class, TransferResult<T>, to hold the outcome of a file transfer.

/// <summary>
/// Transfer result
/// </summary>
/// <typeparam name="T"></typeparam>
class TransferResult<T>
{
 /// <summary>
 /// Whether the transfer succeeded
 /// </summary>
 public bool IsSuccessful { get; set; }
 /// <summary>
 /// Message
 /// </summary>
 public string Message { get; set; }

 /// <summary>
 /// Attached data of type T
 /// </summary>
 public T Tag { get; set; } = default;
}

  Then add a FileTransfer class to the GrpcClient project and implement the related methods:

class FileTransfer
{

 /// <summary>
 /// Gets the communication client
 /// </summary>
 /// <returns>The channel and the client</returns>
 static (Channel, GrpcService.FileTransfer.FileTransferClient) GetClient()
 {
 // The IP address and port must match those the server listens on
 Channel channel = new Channel("127.0.0.1", 50000, ChannelCredentials.Insecure);
 var client = new GrpcService.FileTransfer.FileTransferClient(channel);
 return (channel, client);
 }

 /// <summary>
 /// Downloads files
 /// </summary>
 /// <param name="fileNames">Names of the files to download</param>
 /// <param name="mark">Tag</param>
 /// <param name="saveDirectoryPath">Directory to save into</param>
 /// <param name="cancellationToken">Cancellation token</param>
 /// <returns>Download task (success flag, reason, names of failed files)</returns>
 public static async Task<TransferResult<List<string>>> FileDownload(List<string> fileNames, string mark, string saveDirectoryPath, System.Threading.CancellationToken cancellationToken = new System.Threading.CancellationToken())
 {
 var result = new TransferResult<List<string>>() { Message = $"Invalid save directory: {saveDirectoryPath}" };
 if (!System.IO.Directory.Exists(saveDirectoryPath))
 {
  return result;// the save directory does not exist
 }
 if (fileNames.Count == 0)
 {
  result.Message = "No files specified";
  return result;// nothing to download
 }
 result.Message = "Could not connect to the server";
 FileRequest request = new FileRequest() { Mark = mark };// request message
 request.FileNames.AddRange(fileNames);// set the names of the files to download
 var lstSuccFiles = new List<string>();// files transferred successfully
 string savePath = string.Empty;// save path
 System.IO.FileStream fs = null;
 Channel channel = null;// communication channel
 GrpcService.FileTransfer.FileTransferClient client = null;
 DateTime startTime = DateTime.Now;
 try
 {
  (channel, client) = GetClient();
  using (var call = client.FileDownload(request))
  {
  List<FileReply> lstContents = new List<FileReply>();// buffered chunks
  var responseStream = call.ResponseStream;
  // The meaning of the responseStream.Current.Block values is a convention agreed between server and client
  while (await responseStream.MoveNext(cancellationToken))// start receiving data
  {
   if (cancellationToken.IsCancellationRequested)
   {
   break;
   }
   if (responseStream.Current.Block == -2)// the whole transfer is complete
   {
   result.Message = $"Download finished [{lstSuccFiles.Count}/{fileNames.Count}], elapsed: {DateTime.Now - startTime}";
   result.IsSuccessful = true;
   break;
   }
   else if (responseStream.Current.Block == -1)// the current file failed
   {
   Console.WriteLine($"File [{responseStream.Current.FileName}] transfer failed!");// log the failure
   lstContents.Clear();
   fs?.Close();// release the file stream
   if (!string.IsNullOrEmpty(savePath) && File.Exists(savePath))// delete the partial file if the transfer failed
   {
    File.Delete(savePath);
   }
   savePath = string.Empty;
   }
   else if (responseStream.Current.Block == 0)// the current file is complete
   {
   if (lstContents.Any())// write any remaining buffered data to the file
   {
    lstContents.OrderBy(c => c.Block).ToList().ForEach(c => c.Content.WriteTo(fs));
    lstContents.Clear();
   }
   lstSuccFiles.Add(responseStream.Current.FileName);// record the file as transferred
   fs?.Close();// release the file stream
   savePath = string.Empty;
   }
   else// file data arrived
   {
   if (string.IsNullOrEmpty(savePath))// an empty save path means this is the first chunk of a new file
   {
    savePath = Path.Combine(saveDirectoryPath, responseStream.Current.FileName);
    fs = new FileStream(savePath, FileMode.Create, FileAccess.ReadWrite);
   }
   lstContents.Add(responseStream.Current);// add to the buffer
   if (lstContents.Count() >= 20)// each chunk is 1 MB; write to disk in batches of 20 MB
   {
    lstContents.OrderBy(c => c.Block).ToList().ForEach(c => c.Content.WriteTo(fs));
    lstContents.Clear();
   }
   }
  }
  }
  fs?.Close();// release the file stream
  if (!result.IsSuccessful &&!string.IsNullOrEmpty(savePath)&& File.Exists(savePath))// delete the partial file if the transfer failed
  {
  File.Delete(savePath);
  }
 }
 catch (Exception ex)
 {
  if (cancellationToken.IsCancellationRequested)
  {
  fs?.Close();// release the file stream
  result.IsSuccessful = false;
  result.Message = $"Download cancelled by the user. Completed [{lstSuccFiles.Count}/{fileNames.Count}], elapsed: {DateTime.Now - startTime}";
  }
  else
  {
  result.Message = $"Exception during file transfer: {ex.Message}";
  }
 }
 finally
 {
  fs?.Dispose();
 }
 result.Tag = fileNames.Except(lstSuccFiles).ToList();// files that failed
 // Shut down the channel (if one was created) and return the result
 if (channel != null)
 {
  await channel.ShutdownAsync();
 }
 return result;
 }


 /// <summary>
 /// 文件上传
 /// </summary>
 /// <param name="filesPath">文件路径</param>
 /// <param name="mark">标记</param>
 /// <param name="cancellationToken">异步取消命令</param>
 /// <returns>下载任务(是否成功、原因、成功的文件名)</returns>
 public static async Task<TransferResult<List<string>>> FileUpload(List<string> filesPath, string mark, System.Threading.CancellationToken cancellationToken=new System.Threading.CancellationToken())
 {
 var result = new TransferResult<List<string>> { Message = "没有文件需要下载" };
 if (filesPath.Count == 0)
 {
  return await Task.Run(() => result);//没有文件需要下载
 }
 result.Message = "未能连接到服务器。";
 var lstSuccFiles = new List<string>();//传输成功的文件
 int chunkSize = 1024 * 1024;
 byte[] buffer = new byte[chunkSize];//每次发送的大小
 FileStream fs = null;//文件流
 Channel channel = null;//申明通信频道
 GrpcService.FileTransfer.FileTransferClient client = null;
 DateTime startTime = DateTime.Now;
 try
 {
  (channel, client) = GetClient();
  using(var stream=client.FileUpload())//连接上传文件的客户端
  {
  //reply.Block数字的含义是服务器和客户端约定的
  foreach (var filePath in filesPath)//遍历集合
  {
   if(cancellationToken.IsCancellationRequested)
   break;//取消了传输
   FileReply reply = new FileReply()
   {
   FileName=Path.GetFileName(filePath),
   Mark=mark
   };
   if(!File.Exists(filePath))//文件不存在,继续下一轮的发送
   {
   Console.WriteLine($"文件不存在:{filePath}");//写入日志
   continue;
   }
   fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, FileShare.Read, chunkSize, useAsync: true);
   int readTimes = 0;
   while(true)
   {
    if (cancellationToken.IsCancellationRequested)
    {
     reply.Block = -1;// the transfer was cancelled
     reply.Content = Google.Protobuf.ByteString.Empty;
     await stream.RequestStream.WriteAsync(reply);// tell the server the transfer was cancelled
     break;// stop sending this file
    }
    int readSize = await fs.ReadAsync(buffer, 0, buffer.Length);// read the next chunk without blocking
    if(readSize>0)
    {
     reply.Block = ++readTimes;// update the block number and send the data
     reply.Content = Google.Protobuf.ByteString.CopyFrom(buffer, 0, readSize);
     await stream.RequestStream.WriteAsync(reply);
    }
    else
    {
     Console.WriteLine($"Finished uploading file [{filePath}].");
     reply.Block = 0;// marks the end of this file
     reply.Content = Google.Protobuf.ByteString.Empty;
     await stream.RequestStream.WriteAsync(reply);// send the end-of-file marker
     // wait for the server's acknowledgement
     await stream.ResponseStream.MoveNext(cancellationToken);
     if(stream.ResponseStream.Current!=null&&stream.ResponseStream.Current.Mark==mark)
     {
     lstSuccFiles.Add(filePath);// record the successful file
     }
     break;// move on to the next file
   }
   }
   fs?.Close();
  }
  if (!cancellationToken.IsCancellationRequested)
  {
   result.IsSuccessful = true;
   result.Message = $"Upload finished. {lstSuccFiles.Count}/{filesPath.Count} files in total, elapsed: {DateTime.Now - startTime}";

   await stream.RequestStream.WriteAsync(new FileReply
   {
   Block = -2,// end of the whole transfer
   Mark = mark
   });// send the end-of-transfer marker
  }
  }
 }
 catch(Exception ex)
 {
  if (cancellationToken.IsCancellationRequested)
  {
  fs?.Close();// release the file stream
  result.IsSuccessful = false;
  result.Message = $"The upload was cancelled by the user. Completed {lstSuccFiles.Count}/{filesPath.Count}, elapsed: {DateTime.Now - startTime}";
  }
  else
  {
  result.Message = $"File upload failed ({ex.GetType()}): {ex.Message}";
  }
 }
 finally
 {
  fs?.Dispose();
 }
 Console.WriteLine(result.Message);
 result.Tag = lstSuccFiles;
 // shut down the channel (if one was created) and return the result
 if (channel != null)
 {
  await channel.ShutdownAsync();
 }
 return result;
 }
}
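
The upload loop above depends on a numbering convention for the Block field: a positive value is the n-th data chunk of a file, 0 marks the end of a file, -1 signals cancellation, and -2 marks the end of the whole transfer. The following is a minimal sketch of that convention, written in Java because the remaining samples on this page are Java. The names BlockFramer, Frame, and frameFile are hypothetical and are not part of the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of the Block numbering convention used by the upload loop. */
final class BlockFramer {
    static final int END_OF_FILE = 0;      // sent after the last chunk of a file
    static final int CANCELLED = -1;       // sent when the caller cancels mid-file
    static final int END_OF_TRANSFER = -2; // sent once after all files

    /** One message on the stream: a block marker plus an optional payload. */
    static final class Frame {
        final int block;
        final byte[] content;

        Frame(int block, byte[] content) {
            this.block = block;
            this.content = content;
        }
    }

    /** Splits data into chunks numbered 1..n and appends the end-of-file marker. */
    static List<Frame> frameFile(byte[] data, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<Frame> frames = new ArrayList<>();
        int block = 0;
        for (int offset = 0; offset < data.length; offset += chunkSize) {
            int end = Math.min(offset + chunkSize, data.length);
            frames.add(new Frame(++block, Arrays.copyOfRange(data, offset, end)));
        }
        frames.add(new Frame(END_OF_FILE, new byte[0]));
        return frames;
    }

    public static void main(String[] args) {
        // A 10-byte "file" sent in 4-byte chunks.
        for (Frame f : frameFile(new byte[10], 4)) {
            System.out.println("block=" + f.block + " bytes=" + f.content.length);
        }
    }
}
```

For a 10-byte input and a chunk size of 4, this yields blocks 1, 2, and 3 (4, 4, and 2 bytes) followed by the empty end-of-file frame with block 0.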

  The method can now be called from the client window:

private string GetFilePath()
{
 // Create OpenFileDialog 
 Microsoft.Win32.OpenFileDialog dlg = new Microsoft.Win32.OpenFileDialog();

 // Set filter for file extension and default file extension 
 dlg.Title = "Select a file";
 dlg.Filter = "All files (*.*)|*.*";
 dlg.FileName = "Select a folder.";
 dlg.FilterIndex = 1;
 dlg.ValidateNames = false;
 dlg.CheckFileExists = false;
 dlg.CheckPathExists = true;
 dlg.Multiselect = false;// single selection only; set to true to allow selecting several files

 // Display OpenFileDialog by calling ShowDialog method 
 Nullable<bool> result = dlg.ShowDialog();

 // Get the selected file name and display in a TextBox 
 if (result == true)
 {
 // Open document 
 return dlg.FileName;
 }

 return string.Empty;
}
// Choose the file to upload
private void btnOpenUpload_Click(object sender, RoutedEventArgs e)
{
 lblUploadPath.Content = GetFilePath();
}
CancellationTokenSource uploadTokenSource;
// Upload
private async void btnUpload_Click(object sender, RoutedEventArgs e)
{
 lblMessage.Content = string.Empty;

 uploadTokenSource = new CancellationTokenSource();
 List<string> fileNames = new List<string>();
 fileNames.Add(lblUploadPath.Content.ToString());
 var result = await ServerNet.FileTransfer.FileUpload(fileNames, "123", uploadTokenSource.Token);

 lblMessage.Content = result.Message;

 uploadTokenSource = null;
}
// Cancel the upload
private void btnCancelUpload_Click(object sender, RoutedEventArgs e)
{
 uploadTokenSource?.Cancel();
}


// Choose the file to download
private void btnOpenDownload_Click(object sender, RoutedEventArgs e)
{
 txtDownloadPath.Text = GetFilePath();
}
// Download the file
private async void btnDownload_Click(object sender, RoutedEventArgs e)
{
 lblMessage.Content = string.Empty;

 downloadTokenSource = new CancellationTokenSource();
 List<string> fileNames = new List<string>();
 fileNames.Add(System.IO.Path.GetFileName(txtDownloadPath.Text));
 var result= await ServerNet.FileTransfer.FileDownload(fileNames, "123", Environment.CurrentDirectory, downloadTokenSource.Token);

 lblMessage.Content = result.Message;

 downloadTokenSource = null;
}
CancellationTokenSource downloadTokenSource;
// Cancel the download
private void btnCancelDownload_Click(object sender, RoutedEventArgs e)
{
 downloadTokenSource?.Cancel();
}
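
The handlers above create a fresh CancellationTokenSource for each transfer and cancel it from a separate button, while the transfer loop checks the token between chunks. As a rough Java analogue of that cooperative-cancellation pattern, here is a minimal sketch. The class CancellableTransfer and its cancelAfter parameter are hypothetical and exist only to simulate a user pressing cancel.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical Java analogue of the CancellationTokenSource pattern used by the handlers. */
final class CancellableTransfer {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    /** Called from the "cancel" handler; safe to call from any thread. */
    void cancel() {
        cancelled.set(true);
    }

    /**
     * Pretends to send totalChunks chunks, checking the flag before each one.
     * cancelAfter simulates the user pressing cancel once that many chunks were sent;
     * pass a negative value to never cancel. Returns the number of chunks sent.
     */
    int run(int totalChunks, int cancelAfter) {
        int sent = 0;
        while (sent < totalChunks) {
            if (sent == cancelAfter) {
                cancel(); // simulated button click
            }
            if (cancelled.get()) {
                break; // stop cooperatively between chunks
            }
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(new CancellableTransfer().run(5, -1)); // never cancelled
        System.out.println(new CancellableTransfer().run(5, 2));  // cancelled after two chunks
    }
}
```

As in the C# code, cancellation only takes effect at the points where the loop checks the flag, so a chunk already in flight is always completed.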

6. Source code

  https://files.cnblogs.com/files/pilgrim/GrpcTest.rar

Example source code for com.google.protobuf.BlockingRpcChannel

Project: ditb    File: SecureTestUtil.java
/**
 * Grant permissions globally to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantGlobal(final HBaseTestingUtility util,final String user,final Permission.Action... actions) throws Exception {
  SecureTestUtil.updateACLs(util,new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
        try (Table acl = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
          BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
          AccessControlService.BlockingInterface protocol =
              AccessControlService.newBlockingStub(service);
          ProtobufUtil.grant(null,protocol,user,actions);
        }
      }
      return null;
    }
  });
}
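
The Javadoc above says the helper waits until the permission caches are updated or fails after a 10-second timeout. The excerpt does not show how updateACLs implements that wait, so the following is only a generic sketch of polling a condition with a deadline, not the HBase implementation. The class WaitFor and its parameters are hypothetical.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper; not the HBase implementation of updateACLs. */
final class WaitFor {
    /**
     * Polls the condition every intervalMillis until it holds or timeoutMillis has elapsed.
     * Returns true if the condition held in time, false on timeout or interruption.
     */
    static boolean waitFor(long timeoutMillis, long intervalMillis, BooleanSupplier condition) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Simulated "caches refreshed" condition that becomes true after about 50 ms.
        boolean ok = waitFor(10_000, 10, () -> System.currentTimeMillis() - start >= 50);
        if (!ok) {
            throw new IllegalStateException("timed out waiting for the condition");
        }
        System.out.println("condition met");
    }
}
```

A test utility would typically turn a false return value into an exception, which matches the "throw an exception upon timeout" behaviour the Javadoc describes.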
Project: ditb    File: SecureTestUtil.java
/**
 * Revoke permissions globally from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
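public static void revokeGlobal(final HBaseTestingUtility util,final String user,final Permission.Action... actions) throws Exception {
  SecureTestUtil.updateACLs(util,new Callable<Void>() {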
    @Override
    public Void call() throws Exception {
      try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
        try (Table acl = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
          BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
          AccessControlService.BlockingInterface protocol =
              AccessControlService.newBlockingStub(service);
          ProtobufUtil.revoke(null,protocol,user,actions);
        }
      }
      return null;
    }
  });
}
Project: ditb    File: SecureTestUtil.java
/**
 * Grant permissions on a namespace to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantOnNamespace(final HBaseTestingUtility util,final String namespace,namespace,actions);
        }
      }
      return null;
    }
  });
}
Project: ditb    File: SecureTestUtil.java
/**
 * Revoke permissions on a namespace from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeFromNamespace(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: ditb    File: SecureTestUtil.java
/**
 * Grant permissions on a table to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantOnTable(final HBaseTestingUtility util,final TableName table,final byte[] family,final byte[] qualifier,table,family,qualifier,actions);
        }
      }
      return null;
    }
  });
}
Project: ditb    File: SecureTestUtil.java
/**
 * Revoke permissions on a table from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeFromTable(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: ditb    File: SecureTestUtil.java
public static void checkGlobalPerms(HBaseTestingUtility testUtil,Permission.Action... actions)
    throws IOException {
  Permission[] perms = new Permission[actions.length];
  for (int i = 0; i < actions.length; i++) {
    perms[i] = new Permission(actions[i]);
  }
  CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
  for (Action a : actions) {
    request.addPermission(AccessControlProtos.Permission.newBuilder()
        .setType(AccessControlProtos.Permission.Type.Global)
        .setGlobalPermission(
            AccessControlProtos.GlobalPermission.newBuilder()
                .addAction(ProtobufUtil.toPermissionAction(a)).build()));
  }
  try(Connection conn = ConnectionFactory.createConnection(testUtil.getConfiguration());
      Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
    BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
    AccessControlService.BlockingInterface protocol =
      AccessControlService.newBlockingStub(channel);
    try {
      protocol.checkPermissions(null,request.build());
    } catch (ServiceException se) {
      ProtobufUtil.toIOException(se);
    }
  }
}
Project: ditb    File: TestAccessController.java
@Test (timeout=180000)
public void testGlobalPermissionList() throws Exception {
  List<UserPermission> perms;
  Table acl = systemUserConnection.getTable(AccessControlLists.ACL_TABLE_NAME);
  try {
    BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
    AccessControlService.BlockingInterface protocol =
      AccessControlService.newBlockingStub(service);
    perms = ProtobufUtil.getUserPermissions(null,protocol);
  } finally {
    acl.close();
  }
  UserPermission adminPerm = new UserPermission(Bytes.toBytes(USER_ADMIN.getShortName()),AccessControlLists.ACL_TABLE_NAME,null,Bytes.toBytes("ACRW"));
  assertTrue("Only global users and user admin has permission on table _acl_ per setup",perms.size() == 5 && hasFoundUserPermission(adminPerm,perms));
}
Project: ditb    File: ConnectionManager.java
@Override
// nothing is done w/ the 'master' parameter.  It is ignored.
public AdminService.BlockingInterface getAdmin(final ServerName serverName,final boolean master)
throws IOException {
  if (isDeadServer(serverName)) {
    throw new RegionServerStoppedException(serverName + " is dead.");
  }
  String key = getStubKey(AdminService.BlockingInterface.class.getName(),serverName.getHostname(),serverName.getPort(),this.hostnamesCanChange);
  this.connectionLock.putIfAbsent(key,key);
  AdminService.BlockingInterface stub = null;
  synchronized (this.connectionLock.get(key)) {
    stub = (AdminService.BlockingInterface)this.stubs.get(key);
    if (stub == null) {
      BlockingRpcChannel channel =
          this.rpcClient.createBlockingRpcChannel(serverName,rpcTimeout);
      stub = AdminService.newBlockingStub(channel);
      this.stubs.put(key,stub);
    }
  }
  return stub;
}
Project: ditb    File: ConnectionManager.java
@Override
public ClientService.BlockingInterface getClient(final ServerName sn)
throws IOException {
  if (isDeadServer(sn)) {
    throw new RegionServerStoppedException(sn + " is dead.");
  }
  String key = getStubKey(ClientService.BlockingInterface.class.getName(),sn.getHostname(),sn.getPort(),this.hostnamesCanChange);
  this.connectionLock.putIfAbsent(key,key);
  ClientService.BlockingInterface stub = null;
  synchronized (this.connectionLock.get(key)) {
    stub = (ClientService.BlockingInterface)this.stubs.get(key);
    if (stub == null) {
      BlockingRpcChannel channel =
          this.rpcClient.createBlockingRpcChannel(sn,rpcTimeout);
      stub = ClientService.newBlockingStub(channel);
      // In old days,after getting stub/proxy,we'd make a call.  We are not doing that here.
      // Just fail on first actual call rather than in here on setup.
      this.stubs.put(key,stub);
    }
  }
  return stub;
}
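
Both getAdmin and getClient above follow the same caching pattern: register a per-key lock object with putIfAbsent, synchronize on it, and create the stub only if none is cached yet. A self-contained sketch of that pattern, stripped of the HBase types, might look as follows. StubCache and its factory parameter are hypothetical names, and the stub is an arbitrary object here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical per-key cache mirroring the lock-then-create pattern shown above. */
final class StubCache<S> {
    private final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, S> stubs = new ConcurrentHashMap<>();

    /** Returns the cached stub for key, creating it at most once via factory. */
    S get(String key, Function<String, S> factory) {
        locks.putIfAbsent(key, key);       // the first caller installs the lock object
        synchronized (locks.get(key)) {    // only callers using the same key contend
            S stub = stubs.get(key);
            if (stub == null) {
                stub = factory.apply(key); // expensive creation happens under the key lock
                stubs.put(key, stub);
            }
            return stub;
        }
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        StubCache<String> cache = new StubCache<>();
        Function<String, String> factory = k -> "stub#" + created.incrementAndGet() + "@" + k;
        System.out.println(cache.get("host-a:16020", factory));
        System.out.println(cache.get("host-a:16020", factory)); // same stub, factory not called again
        System.out.println(cache.get("host-b:16020", factory));
    }
}
```

Locking per key rather than on the whole map means that creating a stub for one server does not block lookups for other servers. On Java 8 and later, ConcurrentHashMap.computeIfAbsent offers a more compact alternative with similar per-key behaviour.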
Project: pbase    File: SecureTestUtil.java
/**
 * Grant permissions globally to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantGlobal(final HBaseTestingUtility util,new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
        try (Table acl = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
          BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
          AccessControlService.BlockingInterface protocol =
              AccessControlService.newBlockingStub(service);
          ProtobufUtil.grant(protocol,actions);
        }
      }
      return null;
    }
  });
}
Project: pbase    File: SecureTestUtil.java
/**
 * Revoke permissions globally from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeGlobal(final HBaseTestingUtility util,new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
        try (Table acl = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
          BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
          AccessControlService.BlockingInterface protocol =
              AccessControlService.newBlockingStub(service);
          ProtobufUtil.revoke(protocol,actions);
        }
      }
      return null;
    }
  });
}
Project: pbase    File: SecureTestUtil.java
/**
 * Grant permissions on a namespace to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantOnNamespace(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: pbase    File: SecureTestUtil.java
/**
 * Revoke permissions on a namespace from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeFromNamespace(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: pbase    File: SecureTestUtil.java
/**
 * Grant permissions on a table to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantOnTable(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: pbase    File: SecureTestUtil.java
/**
 * Revoke permissions on a table from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeFromTable(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: pbase    File: TestAccessController.java
@Test
public void testGlobalPermissionList() throws Exception {
  List<UserPermission> perms;
  Table acl = new HTable(conf,AccessControlLists.ACL_TABLE_NAME);
  try {
    BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
    AccessControlService.BlockingInterface protocol =
      AccessControlService.newBlockingStub(service);
    perms = ProtobufUtil.getUserPermissions(protocol);
  } finally {
    acl.close();
  }
  UserPermission adminPerm = new UserPermission(Bytes.toBytes(USER_ADMIN.getShortName()),Bytes.toBytes("ACRW"));
  assertTrue("Only user admin has permission on table _acl_ per setup",perms.size() == 1 && hasFoundUserPermission(adminPerm,perms));
}
Project: pbase    File: ConnectionManager.java
@Override
// nothing is done w/ the 'master' parameter.  It is ignored.
public AdminService.BlockingInterface getAdmin(final ServerName serverName,final boolean master)
        throws IOException {
    if (isDeadServer(serverName)) {
        throw new RegionServerStoppedException(serverName + " is dead.");
    }
    String key = getStubKey(AdminService.BlockingInterface.class.getName(),serverName.getHostAndPort());
    this.connectionLock.putIfAbsent(key,key);
    AdminService.BlockingInterface stub = null;
    synchronized (this.connectionLock.get(key)) {
        stub = (AdminService.BlockingInterface) this.stubs.get(key);
        if (stub == null) {
            BlockingRpcChannel channel =
                    this.rpcClient.createBlockingRpcChannel(serverName,rpcTimeout);
            stub = AdminService.newBlockingStub(channel);
            this.stubs.put(key,stub);
        }
    }
    return stub;
}
Project: pbase    File: ConnectionManager.java
@Override
public ClientService.BlockingInterface getClient(final ServerName sn)
        throws IOException {
    if (isDeadServer(sn)) {
        throw new RegionServerStoppedException(sn + " is dead.");
    }
    String key = getStubKey(ClientService.BlockingInterface.class.getName(),sn.getHostAndPort());
    this.connectionLock.putIfAbsent(key,key);
    ClientService.BlockingInterface stub = null;
    synchronized (this.connectionLock.get(key)) {
        stub = (ClientService.BlockingInterface) this.stubs.get(key);
        if (stub == null) {
            BlockingRpcChannel channel =
                    this.rpcClient.createBlockingRpcChannel(sn,rpcTimeout);
            stub = ClientService.newBlockingStub(channel);
            // In old days,after getting stub/proxy,we'd make a call.  We are not doing that here.
            // Just fail on first actual call rather than in here on setup.
            this.stubs.put(key,stub);
        }
    }
    return stub;
}
Project: HIndex    File: SecureTestUtil.java
/**
 * Grant permissions globally to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantGlobal(final HBaseTestingUtility util,new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      HTable acl = new HTable(util.getConfiguration(),AccessControlLists.ACL_TABLE_NAME);
      try {
        BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
        AccessControlService.BlockingInterface protocol =
            AccessControlService.newBlockingStub(service);
        ProtobufUtil.grant(protocol,actions);
      } finally {
        acl.close();
      }
      return null;
    }
  });
}
Project: HIndex    File: SecureTestUtil.java
/**
 * Revoke permissions globally from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeGlobal(final HBaseTestingUtility util,AccessControlLists.ACL_TABLE_NAME);
      try {
        BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
        AccessControlService.BlockingInterface protocol =
            AccessControlService.newBlockingStub(service);
        ProtobufUtil.revoke(protocol,actions);
      } finally {
        acl.close();
      }
      return null;
    }
  });
}
Project: HIndex    File: SecureTestUtil.java
/**
 * Grant permissions on a namespace to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantOnNamespace(final HBaseTestingUtility util,actions);
      } finally {
        acl.close();
      }
      return null;
    }
  });
}
Project: HIndex    File: SecureTestUtil.java
/**
 * Revoke permissions on a namespace from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeFromNamespace(final HBaseTestingUtility util,actions);
      } finally {
        acl.close();
      }
      return null;
    }
  });
}
Project: HIndex    File: SecureTestUtil.java
/**
 * Grant permissions on a table to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantOnTable(final HBaseTestingUtility util,actions);
      } finally {
        acl.close();
      }
      return null;
    }
  });
}
Project: HIndex    File: SecureTestUtil.java
/**
 * Revoke permissions on a table from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeFromTable(final HBaseTestingUtility util,actions);
      } finally {
        acl.close();
      }
      return null;
    }
  });
}
Project: HIndex    File: TestAccessController.java
@Test
public void testGlobalPermissionList() throws Exception {
  List<UserPermission> perms;
  HTable acl = new HTable(conf,perms));
}
Project: HIndex    File: HConnectionManager.java
@Override
// nothing is done w/ the 'master' parameter.  It is ignored.
public AdminService.BlockingInterface getAdmin(final ServerName serverName,serverName.getHostAndPort());
  this.connectionLock.putIfAbsent(key,key);
  AdminService.BlockingInterface stub = null;
  synchronized (this.connectionLock.get(key)) {
    stub = (AdminService.BlockingInterface)this.stubs.get(key);
    if (stub == null) {
      BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName,this.rpcTimeout);
      stub = AdminService.newBlockingStub(channel);
      this.stubs.put(key,stub);
    }
  }
  return stub;
}
Project: HIndex    File: HConnectionManager.java
@Override
public ClientService.BlockingInterface getClient(final ServerName sn)
throws IOException {
  if (isDeadServer(sn)) {
    throw new RegionServerStoppedException(sn + " is dead.");
  }
  String key = getStubKey(ClientService.BlockingInterface.class.getName(),sn.getHostAndPort());
  this.connectionLock.putIfAbsent(key,key);
  ClientService.BlockingInterface stub = null;
  synchronized (this.connectionLock.get(key)) {
    stub = (ClientService.BlockingInterface)this.stubs.get(key);
    if (stub == null) {
      BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,this.rpcTimeout);
      stub = ClientService.newBlockingStub(channel);
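      // In old days,after getting stub/proxy,we'd make a call.  We are not doing that here.
      // Just fail on first actual call rather than in here on setup.
      this.stubs.put(key,stub);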
    }
  }
  return stub;
}
Project: hbase    File: SecureTestUtil.java
/**
 * Grant permissions globally to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantGlobal(final HBaseTestingUtility util,new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
        try (Table acl = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
          BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
          AccessControlService.BlockingInterface protocol =
              AccessControlService.newBlockingStub(service);
          AccessControlUtil.grant(null,false,actions);
        }
      }
      return null;
    }
  });
}
Project: hbase    File: SecureTestUtil.java
/**
 * Revoke permissions globally from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeGlobal(final HBaseTestingUtility util,new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
        try (Table acl = connection.getTable(AccessControlLists.ACL_TABLE_NAME)) {
          BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
          AccessControlService.BlockingInterface protocol =
              AccessControlService.newBlockingStub(service);
          AccessControlUtil.revoke(null,actions);
        }
      }
      return null;
    }
  });
}
Project: hbase    File: SecureTestUtil.java
/**
 * Grant permissions on a namespace to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantOnNamespace(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: hbase    File: SecureTestUtil.java
/**
 * Revoke permissions on a namespace from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeFromNamespace(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: hbase    File: SecureTestUtil.java
/**
 * Grant permissions on a table to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantOnTable(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: hbase    File: SecureTestUtil.java
/**
 * Revoke permissions on a table from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeFromTable(final HBaseTestingUtility util,actions);
        }
      }
      return null;
    }
  });
}
Project: hbase    File: SecureTestUtil.java
public static void checkGlobalPerms(HBaseTestingUtility testUtil,Permission.Action... actions)
    throws IOException {
  Permission[] perms = new Permission[actions.length];
  for (int i = 0; i < actions.length; i++) {
    perms[i] = new Permission(actions[i]);
  }
  CheckPermissionsRequest.Builder request = CheckPermissionsRequest.newBuilder();
  for (Action a : actions) {
    request.addPermission(AccessControlProtos.Permission.newBuilder()
        .setType(AccessControlProtos.Permission.Type.Global)
        .setGlobalPermission(
            AccessControlProtos.GlobalPermission.newBuilder()
                .addAction(AccessControlUtil.toPermissionAction(a)).build()));
  }
  try(Connection conn = ConnectionFactory.createConnection(testUtil.getConfiguration());
      Table acl = conn.getTable(AccessControlLists.ACL_TABLE_NAME)) {
    BlockingRpcChannel channel = acl.coprocessorService(new byte[0]);
    AccessControlService.BlockingInterface protocol =
      AccessControlService.newBlockingStub(channel);
    try {
      protocol.checkPermissions(null,request.build());
    } catch (ServiceException se) {
      ProtobufUtil.toIOException(se);
    }
  }
}
Project: hbase    File: TestAccessController.java
@Test (timeout=180000)
public void testGlobalPermissionList() throws Exception {
  List<UserPermission> perms;
  Table acl = systemUserConnection.getTable(AccessControlLists.ACL_TABLE_NAME);
  try {
    BlockingRpcChannel service = acl.coprocessorService(HConstants.EMPTY_START_ROW);
    AccessControlService.BlockingInterface protocol =
      AccessControlService.newBlockingStub(service);
    perms = AccessControlUtil.getUserPermissions(null,protocol);
  } finally {
    acl.close();
  }

  Collection<String> superUsers = Superusers.getSuperUsers();
  List<UserPermission> adminPerms = new ArrayList<>(superUsers.size() + 1);
  adminPerms.add(new UserPermission(Bytes.toBytes(USER_ADMIN.getShortName()),Bytes.toBytes("ACRW")));

  for(String user: superUsers) {
    adminPerms.add(new UserPermission(Bytes.toBytes(user),Action.values()));
  }
  assertTrue("Only super users,global users and user admin has permission on table hbase:acl " +
      "per setup",perms.size() == 5 + superUsers.size() &&
      hasFoundUserPermission(adminPerms,perms));
}
Project: incubator-tajo    File: BlockingRpcClient.java
BlockingRpcClient(final Class<?> protocol,final InetSocketAddress addr,ClientSocketChannelFactory factory)
    throws Exception {

  this.protocol = protocol;
  String serviceClassName = protocol.getName() + "$"
      + protocol.getSimpleName() + "Service";
  Class<?> serviceClass = Class.forName(serviceClassName);
  stubMethod = serviceClass.getMethod("newBlockingStub",BlockingRpcChannel.class);

  this.handler = new ClientChannelUpstreamHandler();
  pipeFactory = new ProtoPipelineFactory(handler,RpcResponse.getDefaultInstance());
  super.init(addr,pipeFactory,factory);
  rpcChannel = new ProxyRpcChannel();

  this.key = new RpcConnectionKey(addr,false);
}
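
The constructor above derives the name of the generated service class from the protocol class (protocol name + "$" + simple name + "Service") and looks up its static newBlockingStub factory by reflection. The sketch below reproduces that lookup with plain Java classes so it can run without protobuf. EchoProtocol and EchoProtocolService are hypothetical stand-ins for generated code, and a String takes the place of BlockingRpcChannel.

```java
import java.lang.reflect.Method;

/** Hypothetical demo of locating a nested service class and its static stub factory. */
final class StubFactoryLookup {
    /** Stand-in for a generated protocol class; real ones are produced by protoc. */
    static final class EchoProtocol {
        static final class EchoProtocolService {
            public static String newBlockingStub(String channel) {
                return "stub over " + channel;
            }
        }
    }

    /** Builds "<protocol>$<SimpleName>Service", loads it, and invokes newBlockingStub(channel). */
    static Object newStub(Class<?> protocol, String channel) {
        String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service";
        try {
            Class<?> serviceClass = Class.forName(serviceClassName);
            Method factory = serviceClass.getMethod("newBlockingStub", String.class);
            return factory.invoke(null, channel); // null receiver because the factory is static
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No stub factory found in " + serviceClassName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(newStub(EchoProtocol.class, "demo-channel"));
    }
}
```

Resolving the factory once in the constructor, as the Tajo code does by storing stubMethod, avoids repeating the reflective lookup on every call.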
Project: tajo-cdh    File: BlockingRpcClient.java
BlockingRpcClient(final Class<?> protocol,false);
}
Project: PyroDB    File: SecureTestUtil.java
/**
 * Grant permissions globally to the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void grantGlobal(final HBaseTestingUtility util,actions);
      } finally {
        acl.close();
      }
      return null;
    }
  });
}
Project: PyroDB    File: SecureTestUtil.java
/**
 * Revoke permissions globally from the given user. Will wait until all active
 * AccessController instances have updated their permissions caches or will
 * throw an exception upon timeout (10 seconds).
 */
public static void revokeGlobal(final HBaseTestingUtility util,actions);
      } finally {
        acl.close();
      }
      return null;
    }
  });
}

Example source code for com.google.protobuf.DescriptorProtos.EnumOptions

Project: api-compiler    File: DescriptorGenerator.java
private EnumOptions generateEnumOptions(Enum e) {
  EnumOptions.Builder builder = EnumOptions.newBuilder();
  setOptions(builder,e.getOptionsList(),ENUM_OPTION_NAME_PREFIX);
  return builder.build();
}
Project: play-store-api    File: Descriptors.java
/** Get the {@code EnumOptions},defined in {@code descriptor.proto}. */
public EnumOptions getOptions() { return proto.getOptions(); }
Project: Beam    File: Descriptors.java
/**
 * Get the {@code EnumOptions},defined in {@code descriptor.proto}.
 */
public EnumOptions getOptions () {
    return proto.getOptions ();
}
Project: protobuf-el    File: Scopes.java
public EnumOptions.Builder getEnumOptions() {
  return currentScope.getEnumOptions();
}
Project: protobuf-el    File: Scopes.java
protected EnumOptions.Builder getEnumOptions() {
  throw new RuntimeException(NOT_APPLICABLE_IN_CURRENT_SCOPE);
}
Project: protobuf-el    File: Scopes.java
@Override
protected EnumOptions.Builder getEnumOptions() {
  return protoBuilder.getOptionsBuilder();
}
