webSocket改造

lishihuan大约 11 分钟

webSocket改造

目前正常思路都是局限于业务级别,将webSocket相关的js代码写在一个混入,或者封装在一个js文件种,需要调用的组件就引入,这样一个组件就会创建一个新的连接。

这里考虑将webSocket服务作为系统级别的,一个web客户端只会创建一个连接(在整个应用程序中共享WebSocket连接)

  1. 连接只初始化一次,其他组件通过订阅消息来共享该连接。
  2. 连接本身由全局的服务管理,而不是通过组件来管理。
  3. 组件只负责自己关心的消息的订阅,连接本身保持活跃,并在应用生命周期内不再销毁。

目前的2种可行性方案:

  • 采用封装vue插件的方式
  • 使用Vuex管理WebSocket连接
  • 创建全局 WebSocket 服务 目前验证过,能满足需求

1. 采用封装vue插件的方式

1.1. 创建WebSocket插件

首先,我们需要将现有的逻辑转换为一个Vue插件的形式。我们将创建一个名为 websocket.js 的文件,位于 src/plugins 目录下。

// src/plugins/websocket.js
import SockJS from 'sockjs-client';
import Stomp from 'stompjs';
import { getToken } from '@/utils/auth';
import { v4 as uuidv4 } from 'uuid';

let stompClient = null;
let reconnectAttempts = 0;
let maxReconnectAttempts = 30;
let currentUser = 'user:' + uuidv4();
let isDestroy = false;

const install = function(Vue, options) {
  Vue.prototype.$ws = {
    initWebSocket: function() {
      if (stompClient) {
        console.log('WebSocket连接已创建,执行销毁...');
        this.disconnect();
      }
      console.log('创建websocket连接...');
      const socket = new SockJS(options.url, null, {
        transports: ['websocket', 'xhr-polling', 'xdr-streaming', 'xdr-polling']
      });
      stompClient = Stomp.over(socket);
      const headers = {
        login: Vue.prototype.$store.state.user.loginName,
        Authorization: 'Bearer ' + getToken(),
        userId: currentUser
      };
      stompClient.debug = null;
      stompClient.connect(headers, () => {
        console.log('websocket连接成功...');
        if (isDestroy) {
          console.log('WebSocket已经销毁,无需加载...');
          return;
        }
        reconnectAttempts = 0;
        Vue.prototype.$eventBus.$emit('websocket-connected');
      }, (err) => {
        console.error(err);
        console.error('连接异常,重新连接...');
        reconnectAttempts++;
        if (!isDestroy) {
          Vue.prototype.$eventBus.$emit('websocket-disconnected');
        }
      });
    },
    disconnect: function() {
      console.log('webSocket销毁....');
      if (stompClient != null) {
        stompClient.disconnect(() => {
          console.log('webSocket销毁....成功...');
          stompClient = null;
          isDestroy = true;
        });
      }
    },
    subscribe: function(destination, callback) {
      if (stompClient) {
        stompClient.subscribe(destination, callback);
      }
    },
    clientOnConnectHandle: function() {
      // 在这里定义连接成功后的处理逻辑
    },
    clientOnDisconnectHandle: function() {
      if (reconnectAttempts > maxReconnectAttempts) {
        console.log('webSocket失败重连过多,取消重连,请检查服务是否正常...');
        return;
      }
      setTimeout(() => {
        this.initWebSocket();
      }, 1000);
    },
    closeStompClient: function() {
      if (stompClient) {
        stompClient = null;
      }
    },
  };

  // 初始化全局事件总线
  Vue.prototype.$eventBus = new Vue();
};

export default { install };

1.2. 注册插件

接下来,在 main.js 文件中注册这个插件:

import Vue from 'vue';
import App from './App.vue';
import router from './router';
import store from './store';
import websocketPlugin from './plugins/websocket'; // 引入插件

Vue.config.productionTip = false;

Vue.use(websocketPlugin, { url: process.env.WEBSOCKET_URL }); // 注册插件

new Vue({
  router,
  store,
  render: h => h(App)
}).$mount('#app');

1.3. 在组件中使用插件

现在你可以在任何组件中通过 this.$ws 来访问WebSocket实例,并且可以通过 $eventBus 来监听WebSocket的连接状态。

例如,在一个组件中,你可以这样做:

export default {
  name: 'MyComponent',
  created() {
    // 监听WebSocket连接状态变化
    this.$eventBus.$on('websocket-connected', () => {
      console.log('WebSocket connected');
      this.$ws.subscribe('/topic/myTopic', message => {
        console.log('Received message:', message);
      });
    });

    this.$eventBus.$on('websocket-disconnected', () => {
      console.log('WebSocket disconnected');
    });
  },
  beforeDestroy() {
    // 取消监听
    this.$eventBus.$off('websocket-connected');
    this.$eventBus.$off('websocket-disconnected');
  },
};

这样,你就能够在一个全局范围内管理WebSocket连接,并且在多个组件之间共享同一个连接。同时,通过事件总线,你可以方便地通知各个组件WebSocket的状态变化。

2. 使用Vuex管理WebSocket连接

2.1 创建WebSocket模块

// store/modules/websocket.js
import Vue from 'vue';
import SockJS from 'sockjs-client';
import Stomp from 'stompjs';

const state = {
  stompClient: null,
  isConnected: false,
};

const mutations = {
  SET_STOMP_CLIENT(state, client) {
    state.stompClient = client;
    state.isConnected = true;
  },
  SET_DISCONNECTED(state) {
    state.isConnected = false;
  },
};

const actions = {
  connect({ commit }) {
    if (state.stompClient) return;

    const socket = new SockJS(process.env.WEBSOCKET_URL);
    const client = Stomp.over(socket);

    client.connect({}, frame => {
      commit('SET_STOMP_CLIENT', client);
      // 触发全局事件通知所有监听者
      Vue.prototype.$eventBus.$emit('websocket-connected');
    }, error => {
      console.error('WebSocket connection error:', error);
    });
  },
  disconnect({ commit }) {
    if (state.stompClient) {
      state.stompClient.disconnect(() => {
        commit('SET_DISCONNECTED');
        // 触发全局事件通知所有监听者
        Vue.prototype.$eventBus.$emit('websocket-disconnected');
      });
    }
  },
  subscribe({ state }, destination) {
    if (state.stompClient) {
      state.stompClient.subscribe(destination, message => {
        // 处理消息
        console.log('Received message:', message);
      });
    }
  },
};

export default {
  state,
  mutations,
  actions,
};

2.2 在store/index.js中引入并注册模块

import Vue from 'vue';
import Vuex from 'vuex';
import websocket from './modules/websocket';

Vue.use(Vuex);

export default new Vuex.Store({
  modules: {
    websocket,
  },
});

2.3 使用全局事件总线来通知连接状态变化

// 创建事件总线实例
Vue.prototype.$eventBus = new Vue();

2.4 在组件中使用WebSocket

在组件中,可以通过this.$store.dispatch('websocket/connect')来触发连接,并监听全局事件总线上的事件来得知连接状态。

3. 创建全局 WebSocket 服务(目前已经测试过,能满足业务)

1. 创建全局 WebSocket 服务

你可以将 WebSocket 服务设计成一个全局单例,在应用的生命周期内保持连接。

import SockJS from "sockjs-client";
import Stomp from "stompjs";
import { getToken } from "@/utils/auth";
import store from '@/store';

class WebSocketService {
  constructor() {
    if (WebSocketService.instance) {
      return WebSocketService.instance;
    }

    this.stompClient = null;
    this.isConnected = false;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 30;

    // 保存所有订阅的 channel 和回调函数,防止重复订阅
    this.subscriptions = {};

    // 确保 WebSocketService 为单例模式
    WebSocketService.instance = this;
  }

  // 连接 WebSocket 服务
  connect() {
    if (this.isConnected) return; // 如果已经连接,则不重复连接

    const socket = new SockJS(store.state.user.configItem.websocket_url, null, {
      transports: ['websocket', 'xhr-polling', 'xdr-streaming', 'xdr-polling'],
    });

    this.stompClient = Stomp.over(socket);
    const headers = {
      login: store.state.user.loginName,
      Authorization: `Bearer ${getToken()}`,
    };

    this.stompClient.debug = null; // 禁用调试信息
    this.stompClient.connect(headers, this.onConnect.bind(this), this.onError.bind(this));
  }

  // 连接成功后的回调
  onConnect() {
    console.log('WebSocket连接成功...');
    this.isConnected = true;
    this.reconnectAttempts = 0;

    // 重新订阅所有暂存的频道
    Object.keys(this.subscriptions).forEach(subscriptionId => {
      const { channel, callback } = this.subscriptions[subscriptionId];
      const subscription = this.stompClient.subscribe(channel, callback);
      // 更新订阅的真实 ID
      this.subscriptions[subscriptionId].id = subscription.id;
    });
  }

  // 连接失败的回调,尝试重连
  onError(err) {
    console.error(err);
    this.reconnectAttempts++;
    if (this.reconnectAttempts > this.maxReconnectAttempts) {
      console.log('WebSocket重连过多,停止重连...');
      return;
    }
    const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000); // 指数退避,最大延迟30s
    setTimeout(() => this.connect(), delay);
  }


  // 生成唯一的订阅 ID
  generateUniqueId() {
    return 'subscription-' + Math.random().toString(36).substr(2, 9);
  }

  // 订阅频道
  subscribe(channel, callback, subscriberKey) {
    if (!this.isConnected) {
      const subscriptionId = this.generateUniqueId();
      this.subscriptions[subscriptionId] = { channel, callback, subscriberKey };
      return subscriptionId;
    }

    // 直接订阅
    const subscription = this.stompClient.subscribe(channel, callback);
    const subscriptionId = this.generateUniqueId();
    this.subscriptions[subscriptionId] = {
      id: subscription.id,
      channel,
      callback,
      subscriberKey,
    };

    return subscriptionId;
  }

  unsubscribe(subscriptionId) {
    if (this.subscriptions[subscriptionId]) {
      const { id, channel } = this.subscriptions[subscriptionId];
      this.stompClient.unsubscribe(id); // 使用 Stomp 的 ID 取消订阅
      delete this.subscriptions[subscriptionId]; // 从订阅记录中移除
      console.log(`Unsubscribed from channel: ${channel}`);
    } else {
      console.log(`No subscription found for ID: ${subscriptionId}`);
    }
  }



  // 断开连接
  disconnect() {
    if (this.stompClient) {
      this.stompClient.disconnect(() => {
        console.log('WebSocket连接已断开');
        this.stompClient = null;
        this.isConnected = false;
      });
    }
  }
}

export const websocketService = new WebSocketService();

2. 组件中使用全局 WebSocket 服务

组件不需要再关心连接的创建和销毁,只有在需要的时候订阅和取消订阅对应的频道。

import { websocketService } from "@/services/WebSocketService";  // 引入全局 WebSocket 服务

export default {
  data() {
    return {
      channel: "/topic/your-channel",
      subscriptionId: null,//为当前组件生成一个唯一的订阅 ID
    };
  },
  mounted() {
    // 确保连接已初始化(如果未连接会自动连接)
    websocketService.connect();

    // 订阅特定频道
    this.subscriptionId = websocketService.subscribe(this.channel, this.handleMessage,this.$options.name);   // this.$options.name订阅者标识,使用组件名(或任意唯一字符串)
  },
  beforeDestroy() {
    // 组件销毁时取消订阅
    websocketService.unsubscribe(this.subscriptionId);
  },
  methods: {
    handleMessage(message) {
      console.log('Received message:', message);
    },
    // 发送消息
    sendMess(targetUser = 'all', data, messageType) {
      var message = {
        'data': data,
        keyId: 'jeId:' + this.formObj.jeId, // 记录当前是那个 联合演练id
        'senderId': this.currentRole,// 发送人
        'receiverId': targetUser,// 接收人
        'messageType': messageType,// 消息类型
      };
      websocketService.stompClient.send(this.channel, {}, JSON.stringify(message));
    },
  },
};

关键变化说明

  1. 连接初始化和生命周期管理WebSocketService 类管理连接的生命周期,应用启动时进行一次连接初始化,连接只会在需要时重连,不会因为每个组件的挂载和销毁而频繁创建和销毁连接。
  2. 组件订阅和取消订阅:组件只需在 mounted 生命周期钩子中订阅消息,在 beforeDestroy 中取消订阅。WebSocket 连接是全局唯一的,不需要每个组件都管理连接和销毁。
  3. 重连机制:当连接丢失时,WebSocketService 会自动重连,而不需要每个组件单独实现重连逻辑。
  4. 全局管理:通过 WebSocketService 单例,WebSocket 连接和消息订阅的管理变得集中和统一。

优势

  • 真正的全局连接:WebSocket 连接只会初始化一次,所有组件共享同一个连接。没有多个组件重复创建连接的情况。
  • 组件解耦:组件只关心自己需要的消息的订阅和接收,不需要关注连接的生命周期管理。
  • 集中管理:通过一个全局服务管理 WebSocket 连接和重连机制,避免了重复的逻辑和冗余的操作。

4. vue-socket.ioopen in new window 【待验证】

vue-socket.io 是一个用于在 Vue.js 应用中集成 Socket.IOopen in new window 客户端的库,它允许前端通过 WebSocket 与后端进行实时通信。Socket.IOopen in new window 不仅支持标准的 WebSocket 协议,还提供了额外的功能,如自动重连、断线重试、跨域支持等,因此它是一个非常强大的工具。

关于 vue-socket.io 的全局性和后端调用

  1. 全局性

  2. 后端调用

实现步骤

1. 前端配置 vue-socket.io

首先,在你的 Vue 项目中安装 vue-socket.io

npm install vue-socket.io

然后,在 main.js 中配置 vue-socket.io 作为全局插件:

import Vue from 'vue';
import App from './App.vue';
import VueSocketIO from 'vue-socket.io';

// 配置 Socket.IO 连接到后端服务器
Vue.use(new VueSocketIO({
  debug: true, // 开启调试模式
  connection: 'http://your-backend-server-url', // 替换为你的后端服务器地址
  vuex: {
    store, // 如果你使用 Vuex,可以在这里集成
    actionPrefix: 'SOCKET_', // 指定前缀以便区分 Socket.IO 事件
    mutationPrefix: 'SOCKET_' // 指定前缀以便区分 Socket.IO 事件
  }
}));

new Vue({
  router,
  store,
  render: h => h(App)
}).$mount('#app');

2. 后端配置 Socket.IOopen in new window 服务器

假设你使用的是 Java 后端,可以选择以下几种方式来实现 Socket.IOopen in new window 服务器:

使用 Socket.IO-Java(推荐)
  1. 添加依赖: 在你的 pom.xml 文件中添加 Socket.IO-Java 的依赖:

    <dependency>
      <groupId>com.corundumstudio.socketio</groupId>
      <artifactId>netty-socketio</artifactId>
      <version>1.7.19</version>
    </dependency>
    
  2. 配置 Socket.IOopen in new window 服务器: 创建一个简单的 Socket.IOopen in new window 服务器,并监听来自前端的消息。你还可以根据需要向特定客户端发送消息。

    import com.corundumstudio.socketio.Configuration;
    import com.corundumstudio.socketio.SocketIOServer;
    import com.corundumstudio.socketio.listener.DataListener;
    
    public class SocketIOServerMain {
    
        private static final int PORT = 3000;
    
        public static void main(String[] args) {
            Configuration config = new Configuration();
            config.setPort(PORT);
    
            SocketIOServer server = new SocketIOServer(config);
            server.addEventListener("chat message", String.class, new DataListener<String>() {
                @Override
                public void onData(SocketIOClient client, String data, AckRequest ackRequest) throws Exception {
                    System.out.println("Received message: " + data);
                    // 广播消息给所有连接的客户端
                    server.getBroadcastOperations().sendEvent("chat message", data);
                }
            });
    
            server.start();
            System.out.println("Socket.IO server started on port " + PORT);
    
            // 注册关闭钩子,确保服务器在程序退出时正确关闭
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                server.stop();
                System.out.println("Socket.IO server stopped");
            }));
        }
    }
    
  3. 推送消息给用户: 当你创建一个新的待办任务时,可以在 Java 代码中调用 server.getBroadcastOperations().sendEvent() 方法来向所有连接的客户端发送消息。你还可以根据用户的会话 ID 或房间(room)来定向推送消息。

    // 假设你有一个方法来创建待办任务
    public void createTask(String taskId, String userId) {
        // 创建任务的逻辑...
    
        // 推送消息给特定用户
        SocketIOClient userClient = getUserClientByUserId(userId); // 获取用户的 Socket.IO 客户端
        if (userClient != null) {
            userClient.sendEvent("new-task", taskId);
        }
    
        // 或者广播给所有用户
        server.getBroadcastOperations().sendEvent("new-task", taskId);
    }
    

3. 前端处理推送消息

在前端组件中,你可以监听来自后端的 new-task 事件,并根据接收到的消息更新界面。

export default {
  name: 'TaskComponent',
  created() {
    // 监听来自后端的 "new-task" 事件
    this.$socket.on('new-task', (taskId) => {
      console.log('New task received:', taskId);
      // 更新组件状态或显示通知
      this.tasks.push({ id: taskId, status: 'pending' });
    });
  },
  data() {
    return {
      tasks: [],
    };
  },
};

优点

  1. 实时通信vue-socket.ioSocket.IO-Java 提供了双向实时通信的能力,使得前后端之间的交互更加高效。
  2. 简单易用vue-socket.io 作为全局插件,前端开发者可以非常方便地在任何组件中使用 Socket.IOopen in new window 功能。
  3. 扩展性强:你可以根据业务需求灵活地扩展功能,比如支持多个房间、用户认证、消息加密等。
  4. 自动重连vue-socket.io 内置了自动重连机制,确保即使在网络不稳定的情况下,连接也能自动恢复。

注意事项

  1. 跨域问题:如果你的前端和后端部署在不同的域名上,确保你已经正确配置了跨域资源共享(CORS)。Socket.IO 支持跨域连接,但你需要在服务器端显式允许跨域请求。

  2. 安全性:在生产环境中,建议使用 HTTPS 和 WSS(WebSocket Secure)来确保通信的安全性。你还可以通过 JWT 或其他方式对连接进行身份验证,防止未授权的客户端连接到你的 WebSocket 服务器。

  3. 性能优化:对于大规模应用,考虑使用负载均衡和集群化来提高 WebSocket 服务器的性能和可靠性。Socket.IO 支持 Redis 等消息队列,可以帮助你在多个服务器实例之间同步消息。

总结

vue-socket.io 是一个非常适合 Vue.js 应用的 WebSocket 解决方案,它可以与 Java 后端无缝集成。通过 Socket.IO-JavaNetty-SocketIO,你可以在 Java 后端轻松实现 WebSocket 服务器,并根据业务需求向前端推送实时消息。这种方式不仅能够满足你当前的需求(如推送待办任务),还为未来的扩展提供了灵活性和可维护性。