您好,登录后才能下订单哦!
# SpringBoot中怎么利用WebSocket实现即时消息
## 1. WebSocket技术概述
### 1.1 WebSocket与传统HTTP协议对比
WebSocket是一种在单个TCP连接上进行全双工通信的协议,与传统的HTTP协议有着本质区别:
| 特性 | WebSocket | HTTP |
|--------------------|-------------------------------|--------------------------|
| 通信模式 | 全双工 | 半双工 |
| 连接持续时间 | 持久连接 | 短连接(请求-响应后关闭)|
| 数据格式 | 二进制帧/文本帧 | 文本报文 |
| 头部开销 | 首次握手后头部极小 | 每个请求都携带完整头部 |
| 服务器推送能力 | 支持主动推送 | 仅能响应客户端请求 |
| 适用场景 | 实时应用(聊天、游戏等) | 传统Web应用 |
### 1.2 WebSocket协议握手过程
WebSocket建立连接需要经过标准的握手流程:
1. **HTTP升级请求**:客户端发送包含`Upgrade: websocket`头的HTTP请求
```http
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Spring Framework从4.0版本开始提供完整的WebSocket支持,主要包括:
<dependencies>
<!-- WebSocket支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- 前端模板引擎(可选) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<!-- 测试依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/ws")
.setAllowedOrigins("*")
.addInterceptors(new HttpSessionHandshakeInterceptor());
}
@Bean
public WebSocketHandler myHandler() {
return new MyWebSocketHandler();
}
}
public class MyWebSocketHandler extends TextWebSocketHandler {
private static final List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
sessions.add(session);
System.out.println("Connection established: " + session.getId());
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
String payload = message.getPayload();
System.out.println("Received: " + payload);
// 广播消息给所有客户端
for (WebSocketSession s : sessions) {
try {
s.sendMessage(new TextMessage("Echo: " + payload));
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
sessions.remove(session);
System.out.println("Connection closed: " + session.getId());
}
}
afterConnectionEstablished
被调用handleTextMessage
处理文本消息handleTransportError
处理传输错误afterConnectionClosed
清理资源@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192);
container.setMaxBinaryMessageBufferSize(8192);
return container;
}
registry.addHandler(myHandler(), "/ws")
.setAllowedOrigins("*")
.addInterceptors(new HttpSessionHandshakeInterceptor())
.withSockJS()
.setHeartbeatTime(25000); // 25秒心跳
STOMP(Simple Text Oriented Messaging Protocol)提供了基于帧的互操作格式:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 启用简单内存消息代理
config.enableSimpleBroker("/topic", "/queue");
// 配置应用目的地前缀
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp")
.setAllowedOrigins("*")
.withSockJS();
}
}
@Controller
public class StompController {
@MessageMapping("/chat.send")
@SendTo("/topic/public")
public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
return chatMessage;
}
@MessageMapping("/chat.addUser")
@SendTo("/topic/public")
public ChatMessage addUser(@Payload ChatMessage chatMessage,
SimpMessageHeaderAccessor headerAccessor) {
headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
return chatMessage;
}
}
@Configuration
@EnableWebSocketMessageBroker
public class RabbitMQWebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("localhost")
.setRelayPort(61613)
.setClientLogin("guest")
.setClientPasscode("guest");
config.setApplicationDestinationPrefixes("/app");
}
}
const socket = new WebSocket('ws://localhost:8080/ws');
socket.onopen = function(e) {
console.log('Connection established');
socket.send('Hello Server!');
};
socket.onmessage = function(event) {
console.log(`Data received: ${event.data}`);
};
socket.onclose = function(event) {
if (event.wasClean) {
console.log(`Connection closed cleanly, code=${event.code}`);
} else {
console.log('Connection died');
}
};
const socket = new SockJS('/ws-stomp');
const stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
console.log('Connected: ' + frame);
stompClient.subscribe('/topic/public', function(message) {
showMessage(JSON.parse(message.body));
});
});
function sendMessage() {
const message = {
content: $('#message').val(),
sender: $('#username').val()
};
stompClient.send("/app/chat.send", {}, JSON.stringify(message));
}
let reconnectAttempts = 0;
const maxReconnectAttempts = 5;
const reconnectDelay = 5000;
function connect() {
stompClient.connect({},
function(frame) {
reconnectAttempts = 0;
// 订阅逻辑...
},
function(error) {
if(reconnectAttempts < maxReconnectAttempts) {
setTimeout(connect, reconnectDelay);
reconnectAttempts++;
}
}
);
}
@Configuration
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http.csrf()
.ignoringAntMatchers("/ws/**", "/ws-stomp/**");
}
}
@Configuration
@EnableWebSocketSecurity
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
messages
.simpDestMatchers("/app/**").authenticated()
.simpSubscribeDestMatchers("/user/**").authenticated()
.anyMessage().permitAll();
}
}
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxSessionIdleTimeout(600000L); // 10分钟
container.setMaxSessionsPerPrincipal(5); // 每个用户最多5个连接
return container;
}
@Configuration
@EnableRedisRepositories
public class RedisConfig {
@Bean
public RedisMessageListenerContainer redisContainer(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
return container;
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
return template;
}
}
@SpringBootTest
@WebAppConfiguration
public class WebSocketTest {
@Autowired
private WebSocketHandler handler;
@Test
void testWebSocketHandler() throws Exception {
TestWebSocketSession session = new TestWebSocketSession();
// 测试连接建立
handler.afterConnectionEstablished(session);
assertTrue(session.isOpen());
// 测试消息处理
TextMessage message = new TextMessage("test");
handler.handleMessage(session, message);
assertEquals(1, session.getSentMessages().size());
assertEquals("Echo: test", session.getSentMessages().get(0).getPayload());
}
}
# application.properties
management.endpoints.web.exposure.include=health,info,metrics,websockettrace
management.endpoint.websockettrace.enabled=true
src/main/
├── java/
│ └── com/example/chat/
│ ├── config/ # 配置类
│ ├── controller/ # MVC控制器
│ ├── dto/ # 数据传输对象
│ ├── handler/ # WebSocket处理器
│ └── service/ # 业务服务
└── resources/
├── static/ # 静态资源
├── templates/ # 模板文件
└── application.yml # 配置文件
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessage {
private MessageType type;
private String content;
private String sender;
public enum MessageType {
CHAT, JOIN, LEAVE
}
}
@Controller
public class ChatController {
@MessageMapping("/chat.sendMessage")
@SendTo("/topic/public")
public ChatMessage sendMessage(@Payload ChatMessage chatMessage) {
return chatMessage;
}
@MessageMapping("/chat.addUser")
@SendTo("/topic/public")
public ChatMessage addUser(@Payload ChatMessage chatMessage,
SimpMessageHeaderAccessor headerAccessor) {
headerAccessor.getSessionAttributes().put("username", chatMessage.getSender());
return chatMessage;
}
@SubscribeMapping("/topic/activeUsers")
public List<String> getActiveUsers() {
// 返回当前活跃用户列表
return activeUserService.getActiveUsers();
}
}
问题现象:WebSocket连接无法建立
排查步骤: 1. 检查浏览器控制台错误 2. 验证服务端是否正常启动 3. 使用Postman测试WebSocket端点 4. 检查防火墙/网络策略
典型场景:高并发下连接不稳定
优化方案:
1. 增加心跳间隔:setHeartbeatValue(new long[]{10000,10000})
2. 使用Nginx负载均衡:
location /ws/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
Android实现示例:
val wsUri = "ws://10.0.2.2:8080/ws"
val websocket = OkHttpClient().newWebSocket(
Request.Builder().url(wsUri).build(),
object : WebSocketListener() {
override fun onMessage(webSocket: WebSocket, text: String) {
// 处理消息
}
}
)
WebSocket技术为SpringBoot应用提供了强大的实时通信能力,本文详细介绍了:
未来发展趋势: - WebTransport协议替代方案 - QUIC协议支持 - 更好的移动端体验
最佳实践建议: - 对于简单场景使用原生WebSocket API - 复杂消息系统推荐STOMP协议 - 生产环境务必配置消息代理集群 - 始终考虑向后兼容性(SockJS)
附录: - 完整示例代码仓库 - WebSocket RFC文档 - Spring官方文档 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。