webSocket
webSocket
C:\Users\lihuan\Desktop\项目\websocket\springboot-websocket-demo https://github.com/dadiyang/java-stomp 讲到RabbitMQ Nginx 抽空整理一下
https://blog.csdn.net/t610654893/article/details/137963103
https://blog.51cto.com/u_15753094/5830334
Spring WebSocket +STOMP+RabbitMQ
https://blog.csdn.net/qq_35387940/article/details/108276136
https://www.cnblogs.com/lyd447113735/p/14951601.html
Spring WebSocket +STOMP+RabbitMQ C:\Users\lihuan\Desktop\项目\websocket\springboot-websocket-demo
https://www.cnblogs.com/ycnp/p/11990324.html
https://blog.csdn.net/t610654893/article/details/137963103
所需jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
sockJS
nginx配置
说明当前请求路径下经过的nginx都需要配置 下面的 proxy_set_header 部分,否则会影响到握手的成功
upstream websocket {
server 192.168.110.93:8215;
}
server {
listen 19091;
# .....
location /ws/ {
proxy_pass http://websocket/busModule/endpointWs/;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 36000s; #1小时未传输数据则关闭接
}
}
补充:对于Apache
ProxyPass "/your-endpoint" "http://your-backend-server"
RewriteEngine On
RewriteCond %{HTTP:Upgrade} websocket [NC]
RewriteCond %{HTTP:Connection} upgrade [NC]
RewriteRule /your-endpoint/(.*) ws://your-backend-server/$1 [P,L]
异常记录
启动时报错 https://blog.csdn.net/wy8227754/article/details/86604393
Parameter 0 of method springAsyncExecutor in org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration required a single bean, but 3 were found:
- clientInboundChannelExecutor: defined by method 'clientInboundChannelExecutor' in class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
- clientOutboundChannelExecutor: defined by method 'clientOutboundChannelExecutor' in class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
- brokerChannelExecutor: defined by method 'brokerChannelExecutor' in class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
解决:在启动类中添加
@Primary
@Bean
public TaskExecutor primaryTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
return executor;
}
后端实现
消息发送共2个方法 广播:convertAndSend 和点对点 convertAndSendToUser
this.template.convertAndSend("/topic/getMessage", "广播:消息推送.....");
前端订阅
this.subscription = this.stompClient.subscribe("/topic/getMessage", ({body}) => {
console.log(body);
});
this.template.convertAndSendToUser(userId, "/getMessage", "点对点:消息推送.....");
前端订阅
this.stompClient.subscribe('/user/' + receiverId + '/getMessage', function (message) {
console.log('Received: ' + message.body);
});
说明:目前业务涉及到2部分
WebSocketConfig.java和WebSocketController.java核心在
WebSocketConfig.java中,如果没有特别的需求,直接就可以使用,如果需要自定义推送,可以通过@MessageMapping实现对消息的自定义,目前在WebSocketController.java中自定义了几个方法
WebSocketController.java 中说明:
@SendTo("/mass/getMessage") 相当于 this.template.convertAndSend("/mass/getMessage", jsonobject);
考虑到如果发送的消息需要服务器中进行业务处理,通过@MessageMapping 实现来自定义处理
@MessageMapping("/mass/customRequest")
@SendTo("/mass/getMessage")//SendTo 发送至 Broker 下的指定订阅路径
public JSONObject mass(JSONObject jsonobject){
// 后期可以再这里进行业务处理,否则只用 默认的 群发 topic/getMessage
return jsonobject;
}
/**
* 自定义 群发
* 通过 @SendTo("/mass/getMessage") 实现发送
* 或者通过 this.template.convertAndSend("/mass/getMessage", jsonobject); 实现发送
* @param jsonobject
*/
@MessageMapping("/mass/customRequest2")
public void mass2(JSONObject jsonobject){
this.template.convertAndSend("/mass/getMessage", jsonobject);
}
/**
* 点对点发送,指定用户
var message = {
'content': 'Hello, User!',
'senderId': targetUser,// 发送人
'receiverId': targetUser,// 接收人
'messageType': targetUser,// 消息类型
};
订阅消息
this.stompClient.subscribe('/user/' + targetUser + '/getMessage', function (message) {
console.log('Received: ' + message.body);
});
* @param chatMessage
* @return
*/
@MessageMapping("/alone/customRequest")
public void alone(ChatMessage chatMessage){
this.template.convertAndSendToUser(chatMessage.getReceiverId()+"","/getMessage",chatMessage);
}
注:如果需要@Scheduled (定时器)生效需要再启动类中添加 @EnableScheduling
前端实现
npm install sockjs-client
npm install stompjs
将连接部分放在混入中
混入说明:
当前是springCloud服务,socket后端代码写在模块
bus-module实际端口对应的是8215
var socket = new SockJS('http://192.168.110.130:19091/ws/');这里是通过nginx实现 nginx配置
或者直接连接bus-module模块
var socket = new SockJS('http://192.168.110.93:8215/busModule/endpointWs/')
<template>
</template>
<script>
import { sockjsMixins } from '@/mixin/sockjs.js'
export default {
name: 'demo1',
components: {draggable},
mixins: [sockjsMixins],
data() {
return { }
},
methods: {
clientOnConnectHandle() {
// 1.广播(无需后端写接口,直再WebSocketConfig 中配置的代理就能实现)
// 1.1 订阅广播
this.subscription = this.stompClient.subscribe("/topic/getMessage", ({body}) => {
console.log(body);
});
// 1.2.发送广播
this.stompClient.send("/topic/getMessage", {}, JSON.stringify({ content: '广播,通过 ‘topic/getMessage’ 订阅...' }));
// 2. 群发
// 2.1 群发接收
this.subscription = this.stompClient.subscribe("/mass/getMessage", ({body}) => {
console.log(body);
});
// 2.2. 群发
this.stompClient.send("/mass/customRequest",{},JSON.stringify({ content: '自定义群发,由后端massRequest 方法接收,然后通过 ‘mass/getMessage’ 转发,前端通过 ‘mass/getMessage’ 订阅...' }));
// 在连接成功后,订阅用户专属的目标
this.stompClient.subscribe('/user/' + targetUser + '/getMessage', function (message) {
console.log('Received: ' + message.body);
});
const targetUser ="lihuan"
var message = {
'content': 'Hello, User!',
'senderId': targetUser,// 发送人
'receiverId': targetUser,// 接收人
'messageType': targetUser,// 消息类型
};
this.stompClient.send("/userCustomRequest",{},JSON.stringify(message));
// 点对点发送
let destination = "/user/" + targetUser + "/customRequest";
this.stompClient.send(destination,{},JSON.stringify({ content: '测试点对点发送'}));
},
},
}
</script>
说明:
当前java WebSocketConfig 配置中
//注册STOMP协议的节点(endpoint),并映射指定的url
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//注册一个STOMP的endpoint,并指定使用SockJS协议
registry.addEndpoint("/busModule/endpointWs").setAllowedOrigins("*").withSockJS();
}
/**
* 配置消息代理(Message Broker)
* user:点对点
* topic:广播
* mass:群发
* alone:单独聊天
* @param registry
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理,群发(mass),单独聊天(alone)
//registry.enableSimpleBroker("/topic","/user","/mass","/alone");
registry.enableSimpleBroker("/topic","/user","/mass");
//点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
registry.setUserDestinationPrefix("/user");
}
上面 的
registry.addEndpoint("/busModule/endpointWs").setAllowedOrigins("*").withSockJS();registerStompEndpoints()方法注册了一个STOMP端点,客户端将使用这个端点连接到WebSocket服务器。/busModule/endpointWs是端点的URL,(因为是微服务,这里busModule业务模块名)setAllowedOrigins("*")允许任何域连接,withSockJS()启用SockJS支持。
configureMessageBroker()方法配置了一个简单的消息代理,客户端可以订阅来自这个消息代理的消息,也可以将消息发送到这个消息代理。enableSimpleBroker("/topic","/user","/mass","/alone")启用了一个简单的内存消息代理,它支持四种类型的目标前缀/topic,/user,/mass,/alone,用于区分不同类型的消息,如广播消息、点对点消息等。setUserDestinationPrefix("/user")设置了用户目标前缀,这意味着以/user开头的目标将被视为用户专属的目标,并会路由到与当前认证用户关联的唯一用户会话。
前端消息的订阅
// 订阅广播式消息
stompClient.subscribe('/topic/some-path', function(message) {
console.log(JSON.parse(message.body).content);
});
// 订阅点对点消息
stompClient.subscribe('/user/' + userId + '/some-path', function(message) {
console.log(JSON.parse(message.body).content);
});
// 订阅群发消息
stompClient.subscribe('/mass/some-path', function(message) {
console.log(JSON.parse(message.body).content);
});
// 订阅单独聊天消息
stompClient.subscribe('/alone/some-path', function(message) {
console.log(JSON.parse(message.body).content);
});
其中
some-path为实际的路径,不做控制,只要发送和接收保持一致即可,如果需要后端自定义业务处理,则 可以通过 @MessageMapping去自定义
例如:目前有个广播全发,后端自定义一个 @MessageMapping("/mass/customRequest")
this.stompClient.send("/mass/customRequest",{},JSON.stringify({ content: '自定义群发,由后端customRequest 方法接收,然后通过 ‘mass/getMessage’ 转发,前端通过 ‘mass/getMessage’ 订阅...' }));
当前的发送会经过 后端customRequest方法,后端通过 this.template.convertAndSend("/mass/getMessage", jsonobject); 定义消息发送给 /mass/getMessage 这个路径对应的是 订阅时的路径
// 订阅
this.subscription = this.stompClient.subscribe("/mass/getMessage", ({body}) => {
console.log(body);
});
下面的这个由于后端没有配置MessageMapping,通过WebSocketConfig 配置的 /mass 代理实现
this.stompClient.send("/mass/xxxxx",{},JSON.stringify({ content: '群发,前端通过 ‘mass/xxxxx’ 订阅...' }));
this.subscription = this.stompClient.subscribe("/mass/xxxxx", ({body}) => {
console.log(body);
});
Spring WebSocket +STOMP+RabbitMQ
docker run --name RabbitMQ -d -p 5672:5672 -p 15672:15672 -p 61613:61613 -p 15674:15674 -p 1883:1883 --hostname=RabbitMQ rabbitmq:latest
开启stomp插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_web_stomp
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_web_stomp_examples
重启容器
docker restart rabbitmq
异常记录
1. websocket通道中断
场景描述:
var socket = new SockJS('http://192.168.2.231:19091/ws/');本机可以正常访问,但是服务器中直接访问
http://192.168.2.231:19091/ws/也没问题 ,但是后面就会断开连接百度出来的解释是 InetAddress.getLocalHost() 缓慢导致系统Websocket连接中断,解决办法是配置hosts文件
15:24:37.156 [http-nio-8215-exec-38] ERROR o.a.c.c.C.[.[.[.[dispatcherServlet] - [log,175] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.web.socket.sockjs.SockJsException: Uncaught failure in SockJS request, uri=http://websocket/busModule/endpointWs/801/sbbtktvw/eventsource; nested exception is org.springframework.web.socket.sockjs.SockJsTransportFailureException: Failed to open session; nested exception is org.springframework.web.socket.sockjs.SockJsTransportFailureException: Failed to write SockJsFrame content='o'; nested exception is org.apache.catalina.connector.ClientAbortException: java.io.IOException: 断开的管道] with root cause
java.io.IOException: 断开的管道
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:138)
at org.apache.tomcat.util.net.NioBlockingSelector.write(NioBlockingSelector.java:101)
at org.apache.tomcat.util.net.NioSelectorPool.write(NioSelectorPool.java:152)
at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1253)
at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:740)
在本地/etc/hosts文件中 增加本机hostname的解析记录
例如本机的ip为192.168.2.15 在hosts文件中 添加
192.168.2.15 localhost.localdomain
[root@localhost logs]# hostname
localhost.localdomain
[root@localhost logs]# vi /etc/hosts