您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Storm集群WordCount的示例分析
## 一、Storm框架概述
### 1.1 实时计算与Storm
Apache Storm是一个开源的分布式实时计算系统,由Nathan Marz团队在BackType创建(后被Twitter收购)。它能够以可靠的方式处理无限的数据流,实现实时分析、在线机器学习、持续计算等场景。与Hadoop的批处理模式不同,Storm采用流式处理模型,延迟可达到秒级甚至毫秒级。
**核心特点:**
- 高可靠性:保证每条消息至少被处理一次(exactly-once语义可选)
- 水平扩展:通过增加节点线性提升处理能力
- 容错机制:任务失败时自动重启
- 多语言支持:支持Java、Python等多种语言开发组件
### 1.2 Storm架构组成
```mermaid
graph TD
Nimbus-->|提交拓扑|Zookeeper
Supervisor-->|心跳|Zookeeper
Nimbus-->|分配任务|Supervisor
Supervisor-->Worker
Worker-->Executor
Executor-->Task
关键角色说明: - Nimbus:主节点,负责拓扑分发、任务调度和故障监测 - Supervisor:工作节点,监听并执行分配给它的任务 - Zookeeper:协调服务,维护集群状态和配置信息 - Worker:JVM进程,执行特定拓扑的子集 - Executor:线程,运行一个或多个相同类型的Task - Task:实际的数据处理单元
WordCount作为经典案例,在Storm中的实现展示了流式处理的核心思想。数据流动过程如下:
Spout(数据源) --> SplitBolt(分词) --> CountBolt(计数) --> ReportBolt(报告)
public class SentenceSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private String[] sentences = {
"storm cluster is powerful",
"learn storm by example",
"word count topology demo"
};
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
Utils.sleep(1000); // 防止过度消耗CPU
String sentence = sentences[new Random().nextInt(sentences.length)];
collector.emit(new Values(sentence));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
public class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split("\\s+");
for(String word : words) {
collector.emit(new Values(word));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private Map<String, Integer> counts = new HashMap<>();
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
counts.put(word, counts.getOrDefault(word, 0) + 1);
collector.emit(new Values(word, counts.get(word)));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public class WordCountTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("sentence-spout", new SentenceSpout());
builder.setBolt("split-bolt", new SplitSentenceBolt())
.shuffleGrouping("sentence-spout");
builder.setBolt("count-bolt", new WordCountBolt())
.fieldsGrouping("split-bolt", new Fields("word"));
Config config = new Config();
config.setNumWorkers(3);
if(args != null && args.length > 0) {
// 集群模式
StormSubmitter.submitTopology(args[0], config,
builder.createTopology());
} else {
// 本地测试模式
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", config,
builder.createTopology());
Utils.sleep(10000);
cluster.shutdown();
}
}
}
参数名 | 默认值 | 说明 |
---|---|---|
topology.workers | 1 | 工作进程数 |
topology.max.spout.pending | null | Spout最大未完成元组数 |
message.timeout.secs | 30 | 消息超时时间(秒) |
topology.debug | false | 是否开启调试模式 |
通过设置并行度参数实现水平扩展:
builder.setSpout("sentence-spout", new SentenceSpout(), 2); // 2个executor
builder.setBolt("split-bolt", new SplitSentenceBolt(), 4)
.setNumTasks(8) // 每个executor运行2个task
.shuffleGrouping("sentence-spout");
分组类型 | 数据分发方式 | 适用场景 |
---|---|---|
Shuffle | 随机均匀分配 | 无状态处理 |
Fields | 相同字段值发往同一任务 | 需要状态维护的操作(如计数) |
All | 广播到所有任务 | 全局通知类操作 |
Global | 全部发往同一任务(最低ID) | 汇总统计类操作 |
通过锚定(anchoring)和ack机制实现:
// 在Spout中发送时添加MessageID
collector.emit(new Values(sentence), messageId);
// Bolt中处理时锚定输入元组
collector.emit(tuple, new Values(word));
collector.ack(tuple);
topology.max.spout.pending
设置config.registerSerialization(MyClass.class);
worker.heap.memory.mb
supervisor.slots.ports
控制每节点worker数graph LR
LogSpout -->|原始日志| FilterBolt
FilterBolt -->|错误日志| AlarmBolt
FilterBolt -->|正常日志| StatsBolt
Storm的WordCount示例虽然简单,但完整展示了实时流处理的核心概念。随着Flink等新框架的出现,Storm在某些场景已被替代,但其设计思想仍具参考价值。建议学习者进一步探索: - Trident高级抽象 - 与Kafka的集成方案 - 状态管理优化技巧
最佳实践建议:生产环境中建议使用Storm 2.x版本,其引入的分布式API和性能改进(如流量控制)能显著提升稳定性。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。