在本文中,我们将带你了解SpringWebSocket@SendToSession:向特定会话发送消息在这篇文章中,我们将为您详细介绍SpringWebSocket@SendToSession:向特定
在本文中,我们将带你了解Spring WebSocket @SendToSession:向特定会话发送消息在这篇文章中,我们将为您详细介绍Spring WebSocket @SendToSession:向特定会话发送消息的方方面面,并解答springboot websocket消息推送常见的疑惑,同时我们还将给您一些技巧,以帮助您实现更有效的io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler的实例源码、Java Spring测试多个发送消息WebSocket、node.js – Express router WebSocketServer – 在POST上发送消息、node.js – 使用socket.io和空消息队列向特定客户端发送消息。
本文目录一览:- Spring WebSocket @SendToSession:向特定会话发送消息(springboot websocket消息推送)
- io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler的实例源码
- Java Spring测试多个发送消息WebSocket
- node.js – Express router WebSocketServer – 在POST上发送消息
- node.js – 使用socket.io和空消息队列向特定客户端发送消息
Spring WebSocket @SendToSession:向特定会话发送消息(springboot websocket消息推送)
是否可以向特定会话发送消息?
我在客户端和Spring Servlet之间有未经身份验证的WebSocket。异步作业结束时,我需要向特定的连接发送未经请求的消息。
@Controllerpublic class WebsocketTest { @Autowired public SimpMessageSendingOperations messagingTemplate; ExecutorService executor = Executors.newSingleThreadExecutor(); @MessageMapping("/start") public void start(SimpMessageHeaderAccessor accessor) throws Exception { String applicantId=accessor.getSessionId(); executor.submit(() -> { //... slow job jobEnd(applicantId); }); } public void jobEnd(String sessionId){ messagingTemplate.convertAndSend("/queue/jobend"); //how to send only to that session? }}
如您在此代码中所见,客户端可以启动异步作业,完成后,它需要结束消息。显然,我只需要向申请人发送消息,而不是向所有人广播。拥有@SendToSession
注释或messagingTemplate.convertAndSendToSession
方法会很棒。
更新
我尝试了这个:
messagingTemplate.convertAndSend("/queue/jobend", true, Collections.singletonMap(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId));
但这会广播到所有会话,而不仅仅是指定的会话。
更新2
使用convertAndSendToUser()方法进行测试。该测试是Spring官方教程的hack:https://spring.io/guides/gs/messaging-stomp-
websocket/
这是服务器代码:
@Controllerpublic class WebsocketTest { @PostConstruct public void init(){ ScheduledExecutorService statusTimerExecutor=Executors.newSingleThreadScheduledExecutor(); statusTimerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test")); } }, 5000,5000, TimeUnit.MILLISECONDS); } @Autowired public SimpMessageSendingOperations messagingTemplate;}
这是客户端代码:
function connect() { var socket = new WebSocket(''ws://localhost:8080/hello''); stompClient = Stomp.over(socket); stompClient.connect({}, function(frame) { setConnected(true); console.log(''Connected: '' + frame); stompClient.subscribe(''/user/queue/test'', function(greeting){ console.log(JSON.parse(greeting.body)); }); }); }
不幸的是,客户端没有按预期每隔5000毫秒收到一次会话答复。我确定“
1”是连接的第二个客户端的有效sessionId,因为我在调试模式下看到SimpMessageHeaderAccessor.getSessionId()
背景场景
我想为远程作业创建一个进度条,客户端向服务器请求一个异步作业,它通过从服务器发送的websocket消息检查其进度。这不是文件上传,而是远程计算,因此只有服务器知道每个作业的进度。我需要向特定的会话发送消息,因为每个作业都是由会话启动的。客户要求进行远程计算服务器将启动此作业,并针对每个作业步骤将其工作进度状态回复给申请人客户端。客户获取有关其工作的消息并建立进度/状态栏。这就是为什么我需要每个会话的消息。我也可以使用每用户一条消息,但是Spring
不会提供 每位用户不请自来的消息。
解决方案
__ __ ___ ___ _ __ ___ _ _ ___ ___ ___ _ _ _ _____ ___ ___ _ _ \ \ / // _ \ | _ \| |/ /|_ _|| \| | / __| / __| / _ \ | | | | | ||_ _||_ _|/ _ \ | \| | \ \/\/ /| (_) || /| '' < | | | .` || (_ | \__ \| (_) || |__| |_| | | | | || (_) || .` | \_/\_/ \___/ |_|_\|_|\_\|___||_|\_| \___| |___/ \___/ |____|\___/ |_| |___|\___/ |_|\_|
从UPDATE2解决方案开始,我必须完成最后一个参数(MessageHeaders)的convertAndSendToUser方法:
messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test"), createHeaders("1"));
createHeaders()
该方法在哪里:
private MessageHeaders createHeaders(String sessionId) { SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE); headerAccessor.setSessionId(sessionId); headerAccessor.setLeaveMutable(true); return headerAccessor.getMessageHeaders(); }
答案1
小编典典不需要创建特定的目的地,从Spring
4.1开始就已经完成(请参阅SPR-11309)。
给定用户订阅/user/queue/something
队列,您可以使用以下命令将消息发送到单个会话:
如SimpMessageSendingOperations Javadoc中所述,由于您的用户名实际上是一个sessionId,因此您还必须将其设置为标头,否则DefaultUserDestinationResolver
将无法路由该消息并将其丢弃。
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor .create(SimpMessageType.MESSAGE);headerAccessor.setSessionId(sessionId);headerAccessor.setLeaveMutable(true);messagingTemplate.convertAndSendToUser(sessionId,"/queue/something", payload, headerAccessor.getMessageHeaders());
您不需要为此认证用户。
io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler的实例源码
@Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(new ReadTimeoutHandler(60,TimeUnit.SECONDS)); if (sslContext != null) { p.addLast(sslContext.newHandler(ch.alloc())); } p.addLast(new HttpContentCompressor(5)); p.addLast(new HttpServerCodec()); p.addLast(new HttpObjectAggregator(1048576)); p.addLast(new ChunkedWriteHandler()); if (null != corsConfig) { p.addLast(new CorsHandler(corsConfig)); } p.addLast(new WebSocketServerCompressionHandler()); p.addLast(new WebSocketServerProtocolHandler(webSocketPath,null,true)); p.addLast(new LaputaServerHandler(null != sslContext,requestProcessor)); }
private void switchToHttp(ChannelHandlerContext ctx) { ChannelPipeline p = ctx.pipeline(); p.addLast(new HttpServerCodec()); p.addLast(new HttpObjectAggregator(65536)); p.addLast(new WebSocketServerCompressionHandler()); p.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,"ws",true)); p.addLast(new NetoJsonStringToMapWebSocketDecoder()); p.addLast(new NetoMessagetoWebsocketFrameEncoder()); p.remove(this); // 핸들러를 다시 등록 했으므로 이벤트를 전파 ctx.fireChannelActive(); }
@Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerCompressionHandler()); pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,"binary",true)); pipeline.addLast(new WebSocketIndexPageHandler(pluginDataFolder)); pipeline.addLast(new WebSocketFrameHandler(webSocketServerThread,checkIPBans)); }
@Override protected void initPipeline(ChannelPipeline pipeline) { pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerCompressionHandler()); pipeline.addLast(new WebSocketServerProtocolHandler(CC.mp.net.ws_path,true)); pipeline.addLast(new WebSocketIndexPageHandler()); pipeline.addLast(getChannelHandler()); }
/** * 채널 파이프라인 설정. * Netty.Server.Configuration.NettyServerConfiguration 에서 등록한 Bean 을 이용해 사용자의 통신을 처리할 Handler 도 등록. * Netty.Server.Handler.JsonHandler 에서 실제 사용자 요청 처리. * * @param channel * @throws Exception */ @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline channelPipeline = channel.pipeline(); switch (transferType) { case "websocket": channelPipeline .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(65536)) .addLast(new WebSocketServerCompressionHandler()) .addLast(new WebSocketServerProtocolHandler(transferWebsocketPath,transferWebsocketSubProtocol,transferWebsocketAllowExtensions)) .addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline))) .addLast(websocketHandler); case "tcp": default: channelPipeline .addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE)) .addLast(STRING_DECODER) .addLast(STRING_ENCODER) .addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline))) .addLast(jsonHandler); } }
@Override protected void initChannel(final SocketChannel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerCompressionHandler()); pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true)); pipeline.addLast(webSockServiceHandler); }
@Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerCompressionHandler()); pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,subProtocols,true)); pipeline.addLast(new WebSocketRemoteServerFrameHandler()); }
@Override public void apply(ChannelPipeline pipeline){ pipeline.channel().config().setAutoRead(true); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); if(Config.getInstance().isCompressionEnabled()){ pipeline.addLast(new WebSocketServerCompressionHandler()); } pipeline.addLast(guicer.inject(new WebSocketValidationHandler())); pipeline.addLast(guicer.inject(new WebSocketServerHandler())); }
@Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerCompressionHandler()); pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true)); pipeline.addLast(new WebSocketIndexPageHandler(WEBSOCKET_PATH)); pipeline.addLast(new WebSocketFrameHandler()); }
@Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerCompressionHandler()); pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true)); pipeline.addLast(new WebSocketIndexPageHandler(WEBSOCKET_PATH)); pipeline.addLast(new TextWebSocketFrameHandler()); }
@Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerCompressionHandler()); pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true)); pipeline.addLast(new WatcherServerIndexPageHandler(WEBSOCKET_PATH)); pipeline.addLast(new WatcherServerHandler()); }
@Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new HttpObjectAggregator(65536)); pipeline.addLast(new WebSocketServerCompressionHandler()); pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true)); pipeline.addLast(new WebSocketRemoteServerFrameHandler()); }
@Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Flash policy file if (isFlashSupported) { pipeline.addLast(FLASH_POLICY_HANDLER,flashPolicyHandler); } // SSL if (sslContext != null) { pipeline.addLast(SSL_HANDLER,sslContext.newHandler(ch.alloc())); } // HTTP pipeline.addLast(HTTP_Request_DECODER,new HttpRequestDecoder()); pipeline.addLast(HTTP_CHUNK_AGGREGATOR,new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH)); pipeline.addLast(HTTP_RESPONSE_ENCODER,new HttpResponseEncoder()); if (isHttpCompressionEnabled) { pipeline.addLast(HTTP_COMPRESSION,new HttpContentCompressor()); } // Flash resources if (isFlashSupported) { pipeline.addLast(FLASH_RESOURCE_HANDLER,flashResourceHandler); } // Socket.IO pipeline.addLast(SOCKETIO_PACKET_ENCODER,packetEncoderHandler); pipeline.addLast(SOCKETIO_HANDSHAKE_HANDLER,handshakeHandler); pipeline.addLast(SOCKETIO_disCONNECT_HANDLER,disconnectHandler); if (isWebsocketCompressionEnabled) { pipeline.addLast(WEBSOCKET_COMPRESSION,new WebSocketServerCompressionHandler()); } pipeline.addLast(SOCKETIO_WEBSOCKET_HANDLER,webSocketHandler); if (isFlashSupported) { pipeline.addLast(SOCKETIO_FLASHSOCKET_HANDLER,flashSocketHandler); } pipeline.addLast(SOCKETIO_XHR_POLLING_HANDLER,xhrPollingHandler); if (isJsonpSupported) { pipeline.addLast(SOCKETIO_JSONP_POLLING_HANDLER,jsonpPollingHandler); } pipeline.addLast(SOCKETIO_HEARTBEAT_HANDLER,heartbeatHandler); pipeline.addLast(eventExecutorGroup,SOCKETIO_PACKET_disPATCHER,packetdispatcherHandler); if (pipelineModifier != null) { pipelineModifier.modifyPipeline(pipeline); } }
Java Spring测试多个发送消息WebSocket
如何解决Java Spring测试多个发送消息WebSocket?
我正在遵循Java Spring的本教程:https://spring.io/guides/gs/messaging-stomp-websocket/。
但是,他们目前设置的测试程序仅允许一条发送消息。如果我用多条替换该发送消息,则该代码似乎只注册了第一条发送消息。您可以在this file上查看测试设置。
如何在同一测试方法上测试多条发送消息?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)
node.js – Express router WebSocketServer – 在POST上发送消息
>连接始终保持活动状态,因此我可以处理状态更新
>我可以使用POST请求中的数据向客户端发送websocket消息
ws api here
router.post('/',function (req,res) { // Need to send ws.send() with post data }) wss.on('connection',function(ws) { ws.on('message',function(message) { console.log('r : %s',message); }); // ws is only defined under this callback as an object of type ws });
解决方法
//Create an event var event = require('events').EventEmitter(); router.post('/',res) { // fire an event event.emit('homePage') }) wss.on('connection',function(ws) { ws.on('message',function(message) { console.log('r : %s',message); }); // listen the event event.on('homePage',function(){ ws.emit('someEvent'); }); });
node.js – 使用socket.io和空消息队列向特定客户端发送消息
文档太糟糕了,这根本不是真的。
我想通过socket.io发送一个反馈给特定的客户端
我的服务器端看起来像这样:
app.get('/upload',requiresLogin,function(request,response) { response.render('upload/index.jade'); io.sockets.on('connection',function (socket) { console.log('SOCKET ID ' + socket.id); io.sockets.socket(socket.id).emit('new','hello'); }); });
并且客户端端看起来像这样:
$(document).ready(function() { var socket = io.connect('http://localhost:80/socket.io/socket.io.js'); socket.on('new',function (data) { console.log(socket.id); console.log(data); //$('#state').html(data.status); }); });
但客户端什么都不做。我已经尝试了几乎一切。有人可以告诉我,我做错了,请!
总结
以上是小编为你收集整理的node.js – 使用socket.io和空消息队列向特定客户端发送消息全部内容。
如果觉得小编网站内容还不错,欢迎将小编网站推荐给好友。
关于Spring WebSocket @SendToSession:向特定会话发送消息和springboot websocket消息推送的介绍已经告一段落,感谢您的耐心阅读,如果想了解更多关于io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler的实例源码、Java Spring测试多个发送消息WebSocket、node.js – Express router WebSocketServer – 在POST上发送消息、node.js – 使用socket.io和空消息队列向特定客户端发送消息的相关信息,请在本站寻找。
本文标签: