分布式下的WebSocket解决方案是什么

发布时间:2021-10-27 11:02:51 作者:iii
来源:亿速云 阅读:140

本篇内容主要讲解“分布式下的WebSocket解决方案是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“分布式下的WebSocket解决方案是什么”吧!

WebSocket单体应用介绍

在介绍分布式集群之前,我们先来看一下王子的WebSocket代码实现,先来看java后端代码如下:

import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject;import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/webSocket/{key}") public class WebSocket {     private static int onlineCount = 0;     /**      * 存储连接的客户端      */     private static Map<String, WebSocket> clients = new ConcurrentHashMap<String, WebSocket>();     private Session session;     /**      * 发送的目标科室code      */     private String key;     @OnOpen     public void onOpen(@PathParam("key") String key, Session session) throws IOException {         this.key = key;         this.session = session;         if (!clients.containsKey(key)) {             addOnlineCount();        }        clients.put(key, this);         Log.info(key+"已连接消息服务!");     }    @OnClose     public void onClose() throws IOException {         clients.remove(key);        subOnlineCount();    }    @OnMessage     public void onMessage(String message) throws IOException {         if(message.equals("ping")){             return ;         }        JSONObject jsonTo = JSON.parseObject(message);        String mes = (String) jsonTo.get("message");         if (!jsonTo.get("to").equals("All")){             sendMessageTo(mes, jsonTo.get("to").toString());         }else{             sendMessageAll(mes);        }    }    @OnError     public void onError(Session session, Throwable error) {         error.printStackTrace();    }    private void sendMessageTo(String message, String To) throws IOException {         for (WebSocket item : clients.values()) {             if (item.key.contains(To) )                 item.session.getAsyncRemote().sendText(message);        }    }    private void sendMessageAll(String message) throws IOException {         for (WebSocket item : clients.values()) {             item.session.getAsyncRemote().sendText(message);        }    }    public static synchronized int getOnlineCount() {         return onlineCount;     }    public static synchronized void addOnlineCount() {         WebSocket.onlineCount++;    }    public static synchronized void subOnlineCount() {         WebSocket.onlineCount--;    }    public static synchronized Map<String, WebSocket> getClients() {         return clients;     }}

示例代码中并没有使用Spring,用的是原生的java web编写的,简单和大家介绍一下里面的方法。

可以看到,在onMessage方法中,我们直接根据客户端发送的消息,进行消息的转发功能,这样在单体消息服务中是没有问题的。

再来看一下js代码

var host = document.location.host;     // 获得当前登录科室    var deptCodes='${sessionScope.$UserContext.departmentID}';     deptCodes=deptCodes.replace(/[\[|\]|\s]+/g, "");     var key = '${sessionScope.$UserContext.userID}'+deptCodes;     var lockReconnect = false;  //避免ws重复连接     var ws = null;          // 判断当前浏览器是否支持WebSocket    var wsUrl = 'ws://' + host + '/webSocket/'+ key;     createWebSocket(wsUrl);   //连接ws    function createWebSocket(url) {         try{            if('WebSocket' in window){                 ws = new WebSocket(url);            }else if('MozWebSocket' in window){                   ws = new MozWebSocket(url);            }else{                   layer.alert("您的浏览器不支持websocket协议,建议使用新版谷歌、火狐等浏览器,请勿使用IE10以下浏览器,360浏览器请使用极速模式,不要使用兼容模式!");              }            initEventHandle();        }catch(e){            reconnect(url);            console.log(e);         }         }    function initEventHandle() {         ws.onclose = function () {             reconnect(wsUrl);            console.log("llws连接关闭!"+new Date().toUTCString());         };        ws.onerror = function () {             reconnect(wsUrl);            console.log("llws连接错误!");         };        ws.onopen = function () {             heartCheck.reset().start();      //心跳检测重置            console.log("llws连接成功!"+new Date().toUTCString());         };        ws.onmessage = function (event) {    //如果获取到消息,心跳检测重置             heartCheck.reset().start();      //拿到任何消息都说明当前连接是正常的//接收到消息实际业务处理  ...        };    }    // 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。    window.onbeforeunload = function() {         ws.close();     }      function reconnect(url) {         if(lockReconnect) return;         lockReconnect = true;         setTimeout(function () {     //没连接上会一直重连,设置延迟避免请求过多             createWebSocket(url);            lockReconnect = false;         }, 2000);     }    //心跳检测    var heartCheck = {        timeout: 300000,        //5分钟发一次心跳         timeoutObj: null,        serverTimeoutObj: null,        reset: function(){             clearTimeout(this.timeoutObj);            clearTimeout(this.serverTimeoutObj);            return this;         },        start: function(){             var self = this;            this.timeoutObj = setTimeout(function(){                 //这里发送一个心跳,后端收到后,返回一个心跳消息,                //onmessage拿到返回的心跳就说明连接正常                ws.send("ping");                 console.log("ping!")                 self.serverTimeoutObj = setTimeout(function(){//如果超过一定时间还没重置,说明后端主动断开了                     ws.close();     //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次                 }, self.timeout)            }, this.timeout)        }}

js部分使用的是原生H5编写的,如果为了更好的兼容浏览器,也可以使用SockJS,有兴趣小伙伴们可以自行百度。

接下来我们就手动的优化代码,实现WebSocket对分布式架构的支持。

解决方案的思考

现在我们已经了解单体应用下的代码结构,也清楚了WebSocket在分布式环境下面临的问题,那么是时候思考一下如何能够解决这个问题了。

我们先来看一看发生这个问题的根本原因是什么。

简单思考一下就能明白,单体应用下只有一台服务器,所有的客户端连接的都是这一台消息服务器,所以当发布消息者发送消息时,所有的客户端其实已经全部与这台服务器建立了连接,直接群发消息就可以了。

换成分布式系统后,假如我们有两台消息服务器,那么客户端通过Nginx负载均衡后,就会有一部分连接到其中一台服务器,另一部分连接到另一台服务器,所以发布消息者发送消息时,只会发送到其中的一台服务器上,而这台消息服务器就可以执行群发操作,但问题是,另一台服务器并不知道这件事,也就无法发送消息了。

现在我们知道了根本原因是生产消息时,只有一台消息服务器能够感知到,所以我们只要让另一台消息服务器也能感知到就可以了,这样感知到之后,它就可以群发消息给连接到它上边的客户端了。

那么什么方法可以实现这种功能呢,王子很快想到了引入消息中间件,并使用它的发布订阅模式来通知所有消息服务器就可以了。

引入RabbitMQ解决分布式下的WebSocket问题

在消息中间件的选择上,王子选择了RabbitMQ,原因是它的搭建比较简单,功能也很强大,而且我们只是用到它群发消息的功能。

RabbitMQ有一个广播模式(fanout),我们使用的就是这种模式。

首先我们写一个RabbitMQ的连接类:

import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtil {     private static Connection connection;     /**      * 与rabbitmq建立连接      * @return      */     public static Connection getConnection() {         if (connection != null&&connection.isOpen()) {             return connection;         }        ConnectionFactory factory = new ConnectionFactory();         factory.setVirtualHost("/");         factory.setHost("192.168.220.110"); // 用的是虚拟IP地址         factory.setPort(5672);         factory.setUsername("guest");         factory.setPassword("guest");         try {             connection = factory.newConnection();         } catch (IOException e) {             e.printStackTrace();         } catch (TimeoutException e) {             e.printStackTrace();         }         return connection;     } }

这个类没什么说的,就是获取MQ连接的一个工厂类。

然后按照我们的思路,就是每次服务器启动的时候,都会创建一个MQ的消费者监听MQ的消息,王子这里测试使用的是Servlet的监听器,如下:

import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; public class InitListener implements ServletContextListener {     @Override     public void contextInitialized(ServletContextEvent servletContextEvent) {         WebSocket.init();    }    @Override     public void contextDestroyed(ServletContextEvent servletContextEvent) {     }}

记得要在Web.xml中配置监听器信息

<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_4_0.xsd"          version="4.0">     <listener>         <listener-class>InitListener</listener-class>     </listener> </web-app>

WebSocket中增加init方法,作为MQ消费者部分

public  static void init() {         try {            Connection connection = RabbitMQUtil.getConnection();            Channel channel = connection.createChannel();            //交换机声明(参数为:交换机名称;交换机类型)             channel.exchangeDeclare("fanoutLogs",BuiltinExchangeType.FANOUT);             //获取一个临时队列             String queueName = channel.queueDeclare().getQueue();            //队列与交换机绑定(参数为:队列名称;交换机名称;routingKey忽略)             channel.queueBind(queueName,"fanoutLogs","");             //这里重写了DefaultConsumer的handleDelivery方法,因为发送的时候对消息进行了getByte(),在这里要重新组装成String             Consumer consumer = new DefaultConsumer(channel) {                @Override                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                    super.handleDelivery(consumerTag, envelope, properties, body);                     String message = new String(body,"UTF-8");                     System.out.println(message);//这里可以使用WebSocket通过消息内容发送消息给对应的客户端                 }            };            //声明队列中被消费掉的消息(参数为:队列名称;消息是否自动确认;consumer主体)             channel.basicConsume(queueName,true,consumer);             //这里不能关闭连接,调用了消费方法后,消费者会一直连接着rabbitMQ等待消费         } catch (IOException e) {            e.printStackTrace();        }    }

同时在接收到消息时,不是直接通过WebSocket发送消息给对应客户端,而是发送消息给MQ,这样如果消息服务器有多个,就都会从MQ中获得消息,之后通过获取的消息内容再使用WebSocket推送给对应的客户端就可以了。

WebSocket的onMessage方法增加内容如下:

try {             //尝试获取一个连接             Connection connection = RabbitMQUtil.getConnection();            //尝试创建一个channel             Channel channel = connection.createChannel();            //声明交换机(参数为:交换机名称; 交换机类型,广播模式)             channel.exchangeDeclare("fanoutLogs", BuiltinExchangeType.FANOUT);             //消息发布(参数为:交换机名称; routingKey,忽略。在广播模式中,生产者声明交换机的名称和类型即可)             channel.basicPublish("fanoutLogs","", null,msg.getBytes("UTF-8"));             System.out.println("发布消息");             channel.close();        } catch (IOException |TimeoutException e) {             e.printStackTrace();         }

增加后删除掉原来的Websocket推送部分代码。

到此,相信大家对“分布式下的WebSocket解决方案是什么”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. Websocket原理是什么
  2. 详解Spring Cloud微服务架构下的WebSocket解决方案

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

websocket

上一篇:SpringBoot定时任务及Cron表达式是怎样的

下一篇:Mysql数据分组排名实现的示例分析

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》