webSocket

lishihuan大约 6 分钟

webSocket

测试:http://wstool.js.org/open in new window

关于前端的优化方案

C:\Users\lihuan\Desktop\项目\websocket\springboot-websocket-demo https://github.com/dadiyang/java-stompopen in new window 讲到RabbitMQ Nginx 抽空整理一下

https://blog.csdn.net/t610654893/article/details/137963103open in new window

https://blog.51cto.com/u_15753094/5830334open in new window

Spring WebSocket +STOMP+RabbitMQ open in new window

https://blog.csdn.net/qq_35387940/article/details/108276136open in new window

open in new window

https://www.cnblogs.com/lyd447113735/p/14951601.htmlopen in new window

Spring WebSocket +STOMP+RabbitMQopen in new window C:\Users\lihuan\Desktop\项目\websocket\springboot-websocket-demo

https://www.cnblogs.com/ycnp/p/11990324.htmlopen in new window

https://blog.csdn.net/t610654893/article/details/137963103open in new window

所需jar包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

sockJS

nginx配置

说明当前请求路径下经过的nginx都需要配置 下面的 proxy_set_header 部分,否则会影响到握手的成功

upstream websocket {
	server 192.168.110.93:8215;
}
    server {
        listen       19091;
        # .....

        location /ws/ {
            proxy_pass http://websocket/busModule/endpointWs/;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            proxy_set_header Host $host;
        	proxy_set_header X-Real-IP $remote_addr;
        	proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        	proxy_set_header X-Forwarded-Proto $scheme;
        	proxy_read_timeout 36000s; #1小时未传输数据则关闭接
            
        }

}

补充:对于Apache

ProxyPass "/your-endpoint" "http://your-backend-server"
RewriteEngine On
RewriteCond %{HTTP:Upgrade} websocket [NC]
RewriteCond %{HTTP:Connection} upgrade [NC]
RewriteRule /your-endpoint/(.*) ws://your-backend-server/$1 [P,L]

异常记录

启动时报错 https://blog.csdn.net/wy8227754/article/details/86604393open in new window

Parameter 0 of method springAsyncExecutor in org.activiti.spring.boot.AbstractProcessEngineAutoConfiguration required a single bean, but 3 were found:
	- clientInboundChannelExecutor: defined by method 'clientInboundChannelExecutor' in class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
	- clientOutboundChannelExecutor: defined by method 'clientOutboundChannelExecutor' in class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]
	- brokerChannelExecutor: defined by method 'brokerChannelExecutor' in class path resource [org/springframework/web/socket/config/annotation/DelegatingWebSocketMessageBrokerConfiguration.class]

解决:在启动类中添加

  @Primary
    @Bean
    public TaskExecutor primaryTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        return executor;
    }

后端实现

webSocket后端代码

消息发送共2个方法 广播:convertAndSend 和点对点 convertAndSendToUser

  • this.template.convertAndSend("/topic/getMessage", "广播:消息推送.....");

前端订阅

this.subscription = this.stompClient.subscribe("/topic/getMessage", ({body}) => {
    console.log(body);
});
  • this.template.convertAndSendToUser(userId, "/getMessage", "点对点:消息推送.....");

前端订阅

 this.stompClient.subscribe('/user/' + receiverId + '/getMessage', function (message) {
 	console.log('Received: ' + message.body);
 });

说明:目前业务涉及到2部分 WebSocketConfig.javaWebSocketController.java

核心在WebSocketConfig.java中,如果没有特别的需求,直接就可以使用,如果需要自定义推送,可以通过 @MessageMapping 实现对消息的自定义,目前在 WebSocketController.java 中自定义了几个方法

WebSocketController.java 中说明:

@SendTo("/mass/getMessage") 相当于 this.template.convertAndSend("/mass/getMessage", jsonobject);

考虑到如果发送的消息需要服务器中进行业务处理,通过@MessageMapping 实现来自定义处理

    @MessageMapping("/mass/customRequest")
    @SendTo("/mass/getMessage")//SendTo 发送至 Broker 下的指定订阅路径
    public JSONObject mass(JSONObject jsonobject){
        // 后期可以再这里进行业务处理,否则只用 默认的 群发 topic/getMessage
        return jsonobject;
    }

    /**
     * 自定义 群发
     *  通过 @SendTo("/mass/getMessage") 实现发送
     *  或者通过 this.template.convertAndSend("/mass/getMessage", jsonobject); 实现发送
     * @param jsonobject
     */
    @MessageMapping("/mass/customRequest2")
    public void mass2(JSONObject jsonobject){
        this.template.convertAndSend("/mass/getMessage", jsonobject);
    }

/**
     * 点对点发送,指定用户
     var message = {
     'content': 'Hello, User!',
     'senderId': targetUser,// 发送人
     'receiverId': targetUser,// 接收人
     'messageType': targetUser,// 消息类型
     };
     订阅消息
     this.stompClient.subscribe('/user/' + targetUser + '/getMessage', function (message) {
     console.log('Received: ' + message.body);
     });
     * @param chatMessage
     * @return
     */
    @MessageMapping("/alone/customRequest")
    public void alone(ChatMessage chatMessage){
        this.template.convertAndSendToUser(chatMessage.getReceiverId()+"","/getMessage",chatMessage);
    }

注:如果需要@Scheduled (定时器)生效需要再启动类中添加 @EnableScheduling

前端实现

npm install sockjs-client
npm install stompjs

将连接部分放在混入中

sockjs混入

混入说明:

当前是springCloud服务,socket后端代码写在模块 bus-module 实际端口对应的是8215

var socket = new SockJS('http://192.168.110.130:19091/ws/');这里是通过nginx实现 nginx配置

或者直接连接bus-module模块

var socket = new SockJS('http://192.168.110.93:8215/busModule/endpointWs/')

<template>

</template>
<script>
  import { sockjsMixins } from '@/mixin/sockjs.js'
export default {
    name: 'demo1',
    components: {draggable},
    mixins: [sockjsMixins],
    data() {
      return { }
    },
    methods: {
        clientOnConnectHandle() {
		// 1.广播(无需后端写接口,直再WebSocketConfig 中配置的代理就能实现)
        // 1.1 订阅广播
        this.subscription = this.stompClient.subscribe("/topic/getMessage", ({body}) => {
          console.log(body);
        });
        // 1.2.发送广播
        this.stompClient.send("/topic/getMessage", {}, JSON.stringify({ content: '广播,通过 ‘topic/getMessage’ 订阅...' }));


        // 2. 群发
        // 2.1 群发接收
        this.subscription = this.stompClient.subscribe("/mass/getMessage", ({body}) => {
          console.log(body);
        });
        // 2.2. 群发
        this.stompClient.send("/mass/customRequest",{},JSON.stringify({ content: '自定义群发,由后端massRequest 方法接收,然后通过 ‘mass/getMessage’ 转发,前端通过 ‘mass/getMessage’ 订阅...' }));


        // 在连接成功后,订阅用户专属的目标
        this.stompClient.subscribe('/user/' + targetUser + '/getMessage', function (message) {
          console.log('Received: ' + message.body);
        });
        const targetUser ="lihuan"
        var message = {
          'content': 'Hello, User!',
          'senderId': targetUser,// 发送人
          'receiverId': targetUser,// 接收人
          'messageType': targetUser,// 消息类型
        };
        this.stompClient.send("/userCustomRequest",{},JSON.stringify(message));

        // 点对点发送
        let destination = "/user/" + targetUser + "/customRequest";
        this.stompClient.send(destination,{},JSON.stringify({ content: '测试点对点发送'}));        
      },
    },
}
</script>
    
    
    

说明:

当前java WebSocketConfig 配置中

//注册STOMP协议的节点(endpoint),并映射指定的url
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //注册一个STOMP的endpoint,并指定使用SockJS协议
        registry.addEndpoint("/busModule/endpointWs").setAllowedOrigins("*").withSockJS();
    }

    /**
     * 配置消息代理(Message Broker)
     * user:点对点
     * topic:广播
     * mass:群发
     * alone:单独聊天
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //点对点应配置一个/user消息代理,广播式应配置一个/topic消息代理,群发(mass),单独聊天(alone)
        //registry.enableSimpleBroker("/topic","/user","/mass","/alone");
        registry.enableSimpleBroker("/topic","/user","/mass");
        //点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/
        registry.setUserDestinationPrefix("/user");
    }
  • 上面 的 registry.addEndpoint("/busModule/endpointWs").setAllowedOrigins("*").withSockJS();

    • registerStompEndpoints()方法注册了一个STOMP端点,客户端将使用这个端点连接到WebSocket服务器。/busModule/endpointWs是端点的URL,(因为是微服务,这里busModule业务模块名)
    • setAllowedOrigins("*")允许任何域连接,withSockJS()启用SockJS支持。
  • configureMessageBroker()方法配置了一个简单的消息代理,客户端可以订阅来自这个消息代理的消息,也可以将消息发送到这个消息代理。

  • enableSimpleBroker("/topic","/user","/mass","/alone")启用了一个简单的内存消息代理,它支持四种类型的目标前缀/topic,/user,/mass,/alone,用于区分不同类型的消息,如广播消息、点对点消息等。

  • setUserDestinationPrefix("/user")设置了用户目标前缀,这意味着以/user开头的目标将被视为用户专属的目标,并会路由到与当前认证用户关联的唯一用户会话。

前端消息的订阅

// 订阅广播式消息
stompClient.subscribe('/topic/some-path', function(message) {
    console.log(JSON.parse(message.body).content);
});

// 订阅点对点消息
stompClient.subscribe('/user/' + userId + '/some-path', function(message) {
    console.log(JSON.parse(message.body).content);
});

// 订阅群发消息
stompClient.subscribe('/mass/some-path', function(message) {
    console.log(JSON.parse(message.body).content);
});

// 订阅单独聊天消息
stompClient.subscribe('/alone/some-path', function(message) {
    console.log(JSON.parse(message.body).content);
});

其中some-path 为实际的路径,不做控制,只要发送和接收保持一致即可,如果需要后端自定义业务处理,则 可以通过 @MessageMapping去自定义

例如:目前有个广播全发,后端自定义一个 @MessageMapping("/mass/customRequest")

this.stompClient.send("/mass/customRequest",{},JSON.stringify({ content: '自定义群发,由后端customRequest 方法接收,然后通过 ‘mass/getMessage’ 转发,前端通过 ‘mass/getMessage’ 订阅...' }));

当前的发送会经过 后端customRequest方法,后端通过 this.template.convertAndSend("/mass/getMessage", jsonobject); 定义消息发送给 /mass/getMessage 这个路径对应的是 订阅时的路径

// 订阅
this.subscription = this.stompClient.subscribe("/mass/getMessage", ({body}) => {
    console.log(body);
});

下面的这个由于后端没有配置MessageMapping,通过WebSocketConfig 配置的 /mass 代理实现

this.stompClient.send("/mass/xxxxx",{},JSON.stringify({ content: '群发,前端通过 ‘mass/xxxxx’ 订阅...' }));

this.subscription = this.stompClient.subscribe("/mass/xxxxx", ({body}) => {
    console.log(body);
});

Spring WebSocket +STOMP+RabbitMQ

docker run --name RabbitMQ -d -p 5672:5672 -p 15672:15672 -p 61613:61613 -p 15674:15674 -p 1883:1883 --hostname=RabbitMQ  rabbitmq:latest

开启stomp插件
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_management
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_web_stomp
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_web_stomp_examples
重启容器
docker restart rabbitmq

异常记录

1. websocket通道中断

场景描述:var socket = new SockJS('http://192.168.2.231:19091/ws/');

本机可以正常访问,但是服务器中直接访问http://192.168.2.231:19091/ws/也没问题 ,但是后面就会断开连接

百度出来的解释是 InetAddress.getLocalHost() 缓慢导致系统Websocket连接中断,解决办法是配置hosts文件

15:24:37.156 [http-nio-8215-exec-38] ERROR o.a.c.c.C.[.[.[.[dispatcherServlet] - [log,175] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.web.socket.sockjs.SockJsException: Uncaught failure in SockJS request, uri=http://websocket/busModule/endpointWs/801/sbbtktvw/eventsource; nested exception is org.springframework.web.socket.sockjs.SockJsTransportFailureException: Failed to open session; nested exception is org.springframework.web.socket.sockjs.SockJsTransportFailureException: Failed to write SockJsFrame content='o'; nested exception is org.apache.catalina.connector.ClientAbortException: java.io.IOException: 断开的管道] with root cause
java.io.IOException: 断开的管道
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
	at org.apache.tomcat.util.net.NioChannel.write(NioChannel.java:138)
	at org.apache.tomcat.util.net.NioBlockingSelector.write(NioBlockingSelector.java:101)
	at org.apache.tomcat.util.net.NioSelectorPool.write(NioSelectorPool.java:152)
	at org.apache.tomcat.util.net.NioEndpoint$NioSocketWrapper.doWrite(NioEndpoint.java:1253)
	at org.apache.tomcat.util.net.SocketWrapperBase.doWrite(SocketWrapperBase.java:740)

在本地/etc/hosts文件中 增加本机hostname的解析记录

例如本机的ip为192.168.2.15 在hosts文件中 添加 192.168.2.15 localhost.localdomain

[root@localhost logs]# hostname
localhost.localdomain
[root@localhost logs]# vi /etc/hosts