您好,登录后才能下订单哦!
kafka集群是把状态保存在zookeeper中的,首先要搭建zookeeper集群。
wget http://xxxxx.oss-cn-xxxx.aliyuncs.com/xxxx/jdk-8u171-linux-x64.rpm
yum localinstall jdk-8u171-linux-x64.rpm -y
wget http://xxx-xx.oss-cn-xxx.aliyuncs.com/xxx/kafka_2.12-1.1.0.tgz
官网下载链接:http://kafka.apache.org/downloads
解压kafka
tar -zxvf kafka_2.12-1.1.0.tgz
mv kafka_2.12-1.1.0 kafka
直接使用kafka自带的zookeeper建立zk集群
cd /data/kafka
vim conf/zookeeper.properties
#tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
#initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
#syncLimit:
这个配置项标识 Leader 与Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是5*2000=10秒
#dataDir:
快照日志的存储路径
#dataLogDir:需手动创建
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多
#clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
进入dataDir目录,将三台服务器上的myid文件分别写入1、2、3。
myid是zk集群用来发现彼此的标识,必须创建,且不能相同。
echo "1" > /data/kafka/zk/myid
echo "2" > /data/kafka/zk/myid
echo "3" > /data/kafka/zk/myid
zookeeper不会主动的清除旧的快照和日志文件,需要定期清理。
#!/bin/bash
#snapshot file dir
dataDir=/data/kafka/zk/version-2
#tran log dir
dataLogDir=/data/kafka/log/zk/version-2
#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
#以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。
进入kafka目录,执行zookeeper命令
cd /data/kafka
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper.log 2>&1 &
没有报错,而且jps查看有zk进程就说明启动成功了。
vim conf/server.properties
部分参数含义:
先每台设置host,listeners里要设置,否则后面消费消息会报错。
broker.id 每台都不能相同
num.network.threads 设置为cpu核数
num.partitions 分区数设置视情况而定,上面有讲分区数设置
default.replication.factor kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
nohup ./bin/kafka-server-start.sh config/server.properties > logs/kafka.log 2>&1 &
执行jps检查
./bin/kafka-topics.sh --create --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --replication-factor 2 --partitions 1 --topic test1
--replication-factor 2 #复制两份
--partitions 1 #创建1个分区
--topic #主题为test1
#模拟客户端去发送消息,生产者
./bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic test1
#模拟客户端去接受消息,消费者
./bin/kafka-console-consumer.sh --zookeeper kafka1:2181,kafka2:2181,kafka3:2181 --from-beginning --topic test1
#然后在生产者处输入任意内容,在消费端查看内容。
./bin/kafka-topics.sh --list --zookeeper xxxx:2181
#显示创建的所有topic
./bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic test1
#Topic:ssports PartitionCount:1 ReplicationFactor:2 Configs:
# Topic: test1 Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
#分区为为1 复制因子为2 他的 test1的分区为0
#Replicas: 0,1 复制的为0,1
修改配置文件server.properties添加如下配置:
delete.topic.enable=true
配置完重启kafka、zookeeper。
如果不想修改配置文件可删除topc及相关数据目录
#删除kafka topic
./bin/kafka-topics.sh --delete --zookeeper xxxx:2181,xxxx:2181 --topic test1
#删除kafka相关数据目录
rm -rf /data/kafka/log/kafka/test*
#删除zookeeper相关路径
rm -rf /data/kafka/zk/test*
rm -rf /data/kafka/log/zk/test*
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。