GVKun编程网logo

Windows RPC RpcEpResolveBinding失败(rpc/encoded wsdls not support)

6

以上就是给各位分享WindowsRPCRpcEpResolveBinding失败,其中也会对rpc/encodedwsdlsnotsupport进行解释,同时本文还将给你拓展akka-rpc(基于ak

以上就是给各位分享Windows RPC RpcEpResolveBinding失败,其中也会对rpc/encoded wsdls not support进行解释,同时本文还将给你拓展akka-rpc(基于 akka 的 rpc 实现)、babel: yet another rpc, but far beyond rpc(上)、babel: yet another rpc, but far beyond rpc(下)、babel: yet another rpc, but far beyond rpc(中)等相关知识,如果能碰巧解决你现在面临的问题,别忘了关注本站,现在开始吧!

本文目录一览:

Windows RPC RpcEpResolveBinding失败(rpc/encoded wsdls not support)

Windows RPC RpcEpResolveBinding失败(rpc/encoded wsdls not support)

如何解决Windows RPC RpcEpResolveBinding失败?

 RpcEpResolveBinding call fails with 1753 error. 
 1753. There are no more endpoints available from the endpoint mapper.

RpcStringBindingCompose

RpcBindingFromStringBinding 调用顺利进行,随后对RpcEpResolveBinding的调用失败。 询问有关我可以检查的所有内容的建议。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

akka-rpc(基于 akka 的 rpc 实现)

akka-rpc(基于 akka 的 rpc 实现)

akka-rpc(基于 akka 的 rpc 的实现)


代码:http://git.oschina.net/for-1988/Simples


目前的工作在基于 akka(java)实现数据服务总线,Akka 2.3 中提供了 Cluster Sharing (分片集群) 和 Persistence 功能可以很简单的写出一个大型的分布式集群的架构。里面的一块功能就是 RPC(远程过程调用)。

RPC

远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用,例:Java RMI。

实现原理

整个 RPC 的调用过程完全基于 akka 来传递对象,因为需要进行网络通信,所以我们的接口实现类、调用参数以及返回值都需要实现 java 序列化接口。客户端跟服务端其实都是在一个 Akka 集群关系中,Client 跟 Server 都是集群中的一个节点。首先 Client 需要初始化 RpcClient 对象,在初始化的过程中,我们启动了 AkkaSystem,加入到整个集群中,并创建了负责与 Server 进行通信的 Actor。然后通过 RpcClient 中的 getBean (Class<T> clz) 方法获取 Server 端的接口实现类的实例对象,然后通过动态代理拦截这个对象的所有方法。最后,在执行方法的时候,在 RpcBeanProxy 中向 Server 发送 CallMethod 事件,执行远程实现类的方法,获取返回值给 Client。

Server 端核心代码

public class RpcServer extends UntypedActor {
         private Map<String, Object> proxyBeans;

	public RpcServer(Map<Class<?>, Object> beans) {
		proxyBeans = new HashMap<String, Object>();
		for (Iterator<Class<?>> iterator = beans.keySet().iterator(); iterator
				.hasNext();) {
			Class<?> inface = iterator.next();
			proxyBeans.put(inface.getName(), beans.get(inface));
		}
	}

	@Override
	public void onReceive(Object message) throws Exception {
		if (message instanceof RpcEvent.CallBean) {   //返回Server端的接口实现类的实例
			CallBean event = (CallBean) message;
			ReturnBean bean = new ReturnBean(
					proxyBeans.get(event.getBeanName()), getSelf());
			getSender().tell(bean, getSelf());
		} else if (message instanceof RpcEvent.CallMethod) {
			CallMethod event = (CallMethod) message;
			Object bean = proxyBeans.get(event.getBeanName());
			Object[] params = event.getParams();
			List<Class<?>> paraTypes = new ArrayList<Class<?>>();
			Class<?>[] paramerTypes = new Class<?>[] {};
			if (params != null) {
				for (Object param : params) {
					paraTypes.add(param.getClass());
				}
			}
			Method method = bean.getClass().getMethod(event.getMethodName(),
					paraTypes.toArray(paramerTypes));
			Object o = method.invoke(bean, params);
			getSender().tell(o, getSelf());
		}
	}

}

启动 Server

public static void main(String[] args) {
		final Config config = ConfigFactory
				.parseString("akka.remote.netty.tcp.port=" + 2551)
				.withFallback(
						ConfigFactory
								.parseString("akka.cluster.roles = [RpcServer]"))
				.withFallback(ConfigFactory.load());

		ActorSystem system = ActorSystem.create("EsbSystem", config);
		
		// Server 加入发布的服务
		Map<Class<?>, Object> beans = new HashMap<Class<?>, Object>();
		beans.put(ExampleInterface.classnew ExampleInterfaceImpl());
		system.actorOf(Props.create(RpcServer.classbeans)"rpcServer");
	}

Client 端核心代码 

RpcClient 类型集成了 Thread,为了解决一个问题:因为 AkkaSystem 在加入集群中的时候是异步的,所以我们在第一次 new RpcClient 对象的时候需要等待加入集群成功以后,才可以执行下面的方法,不然获取的 /user/rpcServer Route 中没有 Server 的 Actor,请求会失败。

public class RpcClient extends Thread {

	private ActorSystem system;

	private ActorRef rpc;

	private ActorRef clientServer;

	private static RpcClient instance = null;

	public RpcClient() {
		this.start();
		final Config config = ConfigFactory
				.parseString("akka.remote.netty.tcp.port=" + 2552)
				.withFallback(
						ConfigFactory
								.parseString("akka.cluster.roles = [RpcClient]"))
				.withFallback(ConfigFactory.load());
		system = ActorSystem.create("EsbSystem", config);

		int totalInstances = 100;
		Iterable<String> routeesPaths = Arrays.asList("/user/rpcServer");
		boolean allowLocalRoutees = false;
		ClusterRouterGroup clusterRouterGroup = new ClusterRouterGroup(
				new AdaptiveLoadBalancingGroup(
						HeapMetricsSelector.getInstance(),
						Collections.<String> emptyList()),
				new ClusterRouterGroupSettings(totalInstances, routeesPaths,
						allowLocalRoutees, "RpcServer"));
		rpc = system.actorOf(clusterRouterGroup.props(), "rpcCall");
		clientServer = system.actorOf(Props.create(RpcClientServer.class, rpc),
				"client");
		Cluster.get(system).registerOnMemberUp(new Runnable() {  //加入集群成功后的回调事件,恢复当前线程的中断
			@Override
			public void run() {
				synchronized (instance) {
					System.out.println("notify");
					instance.notify();
				}
			}
		});

	}

	public static RpcClient getInstance() {
		if (instance == null) {
			instance = new RpcClient();
			synchronized (instance) {
				try {   //中断当前线程,等待加入集群成功后,恢复
					System.out.println("wait");
					instance.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		return instance;
	}

	public <T> T getBean(Class<T> clz) {
		Future<Object> future = Patterns.ask(clientServer,
				new RpcEvent.CallBean(clz.getName(), clientServer),
				new Timeout(Duration.create(5, TimeUnit.SECONDS)));
		try {
			Object o = Await.result(future,
					Duration.create(5, TimeUnit.SECONDS));
			if (o != null) {
				ReturnBean returnBean = (ReturnBean) o;
				return (T) new RpcBeanProxy().proxy(returnBean.getObj(),
						clientServer, clz);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return null;
	}
}

RpcClientServer

public class RpcClientServer extends UntypedActor {

	private ActorRef rpc;

	public RpcClientServer(ActorRef rpc) {
		this.rpc = rpc;
	}

	@Override
	public void onReceive(Object message) throws Exception {
		if (message instanceof RpcEvent.CallBean) {  //向Server发送CallBean请求
			CallBean event = (CallBean) message;
			Future<Object> future = Patterns.ask(rpc, event, new Timeout(
					Duration.create(5, TimeUnit.SECONDS)));
			Object o = Await.result(future,
					Duration.create(5, TimeUnit.SECONDS));
			getSender().tell(o, getSelf());
		} else if (message instanceof RpcEvent.CallMethod) {  //向Server发送方法调用请求
			Future<Object> future = Patterns.ask(rpc, message, new Timeout(
					Duration.create(5, TimeUnit.SECONDS)));
			Object o = Await.result(future,
					Duration.create(5, TimeUnit.SECONDS));
			getSender().tell(o, getSelf());
		}
	}
}

RpcBeanProxy,客户端的动态代理类

public class RpcBeanProxy implements InvocationHandler {

	private ActorRef rpcClientServer;

	private Class<?> clz;

	public Object proxy(Object target, ActorRef rpcClientServer, Class<?> clz) {
		this.rpcClientServer = rpcClientServer;
		this.clz = clz;
		return Proxy.newProxyInstance(target.getClass().getClassLoader(),
				target.getClass().getInterfaces(), this);
	}

	@Override
	public Object invoke(Object proxy, Method method, Object[] args)
			throws Throwable {
		Object result = null;
		RpcEvent.CallMethod callMethod = new RpcEvent.CallMethod(
				method.getName(), args, clz.getName());
		Future<Object> future = Patterns.ask(rpcClientServer, callMethod,
				new Timeout(Duration.create(5, TimeUnit.SECONDS)));
		Object o = Await.result(future, Duration.create(5, TimeUnit.SECONDS));
		result = o;
		return result;
	}

}

Demo

Interface,Client 和 Server 都需要这个类,必须实现序列化

public interface ExampleInterface extends Serializable{
	public String sayHello(String name);
}

实现类,只需要 Server 端存在这个类。

public class ExampleInterfaceImpl implements ExampleInterface {
	@Override
	public String sayHello(String name) {
		System.out.println("Be Called !");
		return "Hello " + name;
	}
}

Client 调用

public static void main(String[] args) {
		RpcClient client = RpcClient.getInstance();
		long start = System.currentTimeMillis();
		
		ExampleInterface example = client.getBean(ExampleInterface.class);
		System.out.println(example.sayHello("rpc"));
		
		long time = System.currentTimeMillis() - start;
		System.out.println("time :" + time);
	}


这里第一次调用耗时比较长需要 46 毫秒,akka 会对消息进行优化,调用多次以后时间为 1~2 毫秒。

目前还没来得及做性能测试,后面会补充。

babel: yet another rpc, but far beyond rpc(上)

babel: yet another rpc, but far beyond rpc(上)

babel

这几年工作下来,越来越体会到communication的重要性,无论是从技术层面还是工程组织层面。在这方面的投入,对工作效率、产品质量等等方面都会有很好的加强。

尤其是我们岂安的技术团队从开始就处于人力匮乏的状态,又同时要覆盖SAAS和私有化大数据解决方案两条迥然不同的研发线路,矛盾比较突出。所以,从开始的时候就想着去建立一套自己的通讯基础设施,来满足不同业务场景下的各种需求,于是催生了这朵RPC领域的奇葩。

冠名以babel,纯属个人在初始阶段的勃勃雄心,希望能尽可能的消弭不同系统组件间的通讯细节,让我们这帮三流的程序员也能做出二流的系统:

  1. 要解决python和java的问题,各语言对等。我们的系统有大量跨语言的交互,需要尽量减少通讯开销。这个是RPC的基本功。

  2. 要能简单的管理和监控。这个是RPC良好运维,进化到service层面所需要的。

  3. 最好还能兼顾离线数据分析过程中的数据组织功能。以数据为核心的公司这点其实很重要。

  4. 可以兼顾大规模集群环境和小集群环境(甚至是单机)。

  5. 要解决跨数据中心的问题。风控SAAS有SLA方面的变态需求,需要部署到客户最后100米处,这也是刚需。

  6. 不光是通信,还要能做好持久化、负载均衡、动态伸缩等等其他功能,这个是衡量分布式系统成功与否的重要指标。

  7. 轻。

本文后续部分会先在技术背景方面闲聊一会,讲讲为啥要造轮子;然后会介绍一下我们这个简单又独特的RPC的实现部分,以及在我们岂安公司的实际例子;最后再探讨一下基于技术手段来影响整个工程组织方式。

RPC漫谈

rpc这个领域其实已经是历经好几十年的发展了:

  1. 最早是corba,用来解决跨语言跨平台的数据交互,新一点有ICE,算是更加完整的框架;

  2. 也有平台、语言相关的RPC,比如DCOM,RMI等等;

  3. 十多年前web service为代表的SOA(service本质上是更规范化的RPC)大火,有一整套以xml为基础的框架,伴随着企业应用的蓬勃发展,越来越重。学术界也做了很多脱离实际的工作;

  4. 几年前开始回归简单,有protocol buffer和thrift,虽然两者本质上是序列化工具,但也提供了简单的rpc实现,实际上有很多系统是基于thrift框架来实现;另外restful风格也开始逐渐盛行;

  5. 最近由于分布式系统成为越来越明显的特征,多年前SOA方面的很多内容渐渐开始有实际价值,一番改造后再度开始登场,微服务现在是业界的热点,也必将成为今后的标准;

  6. 国内外很多大型互联网公司也开始作自己的SOA平台,之前在携程的老板就建了一套面向.NET和java的现代化的SOA框架。开源方面,netflix走在前面,阿里的dubbo也应该是国内的翘楚。

异步持久化通信

需要先澄清的是,抛开SOA,微服务,web service,rpc等等各种名词,本质上其实是在讲通信,只是表层的包装方式不同。

而通讯有两个维度可以切割:同步异步,持久化。

  1. 同步非持久化通讯:传统的rpc属于这个范畴,client和server两方必须同时存在,一问一答。这种是最普遍的形式,尤其在前端。

  2. 异步持久化通讯:传统的mq属于这个范畴,server不需要保持在线,client发了就跑,由mq系统作中间缓存。mq的语义其实系统后端是更典型的通讯方式。

  3. 异步非持久化通讯:这种没有什么意义。

  4. 同步持久化通讯:这种目前市面上还比较少,但也是有意义的,会是很好的补充。

我们对RPC的需求实际上需要涵盖1、2、4三个方面。一般的做法是使用独立的rpc和mq系统,再定制开发。最终,我们选择基于现有系统改一个轮子。

自带路由和位置透明的通信

通信很重要的一点是参与对象的定位,现有的从简单到复杂有不同的模式:

  1. 写死对端ip地址。开发最简单,后期运维是噩梦;

  2. 通过配置系统来配置ip。需要开发相应的配置服务,可以集中管理,但仍需要手动管理位置信息;

  3. 自动注册和发现。类似web service中UDDI的概念,现在也有很多开源组件,需要集成;

  4. 上述三点是传统rpc的定位方式,mq的定位方式更加灵活。

我们的角度,希望是有如下的通讯模型:

  • 由中央的RPC框架统一管控消息的路由、持久化。

  • 每个消息的生产者和接收者只需要配置虚拟名称,实现与ip地址的完全解耦。

  • 支持消息的对话和群发功能,但不需要消息发送端作多余的工作,由RPC框架来支撑。

  • RPC框架类似一个邮局的功能,每个实体只需要打通跟其临近的节点(邮箱)就可以通讯。所以从运维的角度,只需要开放一个单独的端口,每台应用服务器(生产者)也只需要绑定一台框架服务器(邮箱地址)。

通讯框架

只有实现这样的通讯框架,我们才能做到:

  1. 所有节点对等,不需要额外的配置,方便部署和迁移。

  2. 所有节点只需要和框架节点通信,大大降低网络复杂度,并提升安全性。

  3. 所有的通讯都集中起来,可以统一管理。

实际上,我们是借助现有的mq开源软件来帮助我们实现这一点。

没有灵魂的SOA

RPC,在规范化到一定程度后,可以成为service,这个是在中大型企业的常规武器。
而service,有几个组件必不可少,借用web service的术语:

  • UDDI:统一的注册中心

  • SOAP:消息交换载体,作为网络通信的序列化和反序列化支撑

  • WSDL:服务的规范化描述

其实,这几个本质上是以XML为基础,包括后续的各种功能特性,以及学术界叠加的什么semantic web等等。

XML本身实际上是整个SOA的灵魂,它才是建立babel tower的基石。然而,由于工程化方面的困难,大家往往绕过。比如,消息采用json/pb等方式,其实完全是两个层次的玩意,这就导致现有的各种SOA系统都是没有魂的系统,只能起到通信承载的功能。

笔者之前两家公司所遇到的就是两个很好的典型:

第一家公司,个人一直评价为好体系配上烂技术。公司有一套C++的RPC框架,框架本身比较普通。我最诟病的是没有注册中心,所以配置上会比较繁琐僵硬。
但是,他在消息内容的定义上颇下功夫,所有消息全部由统一的唯一的xml定义,每个字段都有背后指定的含义,这个在跨部门跨系统的交流上起着至关重要的作用。
这一套虽然与技术无关,但可以看出其作为数据驱动公司的底蕴,估计还是由数据部门驱动的原因。当然,由于底层技术支撑不够,所以整套执行到后来良定义的消息也只有很少人才能明确,失去了原有的意义。

第二家公司,有一套现代化的SOA系统,仅从技术层面去讨论,没什么可挑剔的。这套系统有注册中心,有自己的序列化框架,有服务描述和代码生成等功能,算是完备,但也仅限于技术层面的完备。
因为是技术人员推动的系统,所以对业务的渗透不够,虽然有了一套底层支撑,但是整个消息的定义是没有很好的定义的(非xml这种语义型的定义),所以虽然在技术上取得了成功,但是对业务的推进没法起到决定性的作用。

两个例子正好相反,可以看出两家公司驱动力的不同,以及管理与技术之间可能的微妙关系。

之所以写这一段,是因为之前也提到XML才是babel tower的基石,但其实基本没人做到,我们也做不到。但由于我们厚着脸皮打了babel的名号,所以还是需要澄清一下,甚至,我们走的更加极端——干脆连序列化消息的schema定义也扔掉了,服务定义和消息组织全部用松散的json,一方面是出于减少开销(仅适合我们当前这种微型团队的场景),另一方面,也是没想好怎么玩,所以只是留下升级的空间,暂不触及。

所以说,我们的RPC也是失了魂的,仅仅是对通信的实现和封装上提供一些简要的支撑,覆盖的是通信语义,业务语义上没有涉及。

反爬虫
文章来源:http://bigsec.com/

babel: yet another rpc, but far beyond rpc(下)

babel: yet another rpc, but far beyond rpc(下)

bigsec

(图片源自网络)

4框架生态

实际上,在做babel的同时,我也在探索如何更好的利用技术工具来影响团队组织架构。以babel举例,实际上整个框架生态分为三类人:

业务研发。

在框架上提供服务,或调用他人的服务。由于绝大部分的通讯细节已经封装好。业务研发可以更加专注于他的业务方面的逻辑。

框架研发。

研发babel通讯框架,以及其支撑的其他框架,比如监控报警等。框架的研发更多的关注与系统底层,比如稳定性、性能、各个service的数据积压等。

架构师。

这个是最重要的角色。如果说整个公司的系统就是一张图,那么框架研发就提供了纸和笔——业务研发提供了一个一个点,但是是孤立的,架构师则可以以点连线,完成整张图。
在这里,架构师需要关注很多整体上的指标和大局,比如谁和谁连,实例数多少,是否持久化,等等(babel service的schema由架构师决定)。可以这么说,babel给架构师提供了一个可以去描绘大系统框架的技术手段,从而避免了长期空对空的局面。现实中,见过好多不会写代码的架构师,主要原因就是缺乏这类供架构师使用的工具。

在公司内部,从一开始我们就做类似的划分。babel不仅仅是用来做系统组件间的解耦;同时也是不同角色人的解耦工具。

5未来的脚步

从个人的角度看,babel目前也才堪堪能用,只做到了30%的完成度,要成为一个完整和成熟的系统,还有很多路要走——

  • 在现有的基础上尝试做workflow功能。babel重合了部分storm的功能,希望能做更多的覆盖。

  • 着手central config的开发。目前service的配置与分发还不能做到自动化,有手工工作量。长远做集中的配置管理分发是必须的。

  • 完成zeromq的后端实现。对于低延迟、本地应用来说是必须的。

  • 完成自动化发布、部署,将整个系统做一个统一的整体。

  • 研究弹性伸缩方面的非功能性扩展。

  • 考虑增加熔断等自我保护机制。

babel: yet another rpc, but far beyond rpc(中)

babel: yet another rpc, but far beyond rpc(中)

bigsec

(图片源自网络)

2 架构描述

简单架构

从之前的描述,已经可以看出我们会采用RPC over MQ的方式做底层实现,类似方法调用的通信语义会在client和server两端的库中作封装。

bigsec

从后端实现来说,我们用三套后端来满足不同的场景:

1、对大中型分布式系统环境,rabbitmq是非常非常好的支撑。本来以为需要自己做很多工作,但深入了解rabbitmq,尤其是其支持的amqp协议,发现其实前人在很多思路方面已经栽好树了,比如一致性hash和跨机房等功能,都有相应的插件支撑。
所以,rabbitmq成为babel的第一选择,可以实现我们规划功能的全集,我们的SAAS平台都是使用的rabbitmq。

2、对少量机器而言,redis提供了非常轻量级的队列支持,可以提供有限但必要的功能。
redis没有类似amqp这样的协议,需要手动作些封装。我们在单机环境使用redis,尽可能减少部署和运维的开销。

3、对性能有苛刻要求的可以用zeromq后端去做tcp直连。前两种mq的方式毕竟会多几跳中转,但在路由的灵活性和通讯语义的提供更丰富的选择,而且在大数据量的处理上,吞吐量和平均延时并不会比直连差很多。
但为了满足特殊环境的需要,我们预留了zeromq的实现选择,最近由于新的需求,正在准备完成这块拼图。zeromq的缺点在于需要中央配置系统来帮忙完成路由功能。

每种后台实现对使用者透明,可以通过配置进行透明切换,但是有些高级通信语义redis和zeromq不支持。

如果对应到web service 三要素:

  • UDDI:传统的rpc或者SOA都是去注册中心发现远端对象,然后客户端主动推送数据到服务端。mq的方式帮我们省却了自注册(订阅实现)和服务发现(mq自己路由)的问题。

  • WSDL:目前我们通过json的方式来描述rpc的service端,包括机房所在地,持久化,超时等等。

  • SOAP:目前使用json的方式,我们定义了一个统一的Event对象来封装一些固定属性,其他都在一个map中。由业务代码自己去打包拆包。当然这种方式在大团队中不适合。

通讯语义封装

大量的工作可以利用mq来实现,我们的工作主要体现在通讯语义的封装。

❶ client端访问模式语义

  • queue语义(消息有去无回):传统的数据输送。

  • 简单rpc(消息一去一回):传统的rpc和soa都适用于此场景。

  • 轮询rpc(消息一去多回):一个request出去,多个response回来,适合于轮询下游节点的场景。

  • 分布式存储rpc(一个request消息,只要有最小条件的response消息就返回):适合于分布式场景下的读写。例如三个拷贝,需要至少两份读成功或者至少两份写成功,等等。目前此方式我们还没有用到。

❷ 消息分发语义(实际上这里的行为参考了storm的部分功能)

  • Shuffle:一个消息,会有多个接收者,这些接收者根据自己的资源情况去抢占同一来源的消息,达到load balance的目的。实际上我们通过shuffle来做集群功能,省掉了LB的引入。而且性能强的拉多点,性能弱的拉少点,变相的实现了根据消费者的性能来做分发。

  • Sharding:与shuffle类似,也是多个consumer来分享消息,不过根据消息的key,保证在拓扑环境不发生改变的情况下,同一个key始终指向同一个消费者,为后续分布式系统的搭建打下基础

  • topic语义:所有消费者都会得到消息的一个拷贝。常见的mq语义

  • topic+shffule:一组消费者作为一个整体来订阅topic,得到所有的消息,每个订阅团体内部通过shuffle的形式去分摊。这种非常适合用大数据环境下,有不同类型的数据消费者,每一个类型的消费者有各自的实例数。

  • topic+sharding:一组消费者作为一个整体来订阅topic,得到所有的消息,每个订阅团体内部通过sharding的形式去分摊。类似于topic shuffle,只是换用了sharding这种更严格的语义。

❸ 数据的封装语义。用于指定babel上承载数据的特征,例如:

  • batch operation:用于指定是否进行批处理传递。

  • Security:暂无使用。

  • Compressing:指定payload压缩方法,目前只做了gzip。

  • 机房:指定了机房所在地,框架会根据生产者和消费者的不同自动做跨机房的处理。

  • 持久化:指定在无消费者的情况下,是否需要持久化存储,以及最大大小。

  • 超时:指定消息的最大有效时间,超过的消息将会被丢弃。

  • 其他。

对于以上的通讯语义,首先需要去底层的mq基础里面找到相对应的设施来做封装,比如对于queue语义作个简单举例:

而对于像rpc,轮询,以及其他功能,则需要相应的代码来支撑,比如:

bigsec

  • response的返回可以通过client监听queue来实现

  • response和request的串联可以通过自定义的requestid来实现

  • 轮询可以通过client 端等待多个消息返回,可以用condition来做同步

  • ……

这里有不少细节,暂不在本文中进行展开了。

跨语言

由于几种mq都有python和java的客户端,所以我们工作会轻松很多,只是同样的逻辑需要写两份,好处还是很明显的,使得我们的系统语言无关,方便根据当前人员的技能情况来分配开发任务。
不过这里不得不吐槽python的并发,虽然有心理准备,没想到是如此之差。当使用多线程的时候,性能下降的厉害,比java要差两个数量级,所以我们python版做了同步(多线程),异步(协程)两个版本。异步版本的性能尚可接受;我们已经准备在build自己的异步python框架,来覆盖我们的应用程序。

跨机房通信

Babel的一大特色是跨机房通信,来帮助我们解决不同数据中心的通信问题,使得业务开发人员只用关心其所负责的业务即可。跨机房的通信和本机房的通信有所不同:

本地机房的通信讲究高吞吐量,rpc类访问会要求低延时。

跨机房通信必须应对复杂的网络情况,要求数据不丢,rpc类通信可以接受相对较高的延时。

实现上,我们利用了federation插件,当rpc框架发现存在跨机房访问时,会自动启用相应的路由,下图是同事画的两种情况下的路由,绿线是本地调用,红线是跨机房调用。

bigsec

对于业务应用而言,使用上是基本透明的,借助于mq的中转,在多机房环境下它也可以玩转除数据推送外的RPC类访问语义。

3 实战举例

实例

1 分布式数据计算平台

首先是我们的私有化大数据平台warden。warden集数据采集、转换、分发、实时分析和展示等功能于一体,希望从客户的原始网络流量中找出异常点和风险事件。

bigsec

此图是一个warden分布式版本的草图:

  1. 采集的数据通过topic sharding类分配给不同类型的消费者,比如ES writer,Mysql writer,实时分析引擎;每种消费者可以有不同的实例数。

  2. 实时计算引擎通过sharding来分摊流量,达到scale out的效果。

  3. rule引擎需要数据的时候同样通过简单的sharding的rpc就可以获得相应的数据了。

  4. 规则引擎的结果可以通过topic来进行再分发。

  5. 目前只有实时引擎是java的,因为性能要求苛刻,其他模块采用python开发。

上图只是个例子,来简要说明babel是如何支撑一个分布式数据计算系统的。实际的系统使用了更多的语义,也更加的复杂。不过借助于Babel的协助,整个系统在实现和运维上已经很大程度上减轻了复杂程度。

2 水平扩展的web系统

第二个例子是我们曾经做过的SAAS平台私有化案例,是我们早期SAAS平台的极简版本。

bigsec

图画的略凌乱了些:

  1. 系统主架构是用haproxy做负载均衡,发到我们的两台主机上。

  2. 两台主机内部完全相同,右边主机内部组件没有画全。

  3. 每台主机有内部的nginx,load balance到本机器内部的诸多python web server 上。

  4. python web server直接读取本地的nosql数据库。

  5. 写数据时,由于写请求sharding到两台机器上,所以我们有个topic的service来处理nosql数据写入,保证每个写入操作都写到两台机器上,每台机器的nosql始终存有全量的最新数据。

  6. 由于客户要求落地关系型数据库,所以通过shuffle再将写请求分散开,统一写入mysql中。

在这个系统中,我们成功的利用babel建立了自己的一致性框架,从而避免了去使用db做数据一致性;同时由于对等的服务器架构,在部署维护上省掉了很多事情。

框架运维

整个框架,我们都准备了统一的metrics体系去做监控和报警(实际上metrics系统本身的跨机房属性反而是通过babel来实现),详尽的监视了RPC的某个环节,之前有过我们监控的文章,这里就不重复了。

关于Windows RPC RpcEpResolveBinding失败rpc/encoded wsdls not support的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于akka-rpc(基于 akka 的 rpc 实现)、babel: yet another rpc, but far beyond rpc(上)、babel: yet another rpc, but far beyond rpc(下)、babel: yet another rpc, but far beyond rpc(中)的相关知识,请在本站寻找。

本文标签: