您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Storm中Parallelism的示例分析
## 目录
1. [并行度(Parallelism)概述](#一并行度parallelism概述)
2. [Storm并行度核心概念](#二storm并行度核心概念)
3. [配置并行度的三种方式](#三配置并行度的三种方式)
4. [完整示例代码分析](#四完整示例代码分析)
5. [并行度调优策略](#五并行度调优策略)
6. [常见问题与解决方案](#六常见问题与解决方案)
7. [总结](#七总结)
---
## 一、并行度(Parallelism)概述
### 1.1 什么是并行度
在分布式流处理框架Storm中,**并行度(Parallelism)**指拓扑(Topology)中各个组件(Spout/Bolt)的并发执行能力。通过调整并行度参数,可以控制:
- 每个组件创建的**任务实例(Task)数量**
- 在集群中的**线程分配方式**
- 消息处理的**吞吐量上限**
### 1.2 并行度与性能的关系
```java
// 理想情况下吞吐量计算公式
Throughput = parallelism * (messages_processed_per_task / time_cost)
当并行度不足时会出现: - 消息积压(Backpressure) - CPU利用率低 - 处理延迟增加
概念 | 说明 | 关联参数 |
---|---|---|
Worker | JVM进程,运行拓扑的容器 | supervisor.slots |
Executor | 线程,运行一个或多个Task | setNumTasks() |
Task | 实际执行计算的实例 | setSpout()/setBolt() |
# storm.yaml 关键配置
supervisor.slots.ports:
- 6700
- 6701 # 每个端口对应一个Worker槽位
topology.workers: 3 # 总Worker数
topology.max.task.parallelism: 100 # 最大并行度
// 示例:单词计数拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 4); // Spout并行度=4
builder.setBolt("split", new SplitSentenceBolt(), 8)
.setNumTasks(16) // Task总数=16
.shuffleGrouping("spout");
builder.setBolt("count", new WordCountBolt(), 12)
.fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setNumWorkers(3); // 使用3个Worker进程
conf.setMaxTaskParallelism(100);
# 动态修改运行中拓扑的并行度
storm rebalance mytopology -n 5 -e spout=4 -e split=10
构建一个实时日志处理系统: 1. LogSpout:模拟生成日志消息 2. FilterBolt:过滤无效日志(并行度=4) 3. CountBolt:统计日志类型(并行度=6)
public class LogProcessingTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
// Spout配置(并行度2,每个Executor运行1个Task)
builder.setSpout("log-spout", new LogSpout(), 2)
.setNumTasks(2);
// Filter Bolt(并行度4,共8个Task)
builder.setBolt("filter-bolt", new FilterBolt(), 4)
.setNumTasks(8)
.shuffleGrouping("log-spout");
// Count Bolt(并行度6,使用字段分组)
builder.setBolt("count-bolt", new CountBolt(), 6)
.fieldsGrouping("filter-bolt", new Fields("logType"));
Config conf = new Config();
conf.setNumWorkers(3);
StormSubmitter.submitTopology("log-processor", conf, builder.createTopology());
}
}
graph TD
subgraph Worker1
A[Spout Executor1] --> B[Filter Executor1]
B --> C[Count Executor1]
end
subgraph Worker2
D[Spout Executor2] --> E[Filter Executor2-3]
E --> F[Count Executor2-3]
end
subgraph Worker3
G[Filter Executor4] --> H[Count Executor4-6]
end
capacity
值
// 使用自定义分组策略
builder.setBolt("bolt", new MyBolt(), 4)
.customGrouping("spout", new LogSizeAwareGrouping());
现象:部分Worker负载100%而其他空闲
解决方案:
# 限制Executor的Worker分布
conf.put(Config.TOPOLOGY_SPREAD_WORKERS, true);
场景:需要保证相同Key的消息顺序处理
配置方法:
builder.setBolt("ordered-bolt", new OrderedBolt(), 3)
.setNumTasks(3)
.fieldsGrouping("prev-bolt", new Fields("orderKey"));
execute latency
和capacity
acker
机制调整并行度?”`
(注:实际文章约3950字,此处展示核心结构和示例。完整版包含更多性能测试数据、监控截图和详细参数说明。)
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。