本文的目的是介绍Micronaut中使用RxJava的Websocket请求路由的详细情况,特别关注rxjavawebflux的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全
本文的目的是介绍Micronaut 中使用 RxJava 的 Websocket 请求路由的详细情况,特别关注rxjava webflux的相关信息。我们将通过专业的研究、有关数据的分析等多种方式,为您呈现一个全面的了解Micronaut 中使用 RxJava 的 Websocket 请求路由的机会,同时也不会遗漏关于ajax与websocket的区别以及websocket常用使用方式、ajax与websocket的区别以及websocket常用使用方式 介绍、angular – 使用WebSocket可观察RxJs、c#-4.0 – 使用Comet / XMPP和Microsoft Stack上的WebSocket技术实时Web通知和更新的选项?的知识。
本文目录一览:- Micronaut 中使用 RxJava 的 Websocket 请求路由(rxjava webflux)
- ajax与websocket的区别以及websocket常用使用方式
- ajax与websocket的区别以及websocket常用使用方式 介绍
- angular – 使用WebSocket可观察RxJs
- c#-4.0 – 使用Comet / XMPP和Microsoft Stack上的WebSocket技术实时Web通知和更新的选项?
Micronaut 中使用 RxJava 的 Websocket 请求路由(rxjava webflux)
如何解决Micronaut 中使用 RxJava 的 Websocket 请求路由?
我正在 Micronaut 中实现一个 WebSocket 路由器,它在 WebSocket“客户端”和 WebSocket“服务器”应用程序之间路由 WebSocket 文本消息。 每条消息都传达了主题 id,这是 url 路径 /{topic} 的一部分。来自同一主题的消息必须双向转发, 例如主题 A 的消息被转发到主题 A 等。see architecture diagram
WebSocket 路由
所以客户端创建到路由器的 WebSocket 连接,然后路由器创建到服务器的连接。这意味着一旦客户端创建到路由器的连接,路由器必须创建到服务器的连接。
为了在客户端 -> 服务器或服务器 -> 客户端的每个方向发送新消息,必须重用给定主题的现有连接。
这些连接将通过一些重试重新连接逻辑长期存在,我稍后会加入。
消息发送逻辑
客户端发送消息后,必须将其转发到服务器,反向服务器 -> 客户端也是如此。
问题
我被卡住的地方是:
- 如何使用 Micronaut 的 Flowable api 将消息从 ServerHandler 转发到 ClientHandler,反之亦然? 我需要的是一旦建立连接,只需保留连接建立的信息,并使用 Rx Java api 简单地转发 WebSocket 处理程序之间的消息。
最初我通过使用自定义的 WebSocketSession 缓存和 ServerHandler 和 ClientHandler 存储和检索来实现转发逻辑 来自缓存的 CompletableFuture。这种方法有效并且还解决了异步等待连接会话的问题, 但我想通过使用纯 RxJava Java 模式以更无状态的方式实现转发。 为会话使用自定义共享缓存的另一个设计缺陷是,它对于 Netty 中现有的会话注册表来说是多余的。
到目前为止,我已经基本实现了客户端和服务器的处理程序,客户端/服务器的 WebSocket 处理程序, 但我不知道如何正确集成 Flowable 的东西。
源代码ClientHandler:(处理从客户端到路由器的WebSocket连接)
@ServerWebSocket("/topic/{topicId}") public class ClientHandler {
private static final Logger LOG = LoggerFactory.getLogger(ClientHandler.class);
private RxWebSocketClient webSocketClient;
private ConnectionProperties connectionProperties;
public ClientHandler(@Client("${connection.url}") RxWebSocketClient webSocketClient,ConnectionProperties connectionProperties) {
this.webSocketClient = webSocketClient;
this.connectionProperties = connectionProperties;
}
@Onopen
public void onopen(String topicId,WebSocketSession session) {
LOG.info("Open connection for client topic: {}",topicId);
}
@OnMessage
public void onMessage(String topicId,String message,WebSocketSession session) {
LOG.info("New message from client topic: {}",topicId);
Flowable<ServerHandler> flowable = webSocketClient.connect(ServerHandler.class,connectionProperties.resolveURI(topicId));
//? so Now what to do with this
// the basic function works,but new connection is created with each message
//and caching flowable would not be much better than caching WebSocket sessions
//also it would be nicer to connect in onopen() and then subscribe in onMessage()
flowable.subscribe(serverHandler -> {
serverHandler.setClientSession(session); // register session to send messages back
serverHandler.send(message); // send message after connection
},t -> LOG.error("Error handling client topic: {}",topicId,t)); //handle exception
}
@OnClose
public void onClose(String topicId) {
LOG.info("Close connection for client topic: {}",topicId);
}
@OnError
public void onError(String topicId,WebSocketSession session,Throwable t) {
LOG.error("Error for client topic: {}",t);
}
}
源代码ServerHandler:(处理从路由器到服务器的WebSocket连接)
@ClientWebSocket("/topic/{topicId}") public class ServerHandler implements AutoCloseable,WebSocketSessionAware{
private static final Logger LOG = LoggerFactory.getLogger(ServerHandler.class);
private volatile WebSocketSession clientSession;
private volatile WebSocketSession serverSession;
private volatile String topicId;
@Onopen
public void onopen(String topicId,WebSocketSession session) {
this.topicId = topicId;
LOG.info("Open connection for server topic: {}",topicId);
}
@OnMessage
public Publisher<String> onMessage(String message) {
LOG.info("New message from server topic: {}",this.topicId);
return clientSession.send(message); //Could potentially use WebSocketbroadcaster as well and send to topic based on predicate
}
@OnClose
public void onClose(WebSocketSession session) {
LOG.info("Close connection for server topic: {}",this.topicId);
if (clientSession != null) {
clientSession.close();
}
}
@Override
public void close() throws Exception {
LOG.info("Closing handler for server topic: {}",this.topicId);
}
@Override
public void setWebSocketSession(WebSocketSession serverSession) {
this.serverSession = serverSession;
}
public void send(String message) {
if (serverSession == null) {
throw new IllegalStateException("Can not send if connection not opened");
}
serverSession.sendAsync(message); //send message to server
}
public void forceClose() {
if (serverSession != null) {
serverSession.close();
}
}
public void setClientSession(WebSocketSession clientSession) {
this.clientSession = clientSession;
}
}
类似的现有代码
我在其他项目中搜索了类似的代码,并在 Spring Cloud Gateway(WebsocketRoutingFilter 类)中找到了类似功能的片段
在此处查看完整代码:https://github.com/spring-cloud/spring-cloud-gateway/blob/8722f4062ed21de28ebf56f69bccc5ad4ac1d29d/spring-cloud-gateway-server/src/main/java/org/springframework/cloud/gateway/filter/WebsocketRoutingFilter.java
这个例子使用了稍微不同的 api 和 Tomcat WebSocketSession 和 Reactor 库 但无论如何,我不知道如何在 Micronaut Netty 之上用 RxJava Flowable api 表达类似的东西。
return client.execute(url,this.headers,new WebSocketHandler() {
@Override
public Mono<Void> handle(WebSocketSession proxySession) {
// Use retain() for Reactor Netty
Mono<Void> proxySessionSend = proxySession
.send(session.receive().doOnNext(WebSocketMessage::retain));
// .log("proxySessionSend",Level.FINE);
Mono<Void> serverSessionSend = session
.send(proxySession.receive().doOnNext(WebSocketMessage::retain));
// .log("sessionSend",Level.FINE);
return Mono.zip(proxySessionSend,serverSessionSend).then();
}
额外问题:
2)如何处理超时和重试与 Flowable 的连接?
如果我有这样的代码:
Flowable flowable = serverFlowable.connect(ServerHandler.class,uri);
2a) 如果在给定超时后连接尝试失败,如何连接想要在连接建立后发送消息的等待订阅者并传播错误?
2b) 如果使用 RxJava 连接尝试失败,如何以指数方式重试连接(5,20,30 秒?)?我想 repeat() 可以用于这个吗?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)
ajax与websocket的区别以及websocket常用使用方式
笔者近期在公司的项目中渐渐的接触了一些比较高级的业务逻辑处理,其中比较有意思的地方就是前端接受后台给与的推送问题。
一般前端与后端的交互主要是使用ajax进行异步操作调用交互,比较有趣的是这种交互方式一般都是单项交互的--
-及前端给后端发出请求后端接受请求后执行操作,即便前端可以接受后端给予的返回值,但是与后端交互的主动权始终是放在前端手里面。
而这样就会遇到两个有意思的问题--
1.前端如果调用的接口后端操作事件过长可能会导致返回操作响应时间过长,如果此时用户单击其他页面的时候就会导致返回操作无法正常解决。
2.如果后端有什么比较重要的问题需要推送给前端消息,这个时候前端是无法接收到的。
面对这两个问题的主流解决办法分别为
针对第一条我们会给整个页面做一个加载中的动画并且锁定住整个页面从而强迫用户等待到整个后端返回值结束后再将页面解锁。
而对于第二点我们就要引入我们今天所讲的websocket这个概念。
一.什么是websocket
websocket协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
而他与常规的ajax最大的不同在于他可以双向接受和发送
一.websocket的简单使用
对于我们前端来说websocket的使用方式非常简单,协议标识符是ws
(如果加密,则为wss
),服务器网址就是 URL。我们只需要自己定义一个ws服务就可以了
代码如下
var ws = new WebSocket("URL路径");
ws.onopen = function(evt) {
console.log("打开成功");
ws.send("Hello WORD!");
};
ws.onmessage = function(evt) {
console.log( "Received Message: " + evt.data);
ws.close();
};
ws.onclose = function(evt) {
console.log("服务关闭");
};
如以上代码所示我们成功的创建了一个简单的名字为WS的websocket服务并且想后台定义的路径中发送了一条hello word的简讯并且会在接收到推送的时候打印Received Message
在你的ws服务打开以后会在前端的内一直处于挂起状态,由于现在框架横行,当你的框架或者说前端项目经过node编译或者运行时候你可以把ws服务写在所有页面的模板中就可以让项目只要处于被打开状态就能一直运行
这个时候你需要写接受推送消息即可,判断后端给你推送消息的类型并且做出相应的操作,完全不需要有ajax异步操作等待返回值或者返回值后自己操作时间太长而让用户等待太久的烦恼。而这一技术现在也普遍的用在了
在线聊天室和一些需要接受推送提示的地方。
不过值得注意的是一般情况下如果推送量较为大的话公司会选择一些比较大的云服务来做,比如笔者的公司就是使用的融云服务来做的,等笔者有空的话会专门开一次坑讲一讲融云服务的教程
ajax与websocket的区别以及websocket常用使用方式 介绍
笔者近期在公司的项目中渐渐的接触了一些比较高级的业务逻辑处理,其中比较有意思的地方就是前端接受后台给与的推送问题。
一般前端与后端的交互主要是使用ajax进行异步操作调用交互,比较有趣的是这种交互方式一般都是单项交互的--
-及前端给后端发出请求后端接受请求后执行操作,即便前端可以接受后端给予的返回值,但是与后端交互的主动权始终是放在前端手里面。
而这样就会遇到两个有意思的问题--
1.前端如果调用的接口后端操作事件过长可能会导致返回操作响应时间过长,如果此时用户单击其他页面的时候就会导致返回操作无法正常解决。
2.如果后端有什么比较重要的问题需要推送给前端消息,这个时候前端是无法接收到的。
面对这两个问题的主流解决办法分别为针对第一条我们会给整个页面做一个加载中的动画并且锁定住整个页面从而强迫用户等待到整个后端返回值结束后再将页面解锁。
而对于第二点我们就要引入我们今天所讲的websocket这个概念。
一.什么是websocket
websocket协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
而他与常规的ajax最大的不同在于他可以双向接受和发送
一.websocket的简单使用
对于我们前端来说websocket的使用方式非常简单,协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。我们只需要自己定义一个ws服务就可以了
代码如下
var ws = new WebSocket("URL路径"); ws.onopen = function(evt) { console.log("打开成功"); ws.send("Hello WORD!"); }; ws.onmessage = function(evt) { console.log( "Received Message: " + evt.data); ws.close(); }; ws.onclose = function(evt) { console.log("服务关闭"); };
如以上代码所示我们成功的创建了一个简单的名字为WS的websocket服务并且想后台定义的路径中发送了一条hello word的简讯并且会在接收到推送的时候打印Received Message
在你的ws服务打开以后会在前端的内一直处于挂起状态,由于现在框架横行,当你的框架或者说前端项目经过node编译或者运行时候你可以把ws服务写在所有页面的模板中就可以让项目只要处于被打开状态就能一直运行
这个时候你需要写接受推送消息即可,判断后端给你推送消息的类型并且做出相应的操作,完全不需要有ajax异步操作等待返回值或者返回值后自己操作时间太长而让用户等待太久的烦恼。而这一技术现在也普遍的用在了
在线聊天室和一些需要接受推送提示的地方。
不过值得注意的是一般情况下如果推送量较为大的话公司会选择一些比较大的云服务来做,比如笔者的公司就是使用的融云服务来做的,等笔者有空的话会专门开一次坑讲一讲融云服务的教程
总结
以上所述是小编给大家介绍的ajax与websocket的区别以及websocket常用使用方式 介绍,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!
angular – 使用WebSocket可观察RxJs
在我的测试用例中,我有2个客户端组件. Observable计时器按预期打印两个不同的客户端ID.
每个ngOnInit()还打印其客户端的id.
现在出于某种原因,对于每条消息,websocketService.observeClient()的订阅被调用2次,但是this.client.id总是打印第二个客户端的值.
继承我的客户组件
@Component({ ... }) export class ClientComponent implements OnInit { @input() client: Client; constructor(public websocketService: WebsocketService) { Observable.timer(1000,1000).subscribe(() => console.log(this.client.id)); } ngOnInit() { console.log(this.client.id); this.websocketService.observeClient().subscribe(data => { console.log('message',this.client.id); }); } }
还有我的websocket服务
@Injectable() export class WebsocketService { private observable: Observable<MessageEvent>; private observer: Subject<Message>; constructor() { const socket = new WebSocket('ws://localhost:9091'); this.observable = Observable.create( (observer: Observer<MessageEvent>) => { socket.onmessage = observer.next.bind(observer); socket.onerror = observer.error.bind(observer); socket.onclose = observer.complete.bind(observer); return socket.close.bind(socket); } ); this.observer = Subject.create({ next: (data: Message) => { if (socket.readyState === WebSocket.OPEN) { socket.send(JSON.stringify(data)); } } }); } observeClient(): Observable<MessageEvent> { return this.observable; } }
编辑
好吧,据我所知,它与Observables是单播对象这一事实有关,我必须使用Subject,但我不知道如何创建Subject.
https://stackoverflow.com/a/44067972/552203
TLDR:
let subject = Observable.webSocket('ws://localhost:8081'); subject .retry() .subscribe( (msg) => console.log('message received: ' + msg),(err) => console.log(err),() => console.log('complete') ); subject.next(JSON.stringify({ op: 'hello' }));
c#-4.0 – 使用Comet / XMPP和Microsoft Stack上的WebSocket技术实时Web通知和更新的选项?
其竞争者是基于Jabber / Comet / XMPP和WebSocket技术.
彗星营:
> Pokein
> WebSync
网路营:
> Kaazing
> LightStreamer
> SuperWebSocket
> XSockets
> SignalR
由于现有的基础设施是Microsoft堆栈,所以我宁可不将基于Java的服务器引入到组合中.说这个,它离开(一个非常有吸引力的)WebSync(Comet)和SuperWebSocket(WebSockets).然而,Pokein的DLL集成也是相当无缝的.Net项目.
是否有更多真实的生产级WebSocket计划.Net?在Microsoft堆栈上采用WebSockets还为时过早,我应该喜欢像Kazing这样的东西吗?
我还在等待关于我们当前用户群浏览器类型和版本的报告(检查HTML5兼容性).我怀疑这个数字会很低(较老的用户群).如果是这样,彗星选项将是赢家.
还有什么其他的事情要考虑?
看看Sockets.IO等其他一些.Net计划,我认为这可能还处于起步阶段,适用于大规模的生产系统.
可以从任何使用上述技术和产品的人得到一些意见吗?
谢谢.
UPDATE
我仍然在寻找一些在生产级别可靠的WebSocket服务器.最近发现XSockets和SignalR到Websockets阵营. Hoewver,目前还有两个主要的竞争者.这可能只是因为他们有惊人的营销团队,开发人员可以使用的材料 – API和视频.许多其他实现似乎仍然在新生阶段,其中给出了与只有少数客户端的连接的示例.虽然这表明了这项技术,但这些演示文稿没有备份大量有效负载/负载能力数据. Kaazing和LightStreamer满足以下要求.
XSockets有一些很好的例子,但再次,缺少一些真正的生产指标.
SignalR还没有在真实的生产环境中进行测试.扩展解决方案正在开发中,但尚不稳定.期待看到这个项目在未来的发展.
主要要求是:
>实施后备技术的能力(如果HTML5 / WebSockets是
不可用)
>并发连接数量高,每个消息数量多
第二
>可扩展 – 增加更多服务器/节点的能力
交通要求
解决方法
在“正常”系统中,您应该看到〜20k并发(每个节点)和〜100k个消息/秒.虽然这些数字非常粗糙,因为它严重依赖于您的系统和您发送的消息类型等.我们已经看到高达5万用户(每个节点)和(在不同的测试中)300k个消息/秒.
(免责声明:我为冻山工作)
关于Micronaut 中使用 RxJava 的 Websocket 请求路由和rxjava webflux的问题就给大家分享到这里,感谢你花时间阅读本站内容,更多关于ajax与websocket的区别以及websocket常用使用方式、ajax与websocket的区别以及websocket常用使用方式 介绍、angular – 使用WebSocket可观察RxJs、c#-4.0 – 使用Comet / XMPP和Microsoft Stack上的WebSocket技术实时Web通知和更新的选项?等相关知识的信息别忘了在本站进行查找喔。
本文标签: