webSocket改造
webSocket改造
目前正常思路都是局限于业务级别,将webSocket相关的js代码写在一个混入,或者封装在一个js文件种,需要调用的组件就引入,这样一个组件就会创建一个新的连接。
这里考虑将webSocket服务作为系统级别的,一个web客户端只会创建一个连接(在整个应用程序中共享WebSocket连接)
- 连接只初始化一次,其他组件通过订阅消息来共享该连接。
- 连接本身由全局的服务管理,而不是通过组件来管理。
- 组件只负责自己关心的消息的订阅,连接本身保持活跃,并在应用生命周期内不再销毁。
目前的2种可行性方案:
- 采用封装vue插件的方式
- 使用Vuex管理WebSocket连接
- 创建全局 WebSocket 服务 目前验证过,能满足需求
1. 采用封装vue插件的方式
1.1. 创建WebSocket插件
首先,我们需要将现有的逻辑转换为一个Vue插件的形式。我们将创建一个名为 websocket.js 的文件,位于 src/plugins 目录下。
// src/plugins/websocket.js
import SockJS from 'sockjs-client';
import Stomp from 'stompjs';
import { getToken } from '@/utils/auth';
import { v4 as uuidv4 } from 'uuid';
let stompClient = null;
let reconnectAttempts = 0;
let maxReconnectAttempts = 30;
let currentUser = 'user:' + uuidv4();
let isDestroy = false;
const install = function(Vue, options) {
Vue.prototype.$ws = {
initWebSocket: function() {
if (stompClient) {
console.log('WebSocket连接已创建,执行销毁...');
this.disconnect();
}
console.log('创建websocket连接...');
const socket = new SockJS(options.url, null, {
transports: ['websocket', 'xhr-polling', 'xdr-streaming', 'xdr-polling']
});
stompClient = Stomp.over(socket);
const headers = {
login: Vue.prototype.$store.state.user.loginName,
Authorization: 'Bearer ' + getToken(),
userId: currentUser
};
stompClient.debug = null;
stompClient.connect(headers, () => {
console.log('websocket连接成功...');
if (isDestroy) {
console.log('WebSocket已经销毁,无需加载...');
return;
}
reconnectAttempts = 0;
Vue.prototype.$eventBus.$emit('websocket-connected');
}, (err) => {
console.error(err);
console.error('连接异常,重新连接...');
reconnectAttempts++;
if (!isDestroy) {
Vue.prototype.$eventBus.$emit('websocket-disconnected');
}
});
},
disconnect: function() {
console.log('webSocket销毁....');
if (stompClient != null) {
stompClient.disconnect(() => {
console.log('webSocket销毁....成功...');
stompClient = null;
isDestroy = true;
});
}
},
subscribe: function(destination, callback) {
if (stompClient) {
stompClient.subscribe(destination, callback);
}
},
clientOnConnectHandle: function() {
// 在这里定义连接成功后的处理逻辑
},
clientOnDisconnectHandle: function() {
if (reconnectAttempts > maxReconnectAttempts) {
console.log('webSocket失败重连过多,取消重连,请检查服务是否正常...');
return;
}
setTimeout(() => {
this.initWebSocket();
}, 1000);
},
closeStompClient: function() {
if (stompClient) {
stompClient = null;
}
},
};
// 初始化全局事件总线
Vue.prototype.$eventBus = new Vue();
};
export default { install };
1.2. 注册插件
接下来,在 main.js 文件中注册这个插件:
import Vue from 'vue';
import App from './App.vue';
import router from './router';
import store from './store';
import websocketPlugin from './plugins/websocket'; // 引入插件
Vue.config.productionTip = false;
Vue.use(websocketPlugin, { url: process.env.WEBSOCKET_URL }); // 注册插件
new Vue({
router,
store,
render: h => h(App)
}).$mount('#app');
1.3. 在组件中使用插件
现在你可以在任何组件中通过 this.$ws 来访问WebSocket实例,并且可以通过 $eventBus 来监听WebSocket的连接状态。
例如,在一个组件中,你可以这样做:
export default {
name: 'MyComponent',
created() {
// 监听WebSocket连接状态变化
this.$eventBus.$on('websocket-connected', () => {
console.log('WebSocket connected');
this.$ws.subscribe('/topic/myTopic', message => {
console.log('Received message:', message);
});
});
this.$eventBus.$on('websocket-disconnected', () => {
console.log('WebSocket disconnected');
});
},
beforeDestroy() {
// 取消监听
this.$eventBus.$off('websocket-connected');
this.$eventBus.$off('websocket-disconnected');
},
};
这样,你就能够在一个全局范围内管理WebSocket连接,并且在多个组件之间共享同一个连接。同时,通过事件总线,你可以方便地通知各个组件WebSocket的状态变化。
2. 使用Vuex管理WebSocket连接
2.1 创建WebSocket模块
// store/modules/websocket.js
import Vue from 'vue';
import SockJS from 'sockjs-client';
import Stomp from 'stompjs';
const state = {
stompClient: null,
isConnected: false,
};
const mutations = {
SET_STOMP_CLIENT(state, client) {
state.stompClient = client;
state.isConnected = true;
},
SET_DISCONNECTED(state) {
state.isConnected = false;
},
};
const actions = {
connect({ commit }) {
if (state.stompClient) return;
const socket = new SockJS(process.env.WEBSOCKET_URL);
const client = Stomp.over(socket);
client.connect({}, frame => {
commit('SET_STOMP_CLIENT', client);
// 触发全局事件通知所有监听者
Vue.prototype.$eventBus.$emit('websocket-connected');
}, error => {
console.error('WebSocket connection error:', error);
});
},
disconnect({ commit }) {
if (state.stompClient) {
state.stompClient.disconnect(() => {
commit('SET_DISCONNECTED');
// 触发全局事件通知所有监听者
Vue.prototype.$eventBus.$emit('websocket-disconnected');
});
}
},
subscribe({ state }, destination) {
if (state.stompClient) {
state.stompClient.subscribe(destination, message => {
// 处理消息
console.log('Received message:', message);
});
}
},
};
export default {
state,
mutations,
actions,
};
2.2 在store/index.js中引入并注册模块
import Vue from 'vue';
import Vuex from 'vuex';
import websocket from './modules/websocket';
Vue.use(Vuex);
export default new Vuex.Store({
modules: {
websocket,
},
});
2.3 使用全局事件总线来通知连接状态变化
// 创建事件总线实例
Vue.prototype.$eventBus = new Vue();
2.4 在组件中使用WebSocket
在组件中,可以通过this.$store.dispatch('websocket/connect')来触发连接,并监听全局事件总线上的事件来得知连接状态。
3. 创建全局 WebSocket 服务(目前已经测试过,能满足业务)
1. 创建全局 WebSocket 服务
你可以将 WebSocket 服务设计成一个全局单例,在应用的生命周期内保持连接。
import SockJS from "sockjs-client";
import Stomp from "stompjs";
import { getToken } from "@/utils/auth";
import store from '@/store';
class WebSocketService {
constructor() {
if (WebSocketService.instance) {
return WebSocketService.instance;
}
this.stompClient = null;
this.isConnected = false;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 30;
// 保存所有订阅的 channel 和回调函数,防止重复订阅
this.subscriptions = {};
// 确保 WebSocketService 为单例模式
WebSocketService.instance = this;
}
// 连接 WebSocket 服务
connect() {
if (this.isConnected) return; // 如果已经连接,则不重复连接
const socket = new SockJS(store.state.user.configItem.websocket_url, null, {
transports: ['websocket', 'xhr-polling', 'xdr-streaming', 'xdr-polling'],
});
this.stompClient = Stomp.over(socket);
const headers = {
login: store.state.user.loginName,
Authorization: `Bearer ${getToken()}`,
};
this.stompClient.debug = null; // 禁用调试信息
this.stompClient.connect(headers, this.onConnect.bind(this), this.onError.bind(this));
}
// 连接成功后的回调
onConnect() {
console.log('WebSocket连接成功...');
this.isConnected = true;
this.reconnectAttempts = 0;
// 重新订阅所有暂存的频道
Object.keys(this.subscriptions).forEach(subscriptionId => {
const { channel, callback } = this.subscriptions[subscriptionId];
const subscription = this.stompClient.subscribe(channel, callback);
// 更新订阅的真实 ID
this.subscriptions[subscriptionId].id = subscription.id;
});
}
// 连接失败的回调,尝试重连
onError(err) {
console.error(err);
this.reconnectAttempts++;
if (this.reconnectAttempts > this.maxReconnectAttempts) {
console.log('WebSocket重连过多,停止重连...');
return;
}
const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000); // 指数退避,最大延迟30s
setTimeout(() => this.connect(), delay);
}
// 生成唯一的订阅 ID
generateUniqueId() {
return 'subscription-' + Math.random().toString(36).substr(2, 9);
}
// 订阅频道
subscribe(channel, callback, subscriberKey) {
if (!this.isConnected) {
const subscriptionId = this.generateUniqueId();
this.subscriptions[subscriptionId] = { channel, callback, subscriberKey };
return subscriptionId;
}
// 直接订阅
const subscription = this.stompClient.subscribe(channel, callback);
const subscriptionId = this.generateUniqueId();
this.subscriptions[subscriptionId] = {
id: subscription.id,
channel,
callback,
subscriberKey,
};
return subscriptionId;
}
unsubscribe(subscriptionId) {
if (this.subscriptions[subscriptionId]) {
const { id, channel } = this.subscriptions[subscriptionId];
this.stompClient.unsubscribe(id); // 使用 Stomp 的 ID 取消订阅
delete this.subscriptions[subscriptionId]; // 从订阅记录中移除
console.log(`Unsubscribed from channel: ${channel}`);
} else {
console.log(`No subscription found for ID: ${subscriptionId}`);
}
}
// 断开连接
disconnect() {
if (this.stompClient) {
this.stompClient.disconnect(() => {
console.log('WebSocket连接已断开');
this.stompClient = null;
this.isConnected = false;
});
}
}
}
export const websocketService = new WebSocketService();
2. 组件中使用全局 WebSocket 服务
组件不需要再关心连接的创建和销毁,只有在需要的时候订阅和取消订阅对应的频道。
import { websocketService } from "@/services/WebSocketService"; // 引入全局 WebSocket 服务
export default {
data() {
return {
channel: "/topic/your-channel",
subscriptionId: null,//为当前组件生成一个唯一的订阅 ID
};
},
mounted() {
// 确保连接已初始化(如果未连接会自动连接)
websocketService.connect();
// 订阅特定频道
this.subscriptionId = websocketService.subscribe(this.channel, this.handleMessage,this.$options.name); // this.$options.name订阅者标识,使用组件名(或任意唯一字符串)
},
beforeDestroy() {
// 组件销毁时取消订阅
websocketService.unsubscribe(this.subscriptionId);
},
methods: {
handleMessage(message) {
console.log('Received message:', message);
},
// 发送消息
sendMess(targetUser = 'all', data, messageType) {
var message = {
'data': data,
keyId: 'jeId:' + this.formObj.jeId, // 记录当前是那个 联合演练id
'senderId': this.currentRole,// 发送人
'receiverId': targetUser,// 接收人
'messageType': messageType,// 消息类型
};
websocketService.stompClient.send(this.channel, {}, JSON.stringify(message));
},
},
};
关键变化说明
- 连接初始化和生命周期管理:
WebSocketService类管理连接的生命周期,应用启动时进行一次连接初始化,连接只会在需要时重连,不会因为每个组件的挂载和销毁而频繁创建和销毁连接。 - 组件订阅和取消订阅:组件只需在
mounted生命周期钩子中订阅消息,在beforeDestroy中取消订阅。WebSocket 连接是全局唯一的,不需要每个组件都管理连接和销毁。 - 重连机制:当连接丢失时,
WebSocketService会自动重连,而不需要每个组件单独实现重连逻辑。 - 全局管理:通过
WebSocketService单例,WebSocket 连接和消息订阅的管理变得集中和统一。
优势
- 真正的全局连接:WebSocket 连接只会初始化一次,所有组件共享同一个连接。没有多个组件重复创建连接的情况。
- 组件解耦:组件只关心自己需要的消息的订阅和接收,不需要关注连接的生命周期管理。
- 集中管理:通过一个全局服务管理 WebSocket 连接和重连机制,避免了重复的逻辑和冗余的操作。
4. vue-socket.io 【待验证】
vue-socket.io 是一个用于在 Vue.js 应用中集成 Socket.IO 客户端的库,它允许前端通过 WebSocket 与后端进行实时通信。Socket.IO 不仅支持标准的 WebSocket 协议,还提供了额外的功能,如自动重连、断线重试、跨域支持等,因此它是一个非常强大的工具。
关于 vue-socket.io 的全局性和后端调用
全局性:
后端调用:
- Java 后端:虽然
vue-socket.io是前端库,但你可以使用 Java 的 Socket.IO 服务器端实现(如 Socket.IO-Java 或 Netty-SocketIO)来与前端进行通信。 - 推送消息给用户:当你启动流程并创建一个待办任务时,Java 后端可以通过 Socket.IO 服务器端向特定的客户端或所有连接的客户端发送消息。这样,前端可以接收到这些消息并更新界面。
- Java 后端:虽然
实现步骤
1. 前端配置 vue-socket.io
首先,在你的 Vue 项目中安装 vue-socket.io:
npm install vue-socket.io
然后,在 main.js 中配置 vue-socket.io 作为全局插件:
import Vue from 'vue';
import App from './App.vue';
import VueSocketIO from 'vue-socket.io';
// 配置 Socket.IO 连接到后端服务器
Vue.use(new VueSocketIO({
debug: true, // 开启调试模式
connection: 'http://your-backend-server-url', // 替换为你的后端服务器地址
vuex: {
store, // 如果你使用 Vuex,可以在这里集成
actionPrefix: 'SOCKET_', // 指定前缀以便区分 Socket.IO 事件
mutationPrefix: 'SOCKET_' // 指定前缀以便区分 Socket.IO 事件
}
}));
new Vue({
router,
store,
render: h => h(App)
}).$mount('#app');
2. 后端配置 Socket.IO 服务器
假设你使用的是 Java 后端,可以选择以下几种方式来实现 Socket.IO 服务器:
使用 Socket.IO-Java(推荐)
添加依赖: 在你的
pom.xml文件中添加Socket.IO-Java的依赖:<dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.19</version> </dependency>配置 Socket.IO 服务器: 创建一个简单的 Socket.IO 服务器,并监听来自前端的消息。你还可以根据需要向特定客户端发送消息。
import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.SocketIOServer; import com.corundumstudio.socketio.listener.DataListener; public class SocketIOServerMain { private static final int PORT = 3000; public static void main(String[] args) { Configuration config = new Configuration(); config.setPort(PORT); SocketIOServer server = new SocketIOServer(config); server.addEventListener("chat message", String.class, new DataListener<String>() { @Override public void onData(SocketIOClient client, String data, AckRequest ackRequest) throws Exception { System.out.println("Received message: " + data); // 广播消息给所有连接的客户端 server.getBroadcastOperations().sendEvent("chat message", data); } }); server.start(); System.out.println("Socket.IO server started on port " + PORT); // 注册关闭钩子,确保服务器在程序退出时正确关闭 Runtime.getRuntime().addShutdownHook(new Thread(() -> { server.stop(); System.out.println("Socket.IO server stopped"); })); } }推送消息给用户: 当你创建一个新的待办任务时,可以在 Java 代码中调用
server.getBroadcastOperations().sendEvent()方法来向所有连接的客户端发送消息。你还可以根据用户的会话 ID 或房间(room)来定向推送消息。// 假设你有一个方法来创建待办任务 public void createTask(String taskId, String userId) { // 创建任务的逻辑... // 推送消息给特定用户 SocketIOClient userClient = getUserClientByUserId(userId); // 获取用户的 Socket.IO 客户端 if (userClient != null) { userClient.sendEvent("new-task", taskId); } // 或者广播给所有用户 server.getBroadcastOperations().sendEvent("new-task", taskId); }
3. 前端处理推送消息
在前端组件中,你可以监听来自后端的 new-task 事件,并根据接收到的消息更新界面。
export default {
name: 'TaskComponent',
created() {
// 监听来自后端的 "new-task" 事件
this.$socket.on('new-task', (taskId) => {
console.log('New task received:', taskId);
// 更新组件状态或显示通知
this.tasks.push({ id: taskId, status: 'pending' });
});
},
data() {
return {
tasks: [],
};
},
};
优点
- 实时通信:
vue-socket.io和Socket.IO-Java提供了双向实时通信的能力,使得前后端之间的交互更加高效。 - 简单易用:
vue-socket.io作为全局插件,前端开发者可以非常方便地在任何组件中使用 Socket.IO 功能。 - 扩展性强:你可以根据业务需求灵活地扩展功能,比如支持多个房间、用户认证、消息加密等。
- 自动重连:
vue-socket.io内置了自动重连机制,确保即使在网络不稳定的情况下,连接也能自动恢复。
注意事项
跨域问题:如果你的前端和后端部署在不同的域名上,确保你已经正确配置了跨域资源共享(CORS)。
Socket.IO支持跨域连接,但你需要在服务器端显式允许跨域请求。安全性:在生产环境中,建议使用 HTTPS 和 WSS(WebSocket Secure)来确保通信的安全性。你还可以通过 JWT 或其他方式对连接进行身份验证,防止未授权的客户端连接到你的 WebSocket 服务器。
性能优化:对于大规模应用,考虑使用负载均衡和集群化来提高 WebSocket 服务器的性能和可靠性。
Socket.IO支持 Redis 等消息队列,可以帮助你在多个服务器实例之间同步消息。
总结
vue-socket.io 是一个非常适合 Vue.js 应用的 WebSocket 解决方案,它可以与 Java 后端无缝集成。通过 Socket.IO-Java 或 Netty-SocketIO,你可以在 Java 后端轻松实现 WebSocket 服务器,并根据业务需求向前端推送实时消息。这种方式不仅能够满足你当前的需求(如推送待办任务),还为未来的扩展提供了灵活性和可维护性。