storm实时排序TopN怎么使用

发布时间:2021-12-23 14:20:04 作者:iii
来源:亿速云 阅读:200
# Storm实时排序TopN怎么使用

## 一、Storm与实时计算概述

Apache Storm是一个开源的分布式实时计算系统,由Twitter开发并开源。它能够可靠地处理无界数据流(即持续不断产生的数据),广泛应用于实时分析、在线机器学习、持续计算等场景。与批处理框架(如Hadoop MapReduce)不同,Storm的特点是低延迟(毫秒级响应)和高吞吐量。

### 1.1 Storm核心概念
- **Topology(拓扑)**:Storm作业的抽象表示,由Spout和Bolt组成的有向无环图(DAG)
- **Spout**:数据源组件,负责从外部数据源(如Kafka、MQ等)读取数据并发射到拓扑中
- **Bolt**:处理组件,负责对数据进行各种处理(过滤、聚合、连接等)
- **Tuple**:Storm中的基本数据单元,由一组键值对组成
- **Stream Grouping**:定义Tuple如何在Bolt之间分发

### 1.2 实时TopN的应用场景
- 电商实时热销商品排行
- 社交媒体热门话题追踪
- 网络流量实时监控(TopN攻击IP)
- 金融交易实时异常检测

## 二、TopN排序实现原理

### 2.1 基本思路
在Storm中实现TopN排序通常需要:
1. **数据分片处理**:将数据按关键字段分组(如商品ID)
2. **局部聚合**:在每个Bolt实例中维护局部TopN
3. **全局聚合**:将局部结果汇总得到全局TopN

### 2.2 数据结构选择
高效的TopN实现依赖于合适的数据结构:

| 数据结构 | 插入复杂度 | 查询TopN复杂度 | 适用场景 |
|---------|-----------|---------------|---------|
| 普通List | O(1)      | O(nlogn)       | 小数据量 |
| 二叉堆   | O(logn)   | O(1)获取Top1   | 中等数据 |
| TreeMap  | O(logn)   | O(logn)        | 大数据量 |

推荐使用`TreeMap`或`PriorityQueue`,因为它们能高效维护有序集合。

## 三、Storm实现TopN的完整示例

### 3.1 项目环境准备
```xml
<!-- Maven依赖 -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>2.3.0</version>
    <scope>provided</scope>
</dependency>

3.2 Spout实现(模拟数据源)

public class RandomSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random = new Random();
    private String[] products = {"iPhone", "iPad", "MacBook", "AirPods", "Watch"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(100); // 控制发射速度
        String product = products[random.nextInt(products.length)];
        int count = random.nextInt(100);
        collector.emit(new Values(product, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("product", "count"));
    }
}

3.3 局部TopN Bolt实现

public class PartialRankingBolt extends BaseRichBolt {
    private OutputCollector collector;
    private int topN;
    private TreeMap<Integer, String> localRankings;

    public PartialRankingBolt(int topN) {
        this.topN = topN;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.localRankings = new TreeMap<>(Collections.reverseOrder());
    }

    @Override
    public void execute(Tuple tuple) {
        String product = tuple.getStringByField("product");
        Integer count = tuple.getIntegerByField("count");
        
        // 更新局部排名
        localRankings.put(count, product);
        
        // 保持只保留TopN
        if (localRankings.size() > topN) {
            localRankings.pollLastEntry();
        }
        
        // 定时发送当前排名(例如每5秒)
        if (System.currentTimeMillis() % 5000 == 0) {
            collector.emit(new Values(new ArrayList<>(localRankings.entrySet())));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("partialRankings"));
    }
}

3.4 全局TopN Bolt实现

public class GlobalRankingBolt extends BaseRichBolt {
    private int topN;
    private TreeMap<Integer, String> globalRankings;
    
    public GlobalRankingBolt(int topN) {
        this.topN = topN;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.globalRankings = new TreeMap<>(Collections.reverseOrder());
    }

    @Override
    public void execute(Tuple tuple) {
        List<Map.Entry<Integer, String>> partial = (List<Map.Entry<Integer, String>>) tuple.getValueByField("partialRankings");
        
        // 合并局部结果
        for (Map.Entry<Integer, String> entry : partial) {
            globalRankings.put(entry.getKey(), entry.getValue());
            if (globalRankings.size() > topN) {
                globalRankings.pollLastEntry();
            }
        }
        
        // 打印当前全局TopN
        System.out.println("--- Global Top " + topN + " ---");
        int rank = 1;
        for (Map.Entry<Integer, String> entry : globalRankings.entrySet()) {
            System.out.println(rank++ + ". " + entry.getValue() + ": " + entry.getKey());
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // 无需发射,仅打印
    }
}

3.5 构建Topology

public class TopNTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        int topN = 5;
        
        builder.setSpout("spout", new RandomSpout(), 1);
        builder.setBolt("partial", new PartialRankingBolt(topN), 3)
               .shuffleGrouping("spout");
        builder.setBolt("global", new GlobalRankingBolt(topN), 1)
               .globalGrouping("partial");
        
        Config conf = new Config();
        conf.setDebug(true);
        
        // 本地模式运行
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("topn-demo", conf, builder.createTopology());
        
        // 生产环境使用StormSubmitter
        // StormSubmitter.submitTopology("topn-prod", conf, builder.createTopology());
    }
}

四、性能优化与注意事项

4.1 性能优化技巧

  1. 合理设置并行度

    • Spout和第一个Bolt的并行度可以较高(如3-5个)
    • 全局聚合Bolt通常设为1(避免数据分散)
  2. 使用高效的序列化

    conf.registerSerialization(ProductCount.class, ProductCountSerializer.class);
    
  3. 批处理优化

    // 在Bolt中积累一定量数据再处理
    @Override
    public void execute(Tuple tuple) {
       buffer.add(tuple);
       if(buffer.size() >= BATCH_SIZE) {
           processBatch();
           buffer.clear();
       }
    }
    

4.2 常见问题解决

  1. 内存溢出

    • 为TreeMap设置大小限制
    • 使用LRU策略淘汰旧数据
  2. 数据倾斜

    // 使用fieldsGrouping代替shuffleGrouping
    builder.setBolt("partial", new PartialRankingBolt(), 3)
          .fieldsGrouping("spout", new Fields("product"));
    
  3. 时间窗口处理

    // 滑动窗口实现示例
    private Map<Long, TreeMap<Integer, String>> timeWindows = new HashMap<>();
    

五、扩展应用:带时间窗口的TopN

5.1 滑动窗口实现

public class SlidingWindowRankingBolt extends BaseRichBolt {
    private Map<String, SlidingWindowCounter> counters = new HashMap<>();
    private int windowLengthInSeconds;
    private int emitFrequencyInSeconds;
    
    @Override
    public void execute(Tuple tuple) {
        String product = tuple.getStringByField("product");
        counters.computeIfAbsent(product, k -> 
            new SlidingWindowCounter(windowLengthInSeconds))
            .increment();
    }
}

5.2 与外部存储集成

// 将结果写入Redis
public void saveToRedis(TreeMap<Integer, String> rankings) {
    Jedis jedis = new Jedis("localhost");
    jedis.del("topn-products");
    rankings.forEach((count, product) -> 
        jedis.zadd("topn-products", count, product));
}

六、总结

Storm实现实时TopN排序的关键点: 1. 合理设计拓扑结构(分阶段处理) 2. 选择高效的数据结构(TreeMap/PriorityQueue) 3. 注意内存管理和数据倾斜问题 4. 根据业务需求选择合适的时间窗口

实际应用中,可以结合Kafka等消息队列作为数据源,使用Redis存储最终结果,构建完整的实时分析系统。对于更复杂的场景,可以考虑使用Apache Flink或Spark Streaming等框架,它们内置了更丰富的窗口操作和状态管理功能。 “`

推荐阅读:
  1. mongodb 分组 topN
  2. storm项目之实时流式计算介绍

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

storm topn

上一篇:Storm如何接收数据

下一篇:mysql中出现1053错误怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》