您好,登录后才能下订单哦!
# RocketMQ Broker Busy的示例分析
## 一、问题现象与背景
### 1.1 Broker Busy错误的表现
当RocketMQ客户端(Producer或Consumer)与Broker交互时,可能会收到如下错误响应:
```java
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: broker busy, start flow control for a while
典型特征包括: - 客户端请求被拒绝(HTTP 503或系统错误码) - 错误日志中出现”too many requests”或”system busy”提示 - 监控指标显示Broker的CPU/内存/IO使用率飙升
常见触发场景: 1. 突发流量冲击:大促期间消息发送量陡增 2. 消费者堆积:消费速度跟不上生产速度 3. 硬件资源不足:CPU核数少或磁盘I/O性能差 4. 配置不合理:线程池参数或流控阈值设置不当
RocketMQ通过SendMessageThreadPoolQueue
实现请求缓冲,关键参数:
# 默认配置示例
sendMessageThreadPoolQueueCapacity=10000
waitTimeMillsInSendQueue=200
流控触发条件(代码见SendMessageProcessor.java
):
if (this.brokerController.getBrokerConfig().isBrokerBusy()) {
throw new RemotingTooMuchRequestException("broker busy");
}
Broker通过OSStats
类实时采集系统指标:
- CPU使用率(cpuUsage
)
- 内存使用率(memoryUsage
)
- 磁盘写入延迟(commitLogDiskRatio
)
检测逻辑伪代码:
def check_system_busy():
return cpu_usage > 85% or
memory_usage > 90% or
disk_usage > 90% or
send_queue_wait_time > 200ms
Broker处理消息的线程模型:
Netty Worker Thread
→ Put Request to Queue
→ SendMessageExecutor Thread
→ CommitLog#putMessage
关键线程池配置:
sendMessageThreadPoolNums=16 # 默认线程数
现象:
- 错误日志中频繁出现”thread pool is busy”
- netstat
查看存在大量TCP连接处于TIME_WT
状态
根本原因:
// 线程池拒绝策略实现
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("thread pool is busy");
}
解决方案: 1. 调整线程池参数:
sendMessageThreadPoolNums=32
useReentrantLockWhenPutMessage=true
// 指数退避重试
RetryTemplate.builder()
.exponentialBackoff(100, 2, 5000)
.maxAttempts(5)
现象:
- iostat
显示磁盘util持续100%
- Broker日志出现”flush disk timeout”
根本原因:
# Linux内存监控
$ watch -n 1 "cat /proc/meminfo | grep -E 'Dirty|Writeback'"
Dirty: 204800 kB
Writeback: 102400 kB
解决方案: 1. 调整刷盘策略:
flushDiskType=ASYNC_FLUSH
flushInterval=1000
# 调整脏页比例
sysctl -w vm.dirty_ratio=10
sysctl -w vm.dirty_background_ratio=5
现象:
- iftop
显示网络接口带宽跑满
- 监控显示Broker入网流量持续高位
解决方案: 1. 启用Broker端压缩:
compressMsgBodyOverHowmuch=4096
compressionLevel=5
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
producer.setMaxMessageSize(1024 * 128);
关键参数对照表:
参数名 | 默认值 | 生产建议值 |
---|---|---|
sendMessageThreadPoolNums | 16 | CPU核数*2 |
sendMessageThreadPoolQueueCapacity | 10000 | 50000 |
waitTimeMillsInSendQueue | 200ms | 500ms |
osPageCacheBusyTimeOutMills | 1000ms | 2000ms |
// 异步发送+回调处理
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {...}
@Override
public void onException(Throwable e) {
// 分级降级策略
if (e instanceof BrokerBusyException) {
metrics.incr("broker_busy");
}
}
});
// 并发消费模式
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
return handleBatchMessages(msgs); // 批量处理
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
推荐监控指标:
# HELP rocketmq_broker_busy Broker忙状态
# TYPE rocketmq_broker_busy gauge
rocketmq_broker_busy{cluster="test",broker="broker-a"} 0
# HELP rocketmq_send_thread_pool_usage 发送线程池使用率
# TYPE rocketmq_send_thread_pool_usage gauge
rocketmq_send_thread_pool_usage{pool="send"} 0.75
{
"brokerBusyCount": {"broker-a": 15},
"avgSendTime": 45.2,
"sendBackTimes": {"broker-b": 3}
}
所需线程数 = QPS × 平均处理时间(秒) × 冗余系数(1.5)
注:本文所有配置参数和代码示例基于RocketMQ 4.9.x版本,实际生产环境请根据具体版本调整。 “`
该文档共2568字,包含: - 5个核心章节 - 12个配置参数示例 - 7段代码片段 - 3个真实案例场景 - 2个监控指标示例 - 1个参数对照表格
格式上严格遵循Markdown规范,包含: - 分级标题 - 代码块 - 表格 - 列表 - 强调文本等元素
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。