GVKun编程网logo

Spring WebSocket @SendToSession:向特定会话发送消息(springboot websocket消息推送)

21

在本文中,我们将带你了解SpringWebSocket@SendToSession:向特定会话发送消息在这篇文章中,我们将为您详细介绍SpringWebSocket@SendToSession:向特定

在本文中,我们将带你了解Spring WebSocket @SendToSession:向特定会话发送消息在这篇文章中,我们将为您详细介绍Spring WebSocket @SendToSession:向特定会话发送消息的方方面面,并解答springboot websocket消息推送常见的疑惑,同时我们还将给您一些技巧,以帮助您实现更有效的io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler的实例源码、Java Spring测试多个发送消息WebSocket、node.js – Express router WebSocketServer – 在POST上发送消息、node.js – 使用socket.io和空消息队列向特定客户端发送消息

本文目录一览:

Spring WebSocket @SendToSession:向特定会话发送消息(springboot websocket消息推送)

Spring WebSocket @SendToSession:向特定会话发送消息(springboot websocket消息推送)

是否可以向特定会话发送消息?

我在客户端和Spring Servlet之间有未经身份验证的WebSocket。异步作业结束时,我需要向特定的连接发送未经请求的消息。

@Controllerpublic class WebsocketTest {     @Autowired    public SimpMessageSendingOperations messagingTemplate;    ExecutorService executor = Executors.newSingleThreadExecutor();    @MessageMapping("/start")    public void start(SimpMessageHeaderAccessor accessor) throws Exception {        String applicantId=accessor.getSessionId();                executor.submit(() -> {            //... slow job            jobEnd(applicantId);        });    }    public void jobEnd(String sessionId){        messagingTemplate.convertAndSend("/queue/jobend"); //how to send only to that session?    }}

如您在此代码中所见,客户端可以启动异步作业,完成后,它需要结束消息。显然,我只需要向申请人发送消息,而不是向所有人广播。拥有@SendToSession注释或messagingTemplate.convertAndSendToSession方法会很棒。

更新

我尝试了这个:

messagingTemplate.convertAndSend("/queue/jobend", true, Collections.singletonMap(SimpMessageHeaderAccessor.SESSION_ID_HEADER, sessionId));

但这会广播到所有会话,而不仅仅是指定的会话。

更新2

使用convertAndSendToUser()方法进行测试。该测试是Spring官方教程的hack:https://spring.io/guides/gs/messaging-stomp-
websocket/

这是服务器代码:

@Controllerpublic class WebsocketTest {    @PostConstruct    public void init(){        ScheduledExecutorService statusTimerExecutor=Executors.newSingleThreadScheduledExecutor();        statusTimerExecutor.scheduleAtFixedRate(new Runnable() {                            @Override            public void run() {                messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test"));            }        }, 5000,5000, TimeUnit.MILLISECONDS);    }     @Autowired        public SimpMessageSendingOperations messagingTemplate;}

这是客户端代码:

function connect() {            var socket = new WebSocket(''ws://localhost:8080/hello'');            stompClient = Stomp.over(socket);            stompClient.connect({}, function(frame) {                setConnected(true);                console.log(''Connected: '' + frame);                stompClient.subscribe(''/user/queue/test'', function(greeting){                    console.log(JSON.parse(greeting.body));                });            });        }

不幸的是,客户端没有按预期每隔5000毫秒收到一次会话答复。我确定“
1”是连接的第二个客户端的有效sessionId,因为我在调试模式下看到SimpMessageHeaderAccessor.getSessionId()

背景场景

我想为远程作业创建一个进度条,客户端向服务器请求一个异步作业,它通过从服务器发送的websocket消息检查其进度。这不是文件上传,而是远程计算,因此只有服务器知道每个作业的进度。我需要向特定的会话发送消息,因为每个作业都是由会话启动的。客户要求进行远程计算服务器将启动此作业,并针对每个作业步骤将其工作进度状态回复给申请人客户端。客户获取有关其工作的消息并建立进度/状态栏。这就是为什么我需要每个会话的消息。我也可以使用每用户一条消息,但是Spring
不会提供 每位用户不请自来的消息。
解决方案

 __      __ ___   ___  _  __ ___  _  _   ___      ___   ___   _    _   _  _____  ___  ___   _  _  \ \    / // _ \ | _ \| |/ /|_ _|| \| | / __|    / __| / _ \ | |  | | | ||_   _||_ _|/ _ \ | \| |  \ \/\/ /| (_) ||   /| '' <  | | | .` || (_ |    \__ \| (_) || |__| |_| |  | |   | || (_) || .` |   \_/\_/  \___/ |_|_\|_|\_\|___||_|\_| \___|    |___/ \___/ |____|\___/   |_|  |___|\___/ |_|\_|

从UPDATE2解决方案开始,我必须完成最后一个参数(MessageHeaders)的convertAndSendToUser方法:

messagingTemplate.convertAndSendToUser("1","/queue/test", new Return("test"), createHeaders("1"));

createHeaders()该方法在哪里:

private MessageHeaders createHeaders(String sessionId) {        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);        headerAccessor.setSessionId(sessionId);        headerAccessor.setLeaveMutable(true);        return headerAccessor.getMessageHeaders();    }

答案1

小编典典

不需要创建特定的目的地,从Spring
4.1开始就已经完成(请参阅SPR-11309)。

给定用户订阅/user/queue/something队列,您可以使用以下命令将消息发送到单个会话:

如SimpMessageSendingOperations Javadoc中所述,由于您的用户名实际上是一个sessionId,因此您还必须将其设置为标头,否则DefaultUserDestinationResolver将无法路由该消息并将其丢弃。

SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor    .create(SimpMessageType.MESSAGE);headerAccessor.setSessionId(sessionId);headerAccessor.setLeaveMutable(true);messagingTemplate.convertAndSendToUser(sessionId,"/queue/something", payload,     headerAccessor.getMessageHeaders());

您不需要为此认证用户。

io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler的实例源码

io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler的实例源码

项目:laputa    文件:LaputaServerInitializer.java   
@Override
public void initChannel(SocketChannel ch) {
  ChannelPipeline p = ch.pipeline();
  p.addLast(new ReadTimeoutHandler(60,TimeUnit.SECONDS));
  if (sslContext != null) {
    p.addLast(sslContext.newHandler(ch.alloc()));
  }
  p.addLast(new HttpContentCompressor(5));
  p.addLast(new HttpServerCodec());
  p.addLast(new HttpObjectAggregator(1048576));
  p.addLast(new ChunkedWriteHandler());
  if (null != corsConfig) {
    p.addLast(new CorsHandler(corsConfig));
  }
  p.addLast(new WebSocketServerCompressionHandler());
  p.addLast(new WebSocketServerProtocolHandler(webSocketPath,null,true));
  p.addLast(new LaputaServerHandler(null != sslContext,requestProcessor));
}
项目:neto    文件:ProtocolUnificationHandler.java   
private void switchToHttp(ChannelHandlerContext ctx) {
    ChannelPipeline p = ctx.pipeline();
    p.addLast(new HttpServerCodec());
    p.addLast(new HttpObjectAggregator(65536));
    p.addLast(new WebSocketServerCompressionHandler());
    p.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,"ws",true));
    p.addLast(new NetoJsonStringToMapWebSocketDecoder());
    p.addLast(new NetoMessagetoWebsocketFrameEncoder());
    p.remove(this);

    // 핸들러를 다시 등록 했으므로 이벤트를 전파
    ctx.fireChannelActive();
}
项目:WebSandBoxMC    文件:WebSocketServerInitializer.java   
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(65536));
    pipeline.addLast(new WebSocketServerCompressionHandler());
    pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,"binary",true));
    pipeline.addLast(new WebSocketIndexPageHandler(pluginDataFolder));
    pipeline.addLast(new WebSocketFrameHandler(webSocketServerThread,checkIPBans));
}
项目:mpush    文件:WebsocketServer.java   
@Override
protected void initPipeline(ChannelPipeline pipeline) {
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(65536));
    pipeline.addLast(new WebSocketServerCompressionHandler());
    pipeline.addLast(new WebSocketServerProtocolHandler(CC.mp.net.ws_path,true));
    pipeline.addLast(new WebSocketIndexPageHandler());
    pipeline.addLast(getChannelHandler());
}
项目:ChatServer    文件:NettyChannelInitializer.java   
/**
 * 채널 파이프라인 설정.
 * Netty.Server.Configuration.NettyServerConfiguration 에서 등록한 Bean 을 이용해 사용자의 통신을 처리할 Handler 도 등록.
 * Netty.Server.Handler.JsonHandler 에서 실제 사용자 요청 처리.
 *
 * @param channel
 * @throws Exception
 */
@Override
protected void initChannel(Channel channel) throws Exception {

    ChannelPipeline channelPipeline = channel.pipeline();

    switch (transferType) {

        case "websocket":

            channelPipeline
                    .addLast(new HttpServerCodec())
                    .addLast(new HttpObjectAggregator(65536))
                    .addLast(new WebSocketServerCompressionHandler())
                    .addLast(new WebSocketServerProtocolHandler(transferWebsocketPath,transferWebsocketSubProtocol,transferWebsocketAllowExtensions))
                    .addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline)))
                    .addLast(websocketHandler);

        case "tcp":
        default:

            channelPipeline
                    .addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE))
                    .addLast(STRING_DECODER)
                    .addLast(STRING_ENCODER)
                    .addLast(new LoggingHandler(LogLevel.valueOf(logLevelPipeline)))
                    .addLast(jsonHandler);

    }

}
项目:Heliosstreams    文件:RPCServer.java   
@Override
protected void initChannel(final SocketChannel ch) throws Exception {
    final ChannelPipeline pipeline = ch.pipeline();
       pipeline.addLast(new HttpServerCodec());
       pipeline.addLast(new HttpObjectAggregator(65536));
       pipeline.addLast(new WebSocketServerCompressionHandler());
       pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true));
       pipeline.addLast(webSockServiceHandler);

}
项目:product-ei    文件:WebSocketRemoteServerInitializer.java   
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(65536));
    pipeline.addLast(new WebSocketServerCompressionHandler());
    pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,subProtocols,true));
    pipeline.addLast(new WebSocketRemoteServerFrameHandler());
}
项目:xockets.io    文件:WebSocketPipelineBuilder.java   
@Override
public void apply(ChannelPipeline pipeline){
    pipeline.channel().config().setAutoRead(true);
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(65536));
    if(Config.getInstance().isCompressionEnabled()){
        pipeline.addLast(new WebSocketServerCompressionHandler());
    }
    pipeline.addLast(guicer.inject(new WebSocketValidationHandler()));
    pipeline.addLast(guicer.inject(new WebSocketServerHandler()));
}
项目:JavaAyo    文件:WebSocketServerInitializer.java   
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(65536));
    pipeline.addLast(new WebSocketServerCompressionHandler());
    pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true));
    pipeline.addLast(new WebSocketIndexPageHandler(WEBSOCKET_PATH));
    pipeline.addLast(new WebSocketFrameHandler());
}
项目:Netty-WebSocket    文件:WebSocketServerInitializer.java   
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(65536));
    pipeline.addLast(new WebSocketServerCompressionHandler());
    pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true));
    pipeline.addLast(new WebSocketIndexPageHandler(WEBSOCKET_PATH));
    pipeline.addLast(new TextWebSocketFrameHandler());
}
项目:SimLogMonitor    文件:WatcherChannelInitializer.java   
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(65536));
    pipeline.addLast(new WebSocketServerCompressionHandler());
    pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true));
    pipeline.addLast(new WatcherServerIndexPageHandler(WEBSOCKET_PATH));
    pipeline.addLast(new WatcherServerHandler());
}
项目:carbon-transports    文件:WebSocketRemoteServerInitializer.java   
@Override
public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    if (sslCtx != null) {
        pipeline.addLast(sslCtx.newHandler(ch.alloc()));
    }
    pipeline.addLast(new HttpServerCodec());
    pipeline.addLast(new HttpObjectAggregator(65536));
    pipeline.addLast(new WebSocketServerCompressionHandler());
    pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH,true));
    pipeline.addLast(new WebSocketRemoteServerFrameHandler());
}
项目:socketio    文件:SocketIOChannelInitializer.java   
@Override
protected void initChannel(Channel ch) throws Exception {
  ChannelPipeline pipeline = ch.pipeline();
  // Flash policy file
  if (isFlashSupported) {
    pipeline.addLast(FLASH_POLICY_HANDLER,flashPolicyHandler);
  }
  // SSL
  if (sslContext != null) {
    pipeline.addLast(SSL_HANDLER,sslContext.newHandler(ch.alloc()));
  }

  // HTTP
  pipeline.addLast(HTTP_Request_DECODER,new HttpRequestDecoder());
  pipeline.addLast(HTTP_CHUNK_AGGREGATOR,new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH));
  pipeline.addLast(HTTP_RESPONSE_ENCODER,new HttpResponseEncoder());
  if (isHttpCompressionEnabled) {
    pipeline.addLast(HTTP_COMPRESSION,new HttpContentCompressor());
  }

  // Flash resources
  if (isFlashSupported) {
    pipeline.addLast(FLASH_RESOURCE_HANDLER,flashResourceHandler);
  }

  // Socket.IO
  pipeline.addLast(SOCKETIO_PACKET_ENCODER,packetEncoderHandler);
  pipeline.addLast(SOCKETIO_HANDSHAKE_HANDLER,handshakeHandler);
  pipeline.addLast(SOCKETIO_disCONNECT_HANDLER,disconnectHandler);
  if (isWebsocketCompressionEnabled) {
    pipeline.addLast(WEBSOCKET_COMPRESSION,new WebSocketServerCompressionHandler());
  }
  pipeline.addLast(SOCKETIO_WEBSOCKET_HANDLER,webSocketHandler);
  if (isFlashSupported) {
    pipeline.addLast(SOCKETIO_FLASHSOCKET_HANDLER,flashSocketHandler);
  }
  pipeline.addLast(SOCKETIO_XHR_POLLING_HANDLER,xhrPollingHandler);
  if (isJsonpSupported) {
    pipeline.addLast(SOCKETIO_JSONP_POLLING_HANDLER,jsonpPollingHandler);
  }
  pipeline.addLast(SOCKETIO_HEARTBEAT_HANDLER,heartbeatHandler);
  pipeline.addLast(eventExecutorGroup,SOCKETIO_PACKET_disPATCHER,packetdispatcherHandler);

  if (pipelineModifier != null) {
    pipelineModifier.modifyPipeline(pipeline);
  }
}

Java Spring测试多个发送消息WebSocket

Java Spring测试多个发送消息WebSocket

如何解决Java Spring测试多个发送消息WebSocket?

我正在遵循Java Spring的本教程:https://spring.io/guides/gs/messaging-stomp-websocket/。

但是,他们目前设置的测试程序仅允许一条发送消息。如果我用多条替换该发送消息,则该代码似乎只注册了第一条发送消息。您可以在this file上查看测试设置。

如何在同一测试方法上测试多条发送消息?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

node.js – Express router WebSocketServer – 在POST上发送消息

node.js – Express router WebSocketServer – 在POST上发送消息

这是我到目前为止所做的,我正在努力找到一个解决方案,以便:

>连接始终保持活动状态,因此我可以处理状态更新
>我可以使用POST请求中的数据向客户端发送websocket消息

ws api here

router.post('/',function (req,res) {
// Need to send ws.send() with post data
})


wss.on('connection',function(ws) {
  ws.on('message',function(message) {
    console.log('r : %s',message);
  });
  // ws is only defined under this callback as an object of type ws
});

解决方法

你可以使用这样的事件:

//Create an event
var event = require('events').EventEmitter();

router.post('/',res) {
   // fire an event
   event.emit('homePage')
})

wss.on('connection',function(ws) {
    ws.on('message',function(message) {
     console.log('r : %s',message);
    });

    // listen the event
    event.on('homePage',function(){
        ws.emit('someEvent');
    });
});

node.js – 使用socket.io和空消息队列向特定客户端发送消息

node.js – 使用socket.io和空消息队列向特定客户端发送消息

我会疯狂与socket.io!
文档太糟糕了,这根本不是真的。

我想通过socket.io发送一个反馈给特定的客户端

我的服务器端看起来像这样:

app.get('/upload',requiresLogin,function(request,response) {
    response.render('upload/index.jade');
    io.sockets.on('connection',function (socket) {
        console.log('SOCKET ID ' + socket.id);
        io.sockets.socket(socket.id).emit('new','hello');
    });
});

并且客户端端看起来像这样:

$(document).ready(function() {
    var socket = io.connect('http://localhost:80/socket.io/socket.io.js');
    socket.on('new',function (data) { 
        console.log(socket.id);
        console.log(data); 
        //$('#state').html(data.status);
    });
});

但客户端什么都不做。我已经尝试了几乎一切。有人可以告诉我,我做错了,请!

总结

以上是小编为你收集整理的node.js – 使用socket.io和空消息队列向特定客户端发送消息全部内容。

如果觉得小编网站内容还不错,欢迎将小编网站推荐给好友。

关于Spring WebSocket @SendToSession:向特定会话发送消息springboot websocket消息推送的介绍已经告一段落,感谢您的耐心阅读,如果想了解更多关于io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler的实例源码、Java Spring测试多个发送消息WebSocket、node.js – Express router WebSocketServer – 在POST上发送消息、node.js – 使用socket.io和空消息队列向特定客户端发送消息的相关信息,请在本站寻找。

本文标签: