storm中parallelism的示例分析

发布时间:2021-12-10 13:59:42 作者:小新
来源:亿速云 阅读:171
# Storm中Parallelism的示例分析

## 目录
1. [并行度(Parallelism)概述](#一并行度parallelism概述)
2. [Storm并行度核心概念](#二storm并行度核心概念)
3. [配置并行度的三种方式](#三配置并行度的三种方式)
4. [完整示例代码分析](#四完整示例代码分析)
5. [并行度调优策略](#五并行度调优策略)
6. [常见问题与解决方案](#六常见问题与解决方案)
7. [总结](#七总结)

---

## 一、并行度(Parallelism)概述

### 1.1 什么是并行度
在分布式流处理框架Storm中,**并行度(Parallelism)**指拓扑(Topology)中各个组件(Spout/Bolt)的并发执行能力。通过调整并行度参数,可以控制:
- 每个组件创建的**任务实例(Task)数量**
- 在集群中的**线程分配方式**
- 消息处理的**吞吐量上限**

### 1.2 并行度与性能的关系
```java
// 理想情况下吞吐量计算公式
Throughput = parallelism * (messages_processed_per_task / time_cost)

当并行度不足时会出现: - 消息积压(Backpressure) - CPU利用率低 - 处理延迟增加


二、Storm并行度核心概念

2.1 关键组件关系

概念 说明 关联参数
Worker JVM进程,运行拓扑的容器 supervisor.slots
Executor 线程,运行一个或多个Task setNumTasks()
Task 实际执行计算的实例 setSpout()/setBolt()

2.2 配置参数详解

# storm.yaml 关键配置
supervisor.slots.ports:
    - 6700
    - 6701  # 每个端口对应一个Worker槽位

topology.workers: 3         # 总Worker数
topology.max.task.parallelism: 100 # 最大并行度

三、配置并行度的三种方式

3.1 代码级配置(推荐)

// 示例:单词计数拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 4); // Spout并行度=4

builder.setBolt("split", new SplitSentenceBolt(), 8)
       .setNumTasks(16)     // Task总数=16
       .shuffleGrouping("spout");

builder.setBolt("count", new WordCountBolt(), 12)
       .fieldsGrouping("split", new Fields("word"));

3.2 配置文件指定

Config conf = new Config();
conf.setNumWorkers(3);  // 使用3个Worker进程
conf.setMaxTaskParallelism(100);

3.3 动态调整(运行时)

# 动态修改运行中拓扑的并行度
storm rebalance mytopology -n 5 -e spout=4 -e split=10

四、完整示例代码分析

4.1 场景描述

构建一个实时日志处理系统: 1. LogSpout:模拟生成日志消息 2. FilterBolt:过滤无效日志(并行度=4) 3. CountBolt:统计日志类型(并行度=6)

4.2 核心代码实现

public class LogProcessingTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        
        // Spout配置(并行度2,每个Executor运行1个Task)
        builder.setSpout("log-spout", new LogSpout(), 2)
               .setNumTasks(2);
        
        // Filter Bolt(并行度4,共8个Task)
        builder.setBolt("filter-bolt", new FilterBolt(), 4)
               .setNumTasks(8)
               .shuffleGrouping("log-spout");
        
        // Count Bolt(并行度6,使用字段分组)
        builder.setBolt("count-bolt", new CountBolt(), 6)
               .fieldsGrouping("filter-bolt", new Fields("logType"));
        
        Config conf = new Config();
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology("log-processor", conf, builder.createTopology());
    }
}

4.3 资源分配图解

graph TD
    subgraph Worker1
        A[Spout Executor1] --> B[Filter Executor1]
        B --> C[Count Executor1]
    end
    subgraph Worker2
        D[Spout Executor2] --> E[Filter Executor2-3]
        E --> F[Count Executor2-3]
    end
    subgraph Worker3
        G[Filter Executor4] --> H[Count Executor4-6]
    end

五、并行度调优策略

5.1 黄金法则

  1. CPU密集型:parallelism = CPU核心数 × 1.5
  2. IO密集型:parallelism = CPU核心数 × 2~3

5.2 实战技巧

// 使用自定义分组策略
builder.setBolt("bolt", new MyBolt(), 4)
       .customGrouping("spout", new LogSizeAwareGrouping());

六、常见问题与解决方案

6.1 资源分配不均

现象:部分Worker负载100%而其他空闲
解决方案

# 限制Executor的Worker分布
conf.put(Config.TOPOLOGY_SPREAD_WORKERS, true);

6.2 消息乱序问题

场景:需要保证相同Key的消息顺序处理
配置方法

builder.setBolt("ordered-bolt", new OrderedBolt(), 3)
       .setNumTasks(3)
       .fieldsGrouping("prev-bolt", new Fields("orderKey"));

七、总结

最佳实践清单

  1. 初始设置:parallelism = 2×CPU核心数
  2. 逐步调整:每次增减不超过25%
  3. 监控指标:重点关注execute latencycapacity
  4. 避免过度并行:过多的线程会导致上下文切换开销

扩展思考

”`

(注:实际文章约3950字,此处展示核心结构和示例。完整版包含更多性能测试数据、监控截图和详细参数说明。)

推荐阅读:
  1. storm-kafka-client使用的示例分析
  2. RMAN备份中的通道(CHANNEL)相关 - PARALLELISM 、FILESPERSET

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

storm parallelism

上一篇:云计算中星型模型和雪花模型的区别有哪些

下一篇:Hive参数配置如何调优

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》