您好,登录后才能下订单哦!
这篇文章主要介绍“Hadoop如何安装配置”,在日常操作中,相信很多人在Hadoop如何安装配置问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Hadoop如何安装配置”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
1)在/home/atguigu/bin目录下创建脚本xcall.sh
vim xcall.sh
2)在脚本中编写如下内容
#! /bin/bash #1、判断是否输入参数 if [ $# -lt 1 ] then echo "必须输入至少一个参数..." exit fi #2、执行命令 for host in hadoop102 hadoop103 hadoop104 do echo "===================$host==================" ssh $host "$*" done #xcall.sh mkdir -p /xx/xx
3)修改脚本执行权限
chmod +x xcall.sh
4)启动脚本
xcall.sh jps
安装Hadoop
1)生产环境服务器磁盘情况
2)在hdfs-site.xml文件中配置多目录,注意新挂载磁盘的访问权限问题。
HDFS的DataNode节点保存数据的路径由dfs.datanode.data.dir参数决定,其默认值为file://${hadoop.tmp.dir}/dfs/data,若服务器有多个磁盘,必须对该参数进行修改。如服务器磁盘如上图所示,则该参数应修改为如下的值。
<property> <name>dfs.datanode.data.dir</name> <value>file:///dfs/data1,file:///hd2/dfs/data2,file:///hd3/dfs/data3,file:///hd4/dfs/data4</value> </property>
注意:每台服务器挂载的磁盘不一样,所以每个节点的多目录配置可以不一致。单独配置即可。
总结: 1、HDFS存储多目录 1、好处: 1、增大存储容量 2、提高IO、并发 2、如何实现存储多目录: 需要在hdfs-site.xml中配置 <property> <name>dfs.datanode.data.dir</name> <value>file://磁盘挂载点1/存储路径,file://磁盘挂载点2/存储路径</value> </property> 在真实的生产环境中,每天服务器磁盘数以及挂载点可能都不太一样,所以该配置需要在每台服务器上单独配置
数据可能累积在某个或者某个datanode节点上,或者服务器有多块磁盘的时候,数据累积在某一块磁盘上,所以此时可能会造成某些 节点或者磁盘负载压力会比较大,所以需要在节点以及磁盘上进行数据均衡 1、节点数据均衡 1、开启节点数据均衡: start-balancer --threshould N N代表节点之间磁盘的利用率不能超过N% 2、停止节点数据均衡: stop-balancer 2、磁盘的数据均衡 1、生成执行计划: hdfs diskbalancer -plan 主机名 <代表指定主机需要执行磁盘的数据均衡> 2、开启均衡: hdfs diskbalancer -execute 主机.plan.json 3、查看均衡进度: hdfs diskbalancer -query 主机名 4、停止均衡: hdfs diskbalancer -cancel 主机.plan.json 不管是节点数据均衡还是磁盘数据均衡都需要选择集群空闲的时候进行,因为均衡 的时候需要消耗大量的磁盘IO以及网络IO。
hadoop本身并不支持lzo压缩,如果想要支持lzo压缩,需要额外操作配置。
1)将编译好后的hadoop-lzo-0.4.20.jar 放入hadoop-3.1.3/share/hadoop/common/
[atguigu@hadoop102 common]$ pwd /opt/module/hadoop-3.1.3/share/hadoop/common [atguigu@hadoop102 common]$ ls hadoop-lzo-0.4.20.jar
2)同步hadoop-lzo-0.4.20.jar到hadoop103、hadoop104
[atguigu@hadoop102 common]$ xsync hadoop-lzo-0.4.20.jar
3)core-site.xml增加配置支持LZO压缩
<property> <name>io.compression.codecs</name> <value> org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.SnappyCodec, com.hadoop.compression.lzo.LzoCodec, com.hadoop.compression.lzo.LzopCodec </value> </property> <property> <name>io.compression.codec.lzo.class</name> <value>com.hadoop.compression.lzo.LzoCodec</value> </property>
com.hadoop.compression.lzo.LzoCodec: 后续MR读取lzo压缩文件的时候不会切片 com.hadoop.compression.lzo.LzopCodec: 后续MR读取lzo压缩文件的时候会切片<前提是生成索引文件>
4)同步core-site.xml到hadoop103、hadoop104
[atguigu@hadoop102 hadoop]$ xsync core-site.xml
5)启动及查看集群
[atguigu@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh [atguigu@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
1)创建LZO文件的索引,LZO压缩文件的可切片特性依赖于其索引,故我们需要手动为LZO压缩文件创建索引。若无索引,则LZO文件的切片只有一个。
hadoop jar /opt/module/hadoop/share/hadoop/common/hadoop-lzo-4.1.x.jar com.hadoop.compression.lzo.DistributedLzoIndexer 需要建索引的lzo文件名
1、HDFS吞吐量测试 ①、写吞吐量: hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles N -size 128MB //-nrFiles个数应该设置为集群所有CPU个数-1 ②、读吞吐量: hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles N -size 128MB 测试结果: 2020-04-16 13:41:24,724 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write 2020-04-16 13:41:24,724 INFO fs.TestDFSIO: Date & time: Thu Apr 16 13:41:24 CST 2020 2020-04-16 13:41:24,724 INFO fs.TestDFSIO: Number of files: 10 2020-04-16 13:41:24,725 INFO fs.TestDFSIO: Total MBytes processed: 1280 2020-04-16 13:41:24,725 INFO fs.TestDFSIO: Throughput mb/sec: 8.88 <单个maptask写的速率> 2020-04-16 13:41:24,725 INFO fs.TestDFSIO: Average IO rate mb/sec: 8.96 2020-04-16 13:41:24,725 INFO fs.TestDFSIO: IO rate std deviation: 0.87 2020-04-16 13:41:24,725 INFO fs.TestDFSIO: Test exec time sec: 67.61 HDFS读/写的吞吐量 = 单个maptask写的速率 * nrFiles ③、删除测试生成数据 hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean
2、MR计算性能测试:<使用Sort程序评测MapReduce> (1)使用RandomWriter来产生随机数,每个节点运行10个Map任务,每个Map产生大约1G大小的二进制随机数 hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar randomwriter random-data (2)执行Sort程序 hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar sort random-data sorted-data (3)验证数据是否真正排好序了 hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar testmapredsort -sortInput random-data -sortOutput sorted-data
1、namenode线程池的线程数调整: 原因:namenode中有一个线程池,线程池中的线程主要用于datanode与namenode心跳以及客户端获取/写入元数据的请求,所以如 果并发比较大或者datanode比较多,就可能导致线程池中线程不够用。通常需要增大参数dfs.namenode.handler.count的默认值10 如何调整: 在hdfs-site.xml中配置: <property> <name>dfs.namenode.handler.count</name> <value>10</value> </property> 调整为多少: 20 * log(集群机器台数) <以e为底> 2、调整nodemanager能够使用的资源数 原因: 服务器的资源不会全部给nodemanager使用,默认提供给nodemanager使用的内存8G,在真实生产中该默认值不够用需要进 行调整 如何调整: 在yarn-site.xml中配置: <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>1024</value> </property> 一般设置为服务器总内存的70%-80%
安装Zookeeper
#! /bin/bash #1、判断参数是否传入 if [ $# -lt 1 ] then echo "必须传入参数..." exit fi #2、根据参数执行对应的逻辑 case $1 in "start") for host in hadoop102 hadoop103 hadoop104 do echo "=======================start $host zookeeper===============" ssh $host "/opt/module/zookeeper/bin/zkServer.sh start" done ;; "stop") for host in hadoop102 hadoop103 hadoop104 do echo "=======================stop $host zookeeper===============" ssh $host "/opt/module/zookeeper/bin/zkServer.sh stop" done ;; "status") for host in hadoop102 hadoop103 hadoop104 do echo "=======================status $host zookeeper===============" ssh $host "/opt/module/zookeeper/bin/zkServer.sh status" done ;; *) echo "参数输入错误..." ;; esac
zookeeper常用指令 1、创建节点: create 节点路径 2、保存数据: set 路径 数据 3、获取数据: get 路径 4、查看节点: ls 路径 5、删除节点: 1、删除非空节点: deleteall 路径 2、删除空节点: delete 路径
安装Kafka
#! /bin/bash #1、判断参数是否传入 if [ $# -lt 1 ] then echo "必须传入参数..." exit fi #2、根据参数匹配逻辑 case $1 in "start") for host in hadoop102 hadoop103 hadoop104 do echo "=================start $host kafka==============" ssh $host "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties" done ;; "stop") for host in hadoop102 hadoop103 hadoop104 do echo "=================stop $host kafka==============" ssh $host "/opt/module/kafka/bin/kafka-server-stop.sh" done ;; "status") for host in hadoop102 hadoop103 hadoop104 do echo "=================status $host kafka==============" pid=$(ssh $host "ps -ef | grep server.properties | grep -v grep") [ "$pid" ] && echo "kafka进程正常" || echo "kafka进程不存在" done ;; *) echo "参数输入错误...." ;; esac
1、shell常用命令 1、topic相关 1、创建topic: bin/kafka-topics.sh --create --topic topic名称 --bootstrap-server hadoop102:9092,hadoop103:9092 --partitions 分区数 --replication-factor 副本数 2、查看集群所有topic: bin/kafka-topics.sh --list --bootstrap-server hadoop102:9092 3、查看topic详细信息: bin/kafka-topics.sh --describe --topic topic名称 --bootstrap-server hadoop102:9092 4、修改topic<只能修改分区,而且只能增加分区>: bin/kafka-topics.sh --alter --topic topic名称 --bootstrap-server hadoop102:9092 --partitions 分区数 5、删除topic: bin/kafka-topics.sh --delete --topic topic名称 --bootstrap-server hadoop102:9092 2、生产者相关: bin/kafka-console-producer.sh --topic topic名称 --broker-list hadoop102:9092 3、消费者相关: 1、消费数据:bin/kafka-console-consumer.sh --topic topic名称 --bootstrap-server hadoop102:9092 [--group 消费者id] [--from-beginning] 2、查看消费者组消费topic的进度: bin/kafka-consumer-group.sh --all-groups --all-topics --describe --bootstrap-server hadoop102:9092 4、数据相关: bin/kafka-dump-log.sh --files 待查看文件路径 --print-data-log
1、kafka吞吐量测试: 1、读吞吐量: bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic topic名称 --messages 总共拉取多少条数据 测试结果: start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec 2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153 开始测试时间,测试结束数据,共消费数据9.5368MB,吞吐量2.0714MB/s,共消费100010条,平均每秒消费21722.4153条。 MB.sec就是吞吐量测试结果 2、写吞吐量: bin/kafka-producer-perf-test.sh --record-size 指定每条数据大小 --topic topic名称 --num-records 写入topic数据条数 --producer-props bootstrap.servers=hadoop102:9092 --throughput 写入的速率[-1代表不限制] 100000 records sent, 95877.277085 records/sec (9.14 MB/sec), 187.68 ms avg latency, 424.00 ms max latency, 155 ms 50th, 411 ms 95th, 423 ms 99th, 424 ms 99.9th. 9.14 MB/sec就是吞吐量测试结果
Kafka机器数量(经验公式)=2*(峰值生产速度*副本数/100)+1 先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka的数量。 比如我们的峰值生产速度是50M/s。副本数为2。 Kafka机器数量=2*(50*2/100)+ 1=3台
1)创建一个只有1个分区的topic 2)测试这个topic的producer吞吐量和consumer吞吐量。 3)假设他们的值分别是Tp和Tc,单位可以是MB/s。 4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc) 例如:producer吞吐量=20m/s;consumer吞吐量=50m/s,期望吞吐量100m/s; 分区数=100 / 20 =5分区 https://blog.csdn.net/weixin_42641909/article/details/89294698 分区数一般设置为:3-10个
安装三台Flume(日志采集:102、103,消费kafka:104)
tailDirSource,kafkaChannel
(1)在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件
[atguigu@hadoop102 conf]$ vim file-flume-kafka.conf
在文件配置如下内容
#1、定义agent、source、channel名称 a1.sources = r1 a1.channels = c1 #2、描述source<taildir> a1.sources.r1.type = TAILDIR #指定文件组的名称 a1.sources.r1.filegroups = f1 #指定组监控的目录 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.* #指定断点续传文件 a1.sources.r1.positionFile = /opt/module/flume/position.json #指定一个批次采集多少数据 a1.sources.r1.batchSize = 100 #3、描述lan截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.ETLInterceptor$Builder #4、描述channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel #指定kafka集群地址 a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092 #指定数据写入的topic名称 a1.channels.c1.kafka.topic = applog #数据写入kafka的时候是否以event格式写入: true=是 false: 不是,只写body数据 a1.channels.c1.parseAsFlumeEvent = false #5、关联source->channel a1.sources.r1.channels = c1
@Override public Event intercept(Event event) { byte[] body = event.getBody(); String log = new String(body, StandardCharsets.UTF_8); if (JSONUtils.isJSONValidate(log)) { return event; } else { return null; } } @Override public List<Event> intercept(List<Event> list) { Iterator<Event> iterator = list.iterator(); while (iterator.hasNext()){ Event next = iterator.next(); if(intercept(next)==null){ iterator.remove(); } } return list; }
#! /bin/bash #1、判断参数是否传入 if [ $# -lt 1 ] then echo "必须传入参数...." fi #2、根据参数匹配逻辑 case $1 in "start") for host in hadoop102 hadoop103 do echo "===============开启$host服务器采集===============" ssh $host "nohup /opt/module/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/module/flume-1.9.0/conf/ -f /opt/module/flume-1.9.0/jobs/file_to_kafka.conf -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume-1.9.0/logs 2>&1 &" done ;; "stop") for host in hadoop102 hadoop103 do echo "===============停止$host服务器采集===============" ssh $host "ps -ef |grep file_to_kafka.conf | grep -v grep | awk '{print \$2}'| xargs kill -9" done ;; *) echo "参数输入错误..." ;; esac
kafkaSource、fileChannel/memoryChannel、hdfsSink
@Override public Event intercept(Event event) { Map<String, String> headers = event.getHeaders(); String log = new String(event.getBody(), StandardCharsets.UTF_8); JSONObject jsonObject = JSONObject.parseObject(log); String ts = jsonObject.getString("ts"); headers.put("timestamp", ts); return event; } @Override public List<Event> intercept(List<Event> list) { events.clear(); for (Event event : list) { events.add(intercept(event)); } return events; }
#1、定义agent、channel、source、sink名称 a1.sources = r1 a1.channels = c1 a1.sinks = k1 #2、描述source a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource #指定kafka集群地址 a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092 #指定从哪个topic读取数据 a1.sources.r1.kafka.topics = applog #指定消费者组的id a1.sources.r1.kafka.consumer.group.id = g1 #指定source从kafka一个批次拉取多少条消息: batchSize<=事务容量<=channel容量 a1.sources.r1.batchSize = 100 #指定消费者组第一个消费topic的数据的时候从哪里开始消费 a1.sources.r1.kafka.consumer.auto.offset.reset = earliest #3、描述lan截器 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.atguigu.interceptor.MyTimestampInterceptor$Builder #4、描述channel a1.channels.c1.type = file #指定数据保存在本地磁盘哪个目录 a1.channels.c1.dataDirs = /opt/module/flume/datas #指定channel内存中event的指针数据 a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint #指定checkpoint的持久化的间隔时间 a1.channels.c1.checkpointInterval = 5000 #指定channel容量 a1.channels.c1.capacity = 1000000 #5、描述sink a1.sinks.k1.type = hdfs #指定数据存储目录 a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/applog/%Y%m%d #指定文件的前缀 a1.sinks.k1.hdfs.filePrefix = log- #指定滚动生成文件的时间间隔 a1.sinks.k1.hdfs.rollInterval = 30 #指定滚动生成文件的大小 a1.sinks.k1.hdfs.rollSize = 132120576 #指定写入多少个event之后滚动生成新文件<一般禁用> a1.sinks.k1.hdfs.rollCount = 0 #指定sink每个批次从channel拉取多少数据 a1.sinks.k1.hdfs.batchSize = 100 #指定写入hdfs的时候压缩格式 #a1.sinks.k1.hdfs.codeC = lzop #指定文件写入的是格式 SequenceFile-序列化文件, DataStream-文本文件, CompressedStream-压缩文件 #a1.sinks.k1.hdfs.fileType = CompressedStream a1.sinks.k1.hdfs.fileType = DataStream #6、关联source->channel->sink a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
1)在/home/atguigu/bin目录下创建脚本f2.sh
[atguigu@hadoop102 bin]$ vim f2.sh
在脚本中填写如下内容
#! /bin/bash case $1 in "start"){ for i in hadoop104 do echo " --------启动 $i 消费flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log2.txt 2>&1 &" done };; "stop"){ for i in hadoop104 do echo " --------停止 $i 消费flume-------" ssh $i "ps -ef | grep kafka-flume-hdfs | grep -v grep |awk '{print \$2}' | xargs -n1 kill" done };; esac
2)增加脚本执行权限
[atguigu@hadoop102 bin]$ chmod u+x f2.sh
3)f2集群启动脚本
[atguigu@hadoop102 module]$ f2.sh start
4)f2集群停止脚本
[atguigu@hadoop102 module]$ f2.sh stop
flume默认的内存最大是2000M,一般在项目中需要设置为4G左右 Xms: 启动内存大小 Xmx: 运行时最大内存大小 Xms与Xmx在设置的时候最好设置的一样。因为设置的不一样,启动时Xms内存比较小,导致很快内存不够用,需要进行内存的扩容<在扩容 之前会进行GC>,GC会影响性能。两者设置为一样就不会出现内存扩容,减少GC次数
到此,关于“Hadoop如何安装配置”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。