Netty中怎么整合WebSocket

发布时间:2021-06-18 18:08:57 作者:Leah
来源:亿速云 阅读:492

Netty中怎么整合WebSocket,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

WebSocket协议是基于

TCP的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端 ,它是先进行一次Http的连接,连接成功后转为TCP连接。

现在我们来做一个WebSocket HelloWorld,意思为接收一条WebSocket客户端发送过来的消息,然后刷到所有连接上的客户端,大家都可以看到这条消息。

@Slf4j@AllArgsConstructorpublic class WebSocketServer {private int port;    public void run() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();        EventLoopGroup worker = new NioEventLoopGroup();        try {
            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {@Override                        protected void initChannel(SocketChannel ch) throws Exception {//web基于http协议的解码器                            ch.pipeline().addLast(new HttpServerCodec());                            //对大数据流的支持                            ch.pipeline().addLast(new ChunkedWriteHandler());                            //对http message进行聚合,聚合成FullHttpRequest或FullHttpResponse                            ch.pipeline().addLast(new HttpObjectAggregator(1024 * 64));                            //websocket服务器处理对协议,用于指定给客户端连接访问的路径                            //该handler会帮你处理一些繁重的复杂的事                            //会帮你处理握手动作:handshaking(close,ping,pong) ping + pong = 心跳                            //对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));                            //添加我们的自定义channel处理器                            ch.pipeline().addLast(new WebSocketHandler());                        }
                    });            log.info("服务器启动中");            ChannelFuture future = bootstrap.bind(port).sync();            future.channel().closeFuture().sync();        } finally {
            worker.shutdownGracefully();            boss.shutdownGracefully();        }

    }
}

channel处理器

/** * TextWebSocketFrame: 在netty中,用于为websocket专门处理文本的对象,frame是消息的载体 */@Slf4jpublic class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {//用于记录和管理所有客户端的channel    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);    @Override    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {//获取客户端传输过来的消息        String content = msg.text();        log.info("接收到的数据" + content);        clients.stream().forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame("[服务器在]" + LocalDateTime.now() + "接收到消息:" + content)
        ));        //下面的方法与上面一致//        clients.writeAndFlush(new TextWebSocketFrame("[服务器在]" + LocalDateTime.now() +//                "接收到消息:" + content));    }/**     * 当客户端连接服务端之后(打开连接)     * 获取客户端的channel,并且放到ChannelGroup中去进行管理     * @param ctx     * @throws Exception     */    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {clients.add(ctx.channel());    }@Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {//当触发handlerRemoved,ChannelGroup会自动移除对应的客户端的channel        //所以下面这条语句可不写//        clients.remove(ctx.channel());        log.info("客户端断开,channel对应的长id为:" + ctx.channel().id().asLongText());        log.info("客户端断开,channel对应的短id为:" + ctx.channel().id().asShortText());    }
}

服务器启动

@SpringBootApplicationpublic class WebsocketApplication {   public static void main(String[] args) throws InterruptedException {
      SpringApplication.run(WebsocketApplication.class, args);      new WebSocketServer(10101).run();   }

}

客户端写一个html文件,代码如下

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8" />
        <title></title>
    </head>
    <body>
        <div>发送消息:</div>
        <input type="text" id="msgContent" />
        <input type="button" value="发送" onclick="CHAT.chat()" />
        
        <div>接受消息:</div>
        <div id="receiveMsg" ></div>
        
        <script type="application/javascript">
            window.CHAT = {
                socket: null,
                init: function() {
                    if (window.WebSocket) {
                        CHAT.socket = new WebSocket("ws://127.0.0.1:10101/ws");
                        CHAT.socket.onopen = function() {
                            console.log("连接建立成功");
                        },
                        CHAT.socket.onclose = function() {
                            console.log("连接关闭");
                        },
                        CHAT.socket.onerror = function() {
                            console.log("发生错误");
                        },
                        CHAT.socket.onmessage = function(e) {
                            console.log("接收到消息" + e.data);
                            var receiveMsg = document.getElementById("receiveMsg");
                            var html = receiveMsg.innerHTML;
                            receiveMsg.innerHTML = html + "<br/>" + e.data;
                        }
                    }else {
                        alert("浏览器不支持WebSocket协议...");
                    }
                },
                chat: function() {
                    var msg = document.getElementById("msgContent");
                    CHAT.socket.send(msg.value);
                }
            }
            CHAT.init();
        </script>
    </body>
</html>

打开浏览器如下

Netty中怎么整合WebSocket

此时我们输入一段话,发送

Netty中怎么整合WebSocket

Netty中怎么整合WebSocket

Netty中怎么整合WebSocket

我们可以看到所有打开的连接,都会收到相同的消息。

此时服务器的日志

2019-10-16 22:31:16.066  INFO 1376 --- [ntLoopGroup-3-5] c.g.w.netty.websocket.WebSocketHandler   : 接收到的数据helloworld
2019-10-16 22:31:33.131  INFO 1376 --- [ntLoopGroup-3-5] c.g.w.netty.websocket.WebSocketHandler   : 接收到的数据你好,中国

如果我们关闭一个页面,服务器的日志为

2019-10-16 22:36:39.390  INFO 1376 --- [ntLoopGroup-3-7] c.g.w.netty.websocket.WebSocketHandler   : 客户端断开,channel对应的长id为:acde48fffe001122-00000560-00000007-9ca78ac7e2b907ab-0b15dfcb
2019-10-16 22:36:39.390  INFO 1376 --- [ntLoopGroup-3-7] c.g.w.netty.websocket.WebSocketHandler   : 客户端断开,channel对应的短id为:0b15dfcb

现在我们来做一个点对点的聊天功能。

首先WebSocketServer添加空闲处理器

@Slf4j@AllArgsConstructorpublic class WebSocketServer {private int port;    public void run() throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup();        EventLoopGroup worker = new NioEventLoopGroup();        try {
            ServerBootstrap bootstrap = new ServerBootstrap();            bootstrap.group(boss,worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childOption(ChannelOption.TCP_NODELAY,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {@Override                        protected void initChannel(SocketChannel ch) throws Exception {//web基于http协议的解码器                            ch.pipeline().addLast(new HttpServerCodec());                            //对大数据流的支持                            ch.pipeline().addLast(new ChunkedWriteHandler());                            //对http message进行聚合,聚合成FullHttpRequest或FullHttpResponse                            ch.pipeline().addLast(new HttpObjectAggregator(1024 * 64));                            //websocket服务器处理对协议,用于指定给客户端连接访问的路径                            //该handler会帮你处理一些繁重的复杂的事                            //会帮你处理握手动作:handshaking(close,ping,pong) ping + pong = 心跳                            //对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同                            ch.pipeline().addLast(new WebSocketServerProtocolHandler("/ws"));                            //针对客户端,如果1分钟时没有向服务端发送读写心跳(All),则主动断开                            //如果是读空闲或者是写空闲,不处理                            ch.pipeline().addLast(new IdleStateHandler(20,40,60));                            //添加我们的自定义channel处理器                            ch.pipeline().addLast(new WebSocketHandler());                        }
                    });            log.info("服务器启动中");            ChannelFuture future = bootstrap.bind(port).sync();            future.channel().closeFuture().sync();        } finally {
            worker.shutdownGracefully();            boss.shutdownGracefully();        }

    }
}

然后我们需要增加两个实体类——消息类和聊天类

首先添加一个消息的接口

public interface Chat {/**     * 保存消息到数据库     * @param chatMsg     */    public void save(Chat chatMsg);    /**     * 签名消息     * @param msgIdList     */    public void updateMsgSigned(List<Long> msgIdList);    /**     * 查找未读未签名消息     * @param acceptUserId     * @return     */    public List<ChatMsg> findUnReadChat(Long acceptUserId);}

添加一个消息的类来实现该接口

@ToString@Datapublic class ChatMsg implements Serializable,Chat{@JSONField(serializeUsing = ToStringSerializer.class)private Long senderId; //发送者的用户id    @JSONField(serializeUsing = ToStringSerializer.class)private Long receiverId; //接收者的用户id    private String msg;    @JSONField(serializeUsing = ToStringSerializer.class)private Long msgId; //用于消息的签收    private MsgSignFlagEnum signed; //消息签收状态    private LocalDateTime createDate;    @Override    @Transactional    public void save(Chat chatMsg) {
        ChatDao chatDao = SpringBootUtil.getBean(ChatDao.class);        IdService idService = SpringBootUtil.getBean(IdService.class);        ((ChatMsg)chatMsg).setMsgId(idService.genId());        ((ChatMsg)chatMsg).setCreateDate(LocalDateTime.now());        chatDao.saveChat((ChatMsg) chatMsg);    }@Transactional    @Override    public void updateMsgSigned(List<Long> msgIdList) {
        ChatDao chatDao = SpringBootUtil.getBean(ChatDao.class);        chatDao.updateMsgSigned(msgIdList);    }@Transactional    @Override    public List<ChatMsg> findUnReadChat(Long acceptUserId) {
        ChatDao chatDao = SpringBootUtil.getBean(ChatDao.class);        return chatDao.findUnReadMsg(acceptUserId);    }
}

其中MsgSignFlagEnum为一个枚举类型,代码如下

public enum MsgSignFlagEnum implements LocalisableAll {unsign(0,"未签收"),    signed(1,"已签收");    public final int type;    public final String value;    private MsgSignFlagEnum(int type,String value) {this.type = type;        this.value = value;    }@Override    public int getType() {return type;    }@Override    public String getValue() {return value;    }
}

建立一个消息的工厂

public class ChatMsgFactory {public static Chat createChatMsgService() {return new ChatMsg();    }
}

然后是聊天类

@Datapublic class DataContent implements Serializable {private Integer action; //动作类型    private ChatMsg chatMsg; //用户的聊天内容entity    private String extand; //扩展字段}

我们根据动作类型来定义一个枚举

public enum MsgActionEnum {CONNECT(1,"第一次(或重连)初始化连接"),    CHAT(2,"聊天消息"),    SIGNED(3,"消息签收"),    KEEPALIVE(4,"客户端保持心跳");    public final Integer type;    public final String content;    private MsgActionEnum(Integer type,String content) {this.type = type;        this.content = content;    }
}

在写WebSocketHandler之前,我们需要将用户Id跟Channel做一个绑定

/** * 用户id和channel的关联关系的处理 */@Slf4jpublic class UserChannelRel {private static Map<Long,Channel> manager = new HashMap<>();    public static void put(Long senderId,Channel channel) {manager.put(senderId,channel);    }public static Channel get(Long senderId) {return manager.get(senderId);    }public static void output() {manager.entrySet().stream().forEach(entry ->log.info("UserId:" + entry.getKey() + ",ChannelId:" +
                    entry.getValue().id().asLongText())
        );    }
}

最后WebSocketHandler改写如下

/** * TextWebSocketFrame: 在netty中,用于为websocket专门处理文本的对象,frame是消息的载体 */@Slf4jpublic class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {//用于记录和管理所有客户端的channel    private static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);    private Chat chatMsgService = ChatMsgFactory.createChatMsgService();    @Override    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {//获取客户端传输过来的消息        String content = msg.text();        log.info("content为:" + content);        Channel currentChannel = ctx.channel();        //解析传输过来的消息转成聊天对象        DataContent dataContent = JSONObject.parseObject(content,DataContent.class);        //获取聊天对象的动作        Integer action = dataContent.getAction();        if (action == MsgActionEnum.CONNECT.type) {//当websocket第一次open的时候,初始化channel,把用的channel和userId关联起来            Long senderId = dataContent.getChatMsg().getSenderId();            UserChannelRel.put(senderId,currentChannel);            //测试            users.stream().forEach(channel -> log.info(channel.id().asLongText()));            UserChannelRel.output();        }else if (action == MsgActionEnum.CHAT.type) {//聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收]            ChatMsg chatMsg = dataContent.getChatMsg();            String msgText = chatMsg.getMsg();            Long receiverId = chatMsg.getReceiverId();            Long senderId = chatMsg.getSenderId();            //保存数据库            chatMsgService.save(chatMsg);            Channel receiverChannel = UserChannelRel.get(receiverId);            if (receiverChannel == null) {//接收方离线状态,此处无需处理            }else {
                Channel findChannel = users.find(receiverChannel.id());                if (findChannel != null) {
                    findChannel.writeAndFlush(new TextWebSocketFrame(
                            JSONObject.toJSONString(chatMsg)
                    ));                }else {//接收方离线,此处无需处理                }
            }
        }else if (action == MsgActionEnum.SIGNED.type) {//签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收]            //扩展字段在signed类型的消息中,代表需要去签收的消息id,逗号间隔            String msgIdsStr = dataContent.getExtand();            log.info("extand为:" + msgIdsStr);            String[] msgIds = msgIdsStr.split(",");            List<Long> msgIdList = new ArrayList<>();            for (String mId : msgIds) {if (!StringUtils.isEmpty(mId)) {
                    msgIdList.add(Long.valueOf(mId));                }
            }log.info(msgIdList.toString());            if (!CollectionUtils.isEmpty(msgIdList)) {//批量签收                chatMsgService.updateMsgSigned(msgIdList);            }
        }else if (action == MsgActionEnum.KEEPALIVE.type) {//心跳类型的消息            log.info("收到来自channel为[" + currentChannel + "]的心跳包");        }
    }/**     * 当客户端连接服务端之后(打开连接)     * 获取客户端的channel,并且放到ChannelGroup中去进行管理     * @param ctx     * @throws Exception     */    @Override    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {users.add(ctx.channel());    }@Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {//当触发handlerRemoved,ChannelGroup会自动移除对应的客户端的channel        //所以下面这条语句可不写//        clients.remove(ctx.channel());        log.info("客户端断开,channel对应的长id为:" + ctx.channel().id().asLongText());        log.info("客户端断开,channel对应的短id为:" + ctx.channel().id().asShortText());    }@Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();        ctx.channel().close();        users.remove(ctx.channel());    }@Override    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {//IdleStateEvent是一个用户事件,包含读空闲/写空闲/读写空闲        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;            if (event.state() == IdleState.READER_IDLE) {log.info("进入读空闲");            }else if (event.state() == IdleState.WRITER_IDLE) {log.info("进入写空闲");            }else if (event.state() == IdleState.ALL_IDLE) {log.info("channel关闭前,用户数量为:" + users.size());                //关闭无用的channel,以防资源浪费                ctx.channel().close();                log.info("channel关闭后,用户数量为:" + users.size());            }

        }
    }
}

此处我们可以看到有两个地方注释了//接收方离线状态,此处无需处理。所以我们需要在用户上线的时候获取未签名的消息,只需要通过Controller从数据库获取就好

@RestControllerpublic class UnReadMessageController {   private Chat chatService = ChatMsgFactory.createChatMsgService();   @SuppressWarnings("unchecked")   @PostMapping("/notification-anon/getunreadmeg")   public Result<List<ChatMsg>> getUnReadMessage(@RequestParam("receiverid") Long receiverId) {      return Result.success(chatService.findUnReadChat(receiverId));   }
}

由于我们登录用的是OAuth框架,在user模块添加

@PostMapping("/users-anon/finduser")public LoginAppUser findUserByName(@RequestParam("username") String username) {return appUserService.findByUsername(username);}

修改网关登录,登录后可以获取用户的信息,首先添加User模块的feign

@FeignClient("user-center")public interface UserClient {@PostMapping("/users-anon/finduser")
    LoginAppUser findUserByName(@RequestParam("username") String username);}
/** * 登陆、刷新token、退出 * * @author 关键 */@Slf4j@RestControllerpublic class TokenController {@Autowired    private Oauth3Client oauth3Client;    @Autowired    private UserClient userClient;    /**     * 系统登陆<br>     * 根据用户名登录<br>     * 采用oauth3密码模式获取access_token和refresh_token     *     * @param username     * @param password     * @return     */    @SuppressWarnings("unchecked")@PostMapping("/sys/login")public Result<Map> login(@RequestParam String username,@RequestParam String password) {
        Map<String, String> parameters = new HashMap<>();        parameters.put(OAuth3Utils.GRANT_TYPE, "password");        parameters.put(OAuth3Utils.CLIENT_ID, "system");        parameters.put("client_secret", "system");        parameters.put(OAuth3Utils.SCOPE, "app");//    parameters.put("username", username);        // 为了支持多类型登录,这里在username后拼装上登录类型        parameters.put("username", username + "|" + CredentialType.USERNAME.name());        parameters.put("password", password);        Map<String, Object> tokenInfo = oauth3Client.postAccessToken(parameters);        AppUser user = userClient.findUserByName(username);        tokenInfo.put("user",user);        saveLoginLog(username, "用户名密码登陆");        return Result.success(tokenInfo);    }@Autowired    private LogClient logClient;    /**     * 登陆日志     *     * @param username     */    private void saveLoginLog(String username, String remark) {log.info("{}登陆", username);        // 异步        CompletableFuture.runAsync(() -> {try {
                Log log = Log.builder().username(username).module(LogModule.LOGIN).remark(remark).createTime(new Date())
                        .build();                logClient.save(log);            } catch (Exception e) {// do nothing            }

        });    }/**     * 系统刷新refresh_token     *     * @param refresh_token     * @return     */    @PostMapping("/sys/refresh_token")public Map<String, Object> refresh_token(String refresh_token) {
        Map<String, String> parameters = new HashMap<>();        parameters.put(OAuth3Utils.GRANT_TYPE, "refresh_token");        parameters.put(OAuth3Utils.CLIENT_ID, "system");        parameters.put("client_secret", "system");        parameters.put(OAuth3Utils.SCOPE, "app");        parameters.put("refresh_token", refresh_token);        return oauth3Client.postAccessToken(parameters);    }/**     * 退出     *     * @param access_token     */    @GetMapping("/sys/logout")public void logout(String access_token, @RequestHeader(required = false, value = "Authorization") String token) {if (StringUtils.isBlank(access_token)) {if (StringUtils.isNoneBlank(token)) {
                access_token = token.substring(OAuth3AccessToken.BEARER_TYPE.length() + 1);            }
        }oauth3Client.removeToken(access_token);    }
}

做如上修改后,我们登录后可以获得用户的Id,以上省略Dao,Mapper

这个时候服务端就全部完成了,我们再来看一下客户端

先建立一个全局变量app.js

window.app = {
	
	/**
	 * 和后端的枚举对应
	 */
	CONNECT: 1,			//"第一次(或重连)初始化连接"),
	CHAT: 2,			//"聊天消息"),
	SIGNED: 3,			//"消息签收"),
	KEEPALIVE: 4,		//"客户端保持心跳");

	/**
	* 和后端的ChatMsg聊天模型对象保持一致
	* @param {Object} senderId
	* @param {Object} receiverId
	* @param {Object} msg
	* @param {Object} msgId
	*/
	ChatMsg: function(senderId,receiverId,msg,msgId) {
		this.senderId = senderId;
		this.receiverId = receiverId;
		this.msg = msg;
		this.msgId = msgId;
	},

	/**
	* 构建消息模型对象
	* @param {Object} action
	* @param {Object} chatMsg
	* @param {Object} extand
	*/
	DataContent: function(action,chatMsg,extand) {
		this.action = action;
		this.chatMsg = chatMsg;
		this.extand = extand;
	}
}

由于这里只是聊天样例,没有处理登录功能,我们现在就以1,2来代表两个用户的id

用户id为1的代码,文件名index.html

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title></title>
	</head>
	<body>
		<div>发送消息:</div>
		<input type="text" id="msgContent" />
		<input type="button" value="发送" onclick="CHAT.chat(1,2,msgContent.value,app.CHAT,null)" />
		
		<div>接受消息:</div>
		<div id="receiveMsg" ></div>
		<script type="application/javascript" src="js/app.js"></script>
		<script type="application/javascript" src="js/mui.min.js"></script>
		<script type="application/javascript">
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws");
						CHAT.socket.onopen = function() {
							console.log("连接建立成功");
							
							CHAT.chat(1,null,null,app.CONNECT,null);
							//每次连接的时候获取未读消息
							fetchUnReadMsg();
							//定时发送心跳,30秒一次
							setInterval("CHAT.keepalive()",30000);
						},
						CHAT.socket.onclose = function() {
							console.log("连接关闭");
						},
						CHAT.socket.onerror = function() {
							console.log("发生错误");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接收到消息" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							var chatMsg = JSON.parse(e.data);
							receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg;
							//消息签收
							CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId));
						}
					}else {
						alert("浏览器不支持WebSocket协议...");
					}
				},
				chat: function(senderId,receiverId,msg,action,extand) {
					var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null);
					var dataContent = new app.DataContent(action,chatMsg,extand);
					CHAT.socket.send(JSON.stringify(dataContent));
				},
				keepalive: function() {
					CHAT.chat(1,null,null,app.KEEPALIVE,null);
					fetchUnReadMsg();
				}
			}
			CHAT.init();
			function fetchUnReadMsg() {
				mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=1',{
					data:{},
					dataType:'json',//服务器返回json格式数据
					type:'post',//HTTP请求类型
					timeout:10000,//超时时间设置为10秒;
					success:function(data){
						if (data.code == 200) {
							var contactList = data.data;
							var ids = "";
							console.log(JSON.stringify(contactList));
							var receiveMsg = document.getElementById("receiveMsg");
							for (var i = 0;i < contactList.length;i++) {
								var msgObj = contactList[i];
								var html = receiveMsg.innerHTML;
								receiveMsg.innerHTML = html + "<br/>" + msgObj.msg;
								ids = ids + msgObj.msgId + ",";
							}
							//批量签收未读消息
							CHAT.chat(1,null,null,app.SIGNED,ids);
						}
					}
				});
			}
		</script>
	</body>
</html>

用户id为2的代码,文件名receive.html

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title></title>
	</head>
	<body>
		<div>发送消息:</div>
		<input type="text" id="msgContent" />
		<input type="button" value="发送" onclick="CHAT.chat(2,1,msgContent.value,app.CHAT,null)" />
		
		<div>接受消息:</div>
		<div id="receiveMsg" ></div>
		<script type="application/javascript" src="js/app.js"></script>
		<script type="application/javascript" src="js/mui.min.js"></script>
		<script type="application/javascript">
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws");
						CHAT.socket.onopen = function() {
							console.log("连接建立成功");
							
							CHAT.chat(2,null,null,app.CONNECT,null);
							//每次连接的时候获取未读消息
							fetchUnReadMsg();
							//定时发送心跳,30秒一次
							setInterval("CHAT.keepalive()",30000);
						},
						CHAT.socket.onclose = function() {
							console.log("连接关闭");
						},
						CHAT.socket.onerror = function() {
							console.log("发生错误");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接收到消息" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							var chatMsg = JSON.parse(e.data);
							receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg;
							//消息签收
							CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId));
						}
					}else {
						alert("浏览器不支持WebSocket协议...");
					}
				},
				chat: function(senderId,receiverId,msg,action,extand) {
					var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null);
					var dataContent = new app.DataContent(action,chatMsg,extand);
					CHAT.socket.send(JSON.stringify(dataContent));
				},
				keepalive: function() {
					CHAT.chat(2,null,null,app.KEEPALIVE,null);
					fetchUnReadMsg();
				}
			}
			CHAT.init();
			function fetchUnReadMsg() {
				mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=2',{
					data:{},
					dataType:'json',//服务器返回json格式数据
					type:'post',//HTTP请求类型
					timeout:10000,//超时时间设置为10秒;
					success:function(data){
						if (data.code == 200) {
							var contactList = data.data;
							var ids = "";
							console.log(JSON.stringify(contactList));
							var receiveMsg = document.getElementById("receiveMsg");
							for (var i = 0;i < contactList.length;i++) {
								var msgObj = contactList[i];
								var html = receiveMsg.innerHTML;
								receiveMsg.innerHTML = html + "<br/>" + msgObj.msg;
								ids = ids + msgObj.msgId + ",";
							}
							//批量签收未读消息
							CHAT.chat(2,null,null,app.SIGNED,ids);
						}
					}
				});
		</script>
	</body>
</html>

这里都是通过Json来做数据的序列化的,之后会修改为ProtoBuffer来处理。

现在来增加发图片的功能。首先我们需要搭建好一个fastdfs服务器,具体可以参考分布式文件系统FastDFS安装配置 以及Springboot 2.0+FastDFS开发配置 。

这里只放入我们需要的Controller

@RestControllerpublic class FastDFSController {@Autowired    private FastDFSClientWrapper dfsClient;    @PostMapping("/files-anon/fdfsupload")@SuppressWarnings("unchecked")public Result<String> upload(@RequestParam("file") MultipartFile file) throws Exception {
        String imgUrl = dfsClient.uploadFile(file);        return Result.success(imgUrl);    }
}

然后修改我们的用户id为1的前端代码

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title></title>
	</head>
	<body>
		<div>发送消息:</div>
		<input type="text" id="msgContent" />
		<input type="button" value="发送" onclick="CHAT.chat(1,2,msgContent.value,app.CHAT,null)" />
		<input type="file" id="file" name="file">
		<input type="button" id="button" value="发送图片" >
		<div>接受消息:</div>
		<div id="receiveMsg" ></div>
		<script type="application/javascript" src="js/app.js"></script>
		<script type="application/javascript" src="js/mui.min.js"></script>
		<script type="application/javascript" src="js/jquery-3.3.1.min.js"></script>
		<script type="application/javascript">
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws");
						CHAT.socket.onopen = function() {
							console.log("连接建立成功");
							
							CHAT.chat(1,null,null,app.CONNECT,null);
							//每次连接的时候获取未读消息
							fetchUnReadMsg();
							//定时发送心跳,30秒一次
							setInterval("CHAT.keepalive()",30000);
						},
						CHAT.socket.onclose = function() {
							console.log("连接关闭");
						},
						CHAT.socket.onerror = function() {
							console.log("发生错误");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接收到消息" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							var chatMsg = JSON.parse(e.data);
							receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg;
							//消息签收
							CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId));
						}
					}else {
						alert("浏览器不支持WebSocket协议...");
					}
				},
				chat: function(senderId,receiverId,msg,action,extand) {
					var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null);
					var dataContent = new app.DataContent(action,chatMsg,extand);
					CHAT.socket.send(JSON.stringify(dataContent));
				},
				keepalive: function() {
					CHAT.chat(1,null,null,app.KEEPALIVE,null);
					fetchUnReadMsg();
				}
			}
			CHAT.init();
			function fetchUnReadMsg() {
				mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=2',{
					data:{},
					dataType:'json',//服务器返回json格式数据
					type:'post',//HTTP请求类型
					timeout:10000,//超时时间设置为10秒;
					success:function(data){
						if (data.code == 200) {
							var contactList = data.data;
							var ids = "";
							console.log(JSON.stringify(contactList));
							var receiveMsg = document.getElementById("receiveMsg");
							for (var i = 0;i < contactList.length;i++) {
								var msgObj = contactList[i];
								var html = receiveMsg.innerHTML;
								receiveMsg.innerHTML = html + "<br/>" + msgObj.msg;
								ids = ids + msgObj.msgId + ",";
							}
							//批量签收未读消息
							CHAT.chat(2,null,null,app.SIGNED,ids);
						}
					}
				});
			}
			$(function () {
			        $("#button").click(function () {
			            var form = new FormData();
			            form.append("file", document.getElementById("file").files[0]);
			             $.ajax({
			                 url: "http://xxx.xxx.xxx.xxx:8010/files-anon/fdfsupload",        //后台url
			                 data: form,
			                 cache: false,
			                 async: false,
			                 type: "POST",                   //类型,POST或者GET
			                 dataType: 'json',              //数据返回类型,可以是xml、json等
			                 processData: false,
			                 contentType: false,
			                 success: function (data) {      //成功,回调函数
			                     if (data.code == 200) {
			                     	console.log(data.data);
									CHAT.chat(1,2,"<img src='" + data.data + "' height='200' width='200' />",app.CHAT,null);
			                     }   
			                 }
			             });
			
			        })
			
			    })
		</script>
	</body>
</html>

意思即为使用ajax访问我们的上传文件Controller,获取上传成功后的url,将该url拼接到<img />的标签中,当称普通聊天信息发送出去即可。

用户id为2的前端代码相同。

<!DOCTYPE html>
<html>
	<head>
		<meta charset="utf-8" />
		<title></title>
	</head>
	<body>
		<div>发送消息:</div>
		<input type="text" id="msgContent" />
		<input type="button" value="发送" onclick="CHAT.chat(2,1,msgContent.value,app.CHAT,null)" />
		<input type="file" id="file" name="file">
		<input type="button" id="button" value="发送图片" >
		<div>接受消息:</div>
		<div id="receiveMsg" ></div>
		<script type="application/javascript" src="js/app.js"></script>
		<script type="application/javascript" src="js/mui.min.js"></script>
		<script type="application/javascript" src="js/jquery-3.3.1.min.js"></script>
		<script type="application/javascript">
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {
						CHAT.socket = new WebSocket("ws://127.0.0.1:9999/ws");
						CHAT.socket.onopen = function() {
							console.log("连接建立成功");
							
							CHAT.chat(2,null,null,app.CONNECT,null);
							//每次连接的时候获取未读消息
							fetchUnReadMsg();
							//定时发送心跳,30秒一次
							setInterval("CHAT.keepalive()",30000);
						},
						CHAT.socket.onclose = function() {
							console.log("连接关闭");
						},
						CHAT.socket.onerror = function() {
							console.log("发生错误");
						},
						CHAT.socket.onmessage = function(e) {
							console.log("接收到消息" + e.data);
							var receiveMsg = document.getElementById("receiveMsg");
							var html = receiveMsg.innerHTML;
							var chatMsg = JSON.parse(e.data);
							receiveMsg.innerHTML = html + "<br/>" + chatMsg.msg;
							//消息签收
							CHAT.chat(chatMsg.receiverId,null,null,app.SIGNED,String(chatMsg.msgId));
						}
					}else {
						alert("浏览器不支持WebSocket协议...");
					}
				},
				chat: function(senderId,receiverId,msg,action,extand) {
					var chatMsg = new app.ChatMsg(senderId,receiverId,msg,null);
					var dataContent = new app.DataContent(action,chatMsg,extand);
					CHAT.socket.send(JSON.stringify(dataContent));
				},
				keepalive: function() {
					CHAT.chat(2,null,null,app.KEEPALIVE,null);
					fetchUnReadMsg();
				}
			}
			CHAT.init();
			function fetchUnReadMsg() {
				mui.ajax('http://127.0.0.1:8008/notification-anon/getunreadmeg?receiverid=2',{
					data:{},
					dataType:'json',//服务器返回json格式数据
					type:'post',//HTTP请求类型
					timeout:10000,//超时时间设置为10秒;
					success:function(data){
						if (data.code == 200) {
							var contactList = data.data;
							var ids = "";
							console.log(JSON.stringify(contactList));
							var receiveMsg = document.getElementById("receiveMsg");
							for (var i = 0;i < contactList.length;i++) {
								var msgObj = contactList[i];
								var html = receiveMsg.innerHTML;
								receiveMsg.innerHTML = html + "<br/>" + msgObj.msg;
								ids = ids + msgObj.msgId + ",";
							}
							//批量签收未读消息
							CHAT.chat(2,null,null,app.SIGNED,ids);
						}
					}
				});
			}
			$(function () {
			        $("#button").click(function () {
			            var form = new FormData();
			            form.append("file", document.getElementById("file").files[0]);
			             $.ajax({
			                 url: "http://xxx.xxx.xxx.xxx:8010/files-anon/fdfsupload",        //后台url
			                 data: form,
			                 cache: false,
			                 async: false,
			                 type: "POST",                   //类型,POST或者GET
			                 dataType: 'json',              //数据返回类型,可以是xml、json等
			                 processData: false,
			                 contentType: false,
			                 success: function (data) {      //成功,回调函数
			                     if (data.code == 200) {
			                     	console.log(data.data);
									CHAT.chat(2,1,"<img src='" + data.data + "' height='200' width='200' />",app.CHAT,null);
			                     }   
			                 }
			             });
			
			        })
			
			    })
		</script>
	</body>
</html>

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。

推荐阅读:
  1. Netty4 之 简单搭建WebSocket服务
  2. WebSocket 整合 Springboot

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

websocket netty

上一篇:shell 中怎么判断文件夹或文件是否存在

下一篇:python清洗文件中数据的方法

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》