ubuntu单机版搭建storm环境的示例分析

发布时间:2021-11-15 17:20:27 作者:小新
来源:亿速云 阅读:154
# Ubuntu单机版搭建Storm环境的示例分析

## 目录
- [一、Storm框架概述](#一storm框架概述)
  - [1.1 实时计算需求背景](#11-实时计算需求背景)
  - [1.2 Storm核心特性与架构](#12-storm核心特性与架构)
  - [1.3 对比其他流处理框架](#13-对比其他流处理框架)
- [二、环境准备与前置条件](#二环境准备与前置条件)
  - [2.1 Ubuntu系统要求](#21-ubuntu系统要求)
  - [2.2 Java环境配置](#22-java环境配置)
  - [2.3 ZooKeeper安装](#23-zookeeper安装)
- [三、Storm单机版安装部署](#三storm单机版安装部署)
  - [3.1 下载与解压Storm](#31-下载与解压storm)
  - [3.2 配置storm.yaml详解](#32-配置stormyaml详解)
  - [3.3 启动Nimbus和Supervisor](#33-启动nimbus和supervisor)
- [四、拓扑开发与提交实战](#四拓扑开发与提交实战)
  - [4.1 创建WordCount拓扑](#41-创建wordcount拓扑)
  - [4.2 Maven项目配置](#42-maven项目配置)
  - [4.3 提交与监控拓扑](#43-提交与监控拓扑)
- [五、常见问题排查指南](#五常见问题排查指南)
  - [5.1 端口冲突解决方案](#51-端口冲突解决方案)
  - [5.2 资源不足错误处理](#52-资源不足错误处理)
  - [5.3 日志分析与调试技巧](#53-日志分析与调试技巧)
- [六、性能优化建议](#六性能优化建议)
  - [6.1 Worker与Executor调优](#61-worker与executor调优)
  - [6.2 消息可靠性保障](#62-消息可靠性保障)
  - [6.3 JVM参数优化](#63-jvm参数优化)
- [七、扩展应用场景](#七扩展应用场景)
  - [7.1 结合Kafka的数据管道](#71-结合kafka的数据管道)
  - [7.2 机器学习实时预测](#72-机器学习实时预测)
  - [7.3 物联网数据处理](#73-物联网数据处理)
- [八、总结与展望](#八总结与展望)

## 一、Storm框架概述

### 1.1 实时计算需求背景
在大数据时代背景下,企业对数据处理的时效性要求越来越高。传统批处理模式(如Hadoop MapReduce)通常存在分钟级甚至小时级的延迟,无法满足以下典型场景需求:
- 金融领域的实时风控检测
- 电商平台的实时推荐系统
- 物联网设备的实时状态监控
- 社交网络的实时趋势分析

Storm作为分布式实时计算系统的开创者,由Nathan Marz在BackType公司创建(后被Twitter收购),其核心设计目标就是实现"持续不断的数据流处理"。

### 1.2 Storm核心特性与架构
Storm的核心架构采用主从模式(Master-Worker),主要包含以下组件:

| 组件        | 角色说明                          |
|-------------|---------------------------------|
| Nimbus      | 主节点,负责拓扑分发和任务调度     |
| Supervisor  | 工作节点,执行具体任务             |
| ZooKeeper   | 协调服务,维护集群状态             |
| Worker      | JVM进程,运行Executor线程         |
| Executor    | 线程,运行一个或多个Task           |

数据流(Stream)作为基本抽象概念,通过Spout(数据源)和Bolt(处理单元)组成有向无环图(DAG),形成拓扑(Topology)。

### 1.3 对比其他流处理框架
与Spark Streaming、Flink等后起之秀相比,Storm具有以下特点:

- **延迟特性**:Storm实现真正的逐条处理(毫秒级延迟),而Spark Streaming采用微批处理(秒级延迟)
- **状态管理**:原生版本状态支持较弱(需自行实现),Trident扩展提供状态管理
- **Exactly-Once**:通过Trident可实现,但核心API仅保证At-Least-Once
- **成熟度**:作为最早的开源流处理系统,社区生态成熟但近年活跃度下降

## 二、环境准备与前置条件

### 2.1 Ubuntu系统要求
建议使用Ubuntu 18.04 LTS或20.04 LTS版本,最小化硬件配置:
```bash
# 查看系统版本
lsb_release -a

# 更新软件包
sudo apt update && sudo apt upgrade -y

2.2 Java环境配置

Storm需要Java 8或Java 11运行环境:

# 安装OpenJDK
sudo apt install openjdk-8-jdk -y

# 验证安装
java -version
javac -version

# 设置环境变量(示例)
echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64' >> ~/.bashrc
source ~/.bashrc

2.3 ZooKeeper安装

Storm依赖ZooKeeper进行协调服务,单机版安装步骤:

# 下载安装包(示例版本3.6.3)
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
tar -xzf apache-zookeeper-3.6.3-bin.tar.gz
cd apache-zookeeper-3.6.3-bin

# 配置zoo.cfg
cp conf/zoo_sample.cfg conf/zoo.cfg
sed -i 's/dataDir=\/tmp\/zookeeper/dataDir=\/var\/lib\/zookeeper/g' conf/zoo.cfg

# 启动服务
bin/zkServer.sh start

# 验证状态
bin/zkCli.sh -server 127.0.0.1:2181

三、Storm单机版安装部署

3.1 下载与解压Storm

推荐使用Storm 2.4.0版本(截至2023年最新稳定版):

wget https://archive.apache.org/dist/storm/apache-storm-2.4.0/apache-storm-2.4.0.tar.gz
tar -xzf apache-storm-2.4.0.tar.gz
cd apache-storm-2.4.0

3.2 配置storm.yaml详解

编辑conf/storm.yaml关键配置:

# ZooKeeper服务器列表
storm.zookeeper.servers:
  - "localhost"

# Nimbus工作目录(需预先创建)
nimbus.seeds: ["localhost"]
storm.local.dir: "/home/user/storm-data"

# Worker配置
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

# UI端口(默认8080)
ui.port: 8080

# 日志配置(可选)
worker.log.level: INFO

3.3 启动Nimbus和Supervisor

分别启动各个组件:

# 启动Nimbus(后台运行)
nohup bin/storm nimbus > /dev/null 2>&1 &

# 启动Supervisor
nohup bin/storm supervisor > /dev/null 2>&1 &

# 启动UI界面
nohup bin/storm ui > /dev/null 2>&1 &

# 验证进程
jps
# 应看到Nimbus、Supervisor、QuorumPeerMain等进程

四、拓扑开发与提交实战

4.1 创建WordCount拓扑

经典词频统计示例代码结构:

public class WordCountTopology {
    public static class SentenceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private String[] sentences = {
            "storm tutorial example",
            "real time processing",
            "big data analytics"
        };
        
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }
    
    public static class SplitBolt extends BaseRichBolt {
        private OutputCollector collector;
        
        @Override
        public void execute(Tuple input) {
            String sentence = input.getString(0);
            for(String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }
    
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentence-spout", new SentenceSpout());
        builder.setBolt("split-bolt", new SplitBolt()).shuffleGrouping("sentence-spout");
        
        Config config = new Config();
        StormSubmitter.submitTopology("word-count", config, builder.createTopology());
    }
}

4.2 Maven项目配置

pom.xml关键依赖配置:

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>2.4.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4.3 提交与监控拓扑

打包并提交拓扑:

mvn clean package
storm jar target/wordcount-topology-1.0.jar com.example.WordCountTopology

# 查看运行中拓扑
storm list

# 终止拓扑
storm kill word-count -w 10  # 等待10秒后关闭

通过UI界面(http://localhost:8080)可监控: - 拓扑执行概览 - Spout/Bolt吞吐量 - Worker资源使用 - 错误日志查看

五、常见问题排查指南

5.1 端口冲突解决方案

常见端口冲突及处理方法:

端口 服务 解决方案
8080 Storm UI 修改storm.yaml的ui.port
2181 ZooKeeper 检查zk配置或停止冲突服务
6700+ Worker slots 调整supervisor.slots.ports

5.2 资源不足错误处理

典型资源错误示例:

java.lang.OutOfMemoryError: Java heap space

优化方案: 1. 增加worker内存:

worker.heap.memory.mb: 2048
  1. 调整并行度:
builder.setBolt("split-bolt", new SplitBolt(), 4)  // 增加executor数量
       .shuffleGrouping("sentence-spout");

5.3 日志分析与调试技巧

日志文件位置:

logs/nimbus.log
logs/supervisor.log
logs/workers-artifacts/<topology-id>/<worker-port>/worker.log

使用Storm CLI调试:

# 查看特定拓扑日志
storm logs -n 100 word-count

# 远程调试(需提前配置)
bin/storm debug <topology-name>

六、性能优化建议

6.1 Worker与Executor调优

资源配置黄金法则: - 每个Worker分配1-2GB内存 - 每个物理核心对应2-3个Executor - 避免Worker跨NUMA节点

示例配置:

Config conf = new Config();
conf.setNumWorkers(4);  // 4个worker进程
builder.setBolt("bolt", new MyBolt(), 8)  // 8个executor
       .setNumTasks(16)  // 16个task
       .shuffleGrouping("spout");

6.2 消息可靠性保障

实现完全可靠处理需: 1. Spout实现可靠消息ID

collector.emit(new Values("data"), msgId);
collector.ack(msgId);
  1. Bolt锚定元组
collector.emit(input, new Values(word));
collector.ack(input);
  1. 配置ACK超时
topology.message.timeout.secs: 30

6.3 JVM参数优化

在storm.yaml中添加:

worker.childopts: "-Xmx2g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"

推荐GC策略: - 小内存(<4GB):UseParallelGC - 大内存:UseG1GC - 极低延迟:ZGC(需Java 11+)

七、扩展应用场景

7.1 结合Kafka的数据管道

构建Kafka-Spout实现高吞吐数据摄入:

builder.setSpout("kafka-spout", new KafkaSpout<>(getKafkaConfig()), 2);

private static KafkaSpoutConfig<String, String> getKafkaConfig() {
    return KafkaSpoutConfig.builder("kafka-broker:9092", "input-topic")
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "storm-group")
            .setFirstPollOffsetStrategy(EARLIEST)
            .build();
}

7.2 机器学习实时预测

集成PMML模型实现实时评分:

public class ScoringBolt extends BaseRichBolt {
    private PMMLModel model;
    
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        model = PMMLUtil.loadModel("fraud-detection.pmml");
    }
    
    @Override
    public void execute(Tuple input) {
        double score = model.score(input.getValues());
        collector.emit(new Values(score));
    }
}

7.3 物联网数据处理

设备状态监控拓扑设计:

TemperatureSpout -> 
  FilterBolt(>50°C) -> 
    AlertBolt(SMS通知)
  AggregateBolt(5min窗口) -> 
    DashboardBolt

八、总结与展望

通过本指南,我们完成了从零开始搭建Storm单机环境的全过程,包括: 1. 基础环境准备(Java、ZooKeeper) 2. Storm核心组件配置与启动 3. 拓扑开发与提交全流程 4. 运维监控与性能优化

虽然新一代流处理框架(如Flink)在易用性和功能完整性上有所超越,但Storm仍然在以下场景具有独特价值: - 超低延迟要求的场景(<10ms) - 简单、稳定的流处理需求 - 已有Storm技术栈的遗留系统维护

未来发展方向建议: - 探索Storm与Kafka Streams的集成 - 尝试Trident API实现状态管理 - 考虑容器化部署(Docker/K8s)

附录: - Storm官方文档 - 示例代码仓库 - 性能调优白皮书 “`

推荐阅读:
  1. python环境搭建的示例分析
  2. Storm搭建

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

ubuntu storm

上一篇:aws和ubuntu如何添加新硬盘

下一篇:Ubuntu上如何安装CH340/341的驱动

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》