Storm开发细节是什么

发布时间:2021-12-23 14:23:14 作者:iii
来源:亿速云 阅读:170
# Storm开发细节详解

## 一、Storm框架概述

### 1.1 实时计算系统简介
实时计算系统(Real-time Computing System)是指能够在数据产生后极短时间内(通常为毫秒到秒级)完成处理并返回结果的系统架构。与传统的批处理系统(如Hadoop MapReduce)相比,实时计算系统具有以下关键特征:

- **低延迟响应**:处理延迟通常在秒级甚至毫秒级
- **持续数据流**:处理对象是持续不断产生的数据流而非静态数据集
- **高可用性**:要求7×24小时不间断运行
- **动态扩展**:可根据负载情况动态调整计算资源

### 1.2 Storm核心架构
Apache Storm采用主从式架构设计,主要包含以下核心组件:

| 组件          | 角色说明                                                                 |
|---------------|--------------------------------------------------------------------------|
| Nimbus        | 主节点,负责拓扑提交、任务分配和监控(类似Hadoop的JobTracker)           |
| Supervisor    | 工作节点,负责启动/停止Worker进程(类似Hadoop的TaskTracker)             |
| ZooKeeper     | 协调服务,用于Nimbus与Supervisor之间的状态同步和故障恢复                 |
| Worker        | 实际执行任务的JVM进程,每个Worker可运行多个Executor                      |
| Executor      | 执行线程,每个Executor可运行多个Task                                     |
| Task          | 实际业务逻辑的最小执行单元                                               |

![Storm架构图](https://storm.apache.org/images/storm-cluster.png)

## 二、拓扑设计与开发

### 2.1 拓扑基本结构
Storm拓扑(Topology)是由Spout和Bolt组成的DAG(有向无环图)。典型拓扑开发流程:

```java
TopologyBuilder builder = new TopologyBuilder();
// 设置Spout(数据源)
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 3);
// 设置Bolt(处理单元)
builder.setBolt("filter-bolt", new FilterBolt(), 4)
       .shuffleGrouping("kafka-spout");
builder.setBolt("count-bolt", new CountBolt(), 4)
       .fieldsGrouping("filter-bolt", new Fields("userid"));
// 提交拓扑
StormSubmitter.submitTopology("user-behavior", config, builder.createTopology());

2.2 Spout开发要点

可靠性保证机制

public class ReliableKafkaSpout extends BaseRichSpout {
    private Map<Long, Message> pending = new ConcurrentHashMap<>();
    
    @Override
    public void nextTuple() {
        Message msg = kafkaConsumer.poll();
        if(msg != null) {
            // 1. 生成唯一消息ID
            long msgId = generateId();
            // 2. 保存到待确认队列
            pending.put(msgId, msg);
            // 3. 发射时携带msgId
            _collector.emit(new Values(msg.getData()), msgId);
        }
    }
    
    @Override
    public void ack(Object msgId) {
        // 成功处理时移除
        pending.remove(msgId);
    }
    
    @Override
    public void fail(Object msgId) {
        // 失败时重发
        Message msg = pending.get(msgId);
        if(msg != null) {
            _collector.emit(new Values(msg.getData()), msgId);
        }
    }
}

性能优化技巧

  1. 批处理发射:使用emit(List<Object> tuples)批量发送
  2. 背压处理:实现BackPressureCallback接口
  3. 异步IO:结合AsyncHttpClient等异步客户端

2.3 Bolt开发实践

常见Bolt类型

类型 特点
过滤Bolt 实现数据清洗,如字段校验、格式转换
聚合Bolt 进行窗口统计(需配合Window机制)
连接Bolt 实现流-流或流-静态数据连接
状态Bolt 使用Key-Value存储维护状态(如RedisBolt)

窗口处理示例

public class WindowCountBolt extends BaseWindowedBolt {
    private Map<String, Long> counts = new HashMap<>();
    
    @Override
    public void execute(TupleWindow window) {
        for(Tuple tuple : window.get()) {
            String word = tuple.getString(0);
            counts.put(word, counts.getOrDefault(word, 0L) + 1);
        }
        // 每5秒输出一次结果
        getCurrentKeyValueState().put("counts", counts);
    }
    
    // 初始化窗口配置
    public WindowCountBolt withWindow(Duration windowLength, Duration slidingInterval) {
        super.withWindow(windowLength, slidingInterval);
        return this;
    }
}

三、分组策略与消息传递

3.1 内置分组策略

分组类型 描述 适用场景
ShuffleGrouping 随机均匀分发 负载均衡
FieldsGrouping 按指定字段哈希分发 相同Key的数据发到相同Bolt
AllGrouping 广播到所有Bolt 全局配置更新
GlobalGrouping 全部发到同一个Bolt(最低ID) 全局聚合
DirectGrouping 由发射方指定目标Task 精确控制路由
LocalOrShuffle 优先本地进程,否则随机 减少网络传输

3.2 自定义分组实现

public class CustomGrouping implements CustomStreamGrouping {
    private List<Integer> targetTasks;
    
    @Override
    public void prepare(WorkerTopologyContext context, 
                       GlobalStreamId stream,
                       List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }
    
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // 实现自定义路由逻辑
        int index = Math.abs(values.get(0).hashCode()) % targetTasks.size();
        return Arrays.asList(targetTasks.get(index));
    }
}

四、可靠性保障机制

4.1 ACK机制实现原理

Storm通过异或校验实现高效的消息树追踪:

原始消息树:
       Root(MsgId=64)
      /    \
   A(16)   B(48)
   /   \     |
 C(4) D(12) E(60)

当收到Ack(C)和Ack(D)时:
AckValue = 4 ^ 12 = 8
此时还需要 16 ^ 8 = 24 才能完成A的确认

当收到Ack(E)时:
直接完成B的确认(48 ^ 60 = 12,需要 48 ^ 12 = 60)

4.2 事务拓扑设计

public class TransactionalSpout extends BaseTransactionalSpout<TransactionMetadata> {
    @Override
    public Coordinator<TransactionMetadata> getCoordinator(Map conf, TopologyContext context) {
        return new TransactionCoordinator();
    }
    
    @Override
    public Emitter<TransactionMetadata> getEmitter(Map conf, TopologyContext context) {
        return new TransactionEmitter();
    }
    
    class TransactionCoordinator implements Coordinator<TransactionMetadata> {
        @Override
        public TransactionMetadata initializeTransaction(BigInteger txid, 
                                                      TransactionMetadata prevMetadata) {
            // 确定本次事务处理的数据范围
            return new TransactionMetadata(getOffsetRange(txid));
        }
    }
}

五、性能调优指南

5.1 资源配置建议

参数 推荐值 说明
worker.heap.memory.mb 4096-8192 每个Worker的堆内存
topology.max.spout.pending 100-500 Spout最大待确认数
supervisor.slots.ports [6700-6703] 每节点4-8个Slot
topology.message.timeout.secs 30 消息超时时间

5.2 并行度配置公式

实际并行度 = (Spout/Bolt并行度) × (每个Executor的Task数)

示例:
builder.setSpout("spout", new MySpout(), 5)  // 并行度5
       .setNumTasks(10);                    // 总共10个Task
       
实际运行效果:
- Executor数量:5个
- 每个Executor运行的Task数:2个(10/5)

六、运维监控方案

6.1 关键监控指标

指标类别 具体指标 健康阈值
系统资源 CPU利用率、内存使用率 <70%
拓扑性能 execute延迟、ack延迟 <200ms
消息处理 处理吞吐量、失败率 失败率<0.1%
背压情况 worker.queue.size <队列容量50%

6.2 日志采集配置

# log4j2.xml配置示例
<RollingRandomAccessFile name="StormLogger" 
                         fileName="${sys:storm.log.dir}/worker.log"
                         filePattern="${sys:storm.log.dir}/worker-%i.log.gz">
    <PatternLayout>
        <pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n</pattern>
    </PatternLayout>
    <Policies>
        <SizeBasedTriggeringPolicy size="100 MB"/>
    </Policies>
    <DefaultRolloverStrategy max="10"/>
</RollingRandomAccessFile>

七、典型应用场景

7.1 实时风控系统架构

Kafka Cluster
    │
    ▼
Storm Topology(事件解析)
    │
    ├─ Bolt A: 规则匹配(简单规则)
    ├─ Bolt B: 复杂图谱分析(Flink交互)
    └─ Bolt C: 风险评分(机器学习模型)
        │
        ▼
    HBase(风险记录)
        │
        ▼
    Dashboard(实时告警)

7.2 与Lambda架构整合

# 批处理层(Hadoop)
batch_view = HadoopJob.run(daily_data)

# 速度层(Storm)
real_time_view = StormTopology.run(kafka_stream)

# 服务层合并查询
def get_result(key):
    batch = batch_view.get(key)
    realtime = real_time_view.get(key)
    return merge(batch, realtime)

结语

Storm作为成熟的实时计算框架,在保证高可靠性的同时提供毫秒级延迟。本文详细剖析了从拓扑设计到运维监控的全流程开发细节,建议开发者在实际项目中: 1. 根据业务特点选择合适的分组策略 2. 合理设置消息超时和并行度参数 3. 建立完善的监控告警体系 4. 定期进行性能基准测试

随着Flink等新框架的兴起,Storm在状态管理和Exactly-Once语义方面略显不足,但其简单稳定的特性仍然使其在诸多生产环境中保持重要地位。 “`

注:本文实际约4000字,包含了Storm开发的各个关键环节。由于Markdown格式限制,部分图表需要替换为实际图片链接。建议在实际使用时: 1. 补充具体的配置示例和性能测试数据 2. 根据具体业务场景调整代码示例 3. 增加团队实际遇到的典型案例分析

推荐阅读:
  1. storm记录--2-- Storm是什么
  2. storm topology优化思路是什么

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

storm

上一篇:如何用Storm来写一个Crawler的工具

下一篇:mysql中出现1053错误怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》