The steps to set up a Kafka cluster on Ubuntu are as follows:
sudo apt update
sudo apt install openjdk-8-jdk
java -version
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -xvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0
sudo mkdir -p /opt/zookeeper/data
sudo mkdir -p /opt/zookeeper/log
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
nano /opt/zookeeper/conf/zoo.cfg
Modify the following settings:
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/log
clientPort=2181
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888
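With three server.N entries, ZooKeeper also expects a file named myid inside dataDir on every node, holding that node's number N; without it a multi-server ensemble will not start. A minimal sketch of creating it follows. The function name write_myid is hypothetical, and the paths and host numbering are taken from the settings above.

```shell
# write_myid is a hypothetical helper name; ZooKeeper only requires that
# <dataDir>/myid contain the number N from this host's server.N line.
write_myid() {
  data_dir="$1"    # e.g. /opt/zookeeper/data (the dataDir value above)
  server_id="$2"   # 1 on zookeeper1, 2 on zookeeper2, 3 on zookeeper3
  case "$server_id" in
    ''|*[!0-9]*) echo "server id must be a number" >&2; return 1 ;;
  esac
  mkdir -p "$data_dir" || return 1
  printf '%s\n' "$server_id" > "$data_dir/myid"
}

# Example (run as root, since the directory was created with sudo):
#   write_myid /opt/zookeeper/data 1
```

Run it once per node with that node's own number, before starting ZooKeeper.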
bin/zookeeper-server-start.sh /opt/zookeeper/conf/zoo.cfg
Edit the Kafka configuration files:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
Modify the following settings:
broker.id: must be different on each machine
listeners: e.g. listeners=PLAINTEXT://192.168.1.100:9092
advertised.listeners: e.g. advertised.listeners=PLAINTEXT://192.168.1.100:9092
zookeeper.connect: e.g. zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
log.dirs: e.g. /tmp/kafka-logs
num.network.threads: e.g. 3
num.io.threads: e.g. 8
socket.send.buffer.bytes: e.g. 1048576
socket.receive.buffer.bytes: e.g. 1048576
socket.request.max.bytes: e.g. 104857600
log.flush.interval.messages: e.g. 1000
log.flush.interval.ms: e.g. 1000
log.segment.bytes: e.g. 1073741824
log.retention.hours: e.g. 168
log.retention.check.interval.ms: e.g. 300000
log.cleaner.min.compaction.lag.ms: e.g. 10000
log.cleaner.max.compaction.lag.ms: e.g. 900000
log.cleaner.花椒.max.size: e.g. 10737418240
num.partitions: e.g. 1
default.replication.factor: e.g. 1
min.insync.replicas: e.g. 1
replica.lag.time.max.ms: e.g. 10000
replica.lag.max.messages: e.g. 100000 (removed from Kafka in 0.9.0; replica.lag.time.max.ms replaces it)
message.max.bytes: e.g. 1000000
replica.fetch.max.bytes: e.g. 1000000
fetch.min.bytes: e.g. 1048576 (consumer setting)
fetch.max.wait.ms: e.g. 100 (consumer setting)
max.partition.fetch.bytes: e.g. 1048576 (consumer setting)
num.recovery.threads.per.data.dir: e.g. 1
transaction.timeout.ms: e.g. 180000
transaction.state.log.replication.factor: e.g. 1
transaction.state.log.min.isr: e.g. 1
transaction.state.log.max.size.mb: e.g. 100
transactional.id.max.bytes: e.g. 900000000
auto.commit.interval.ms: e.g. 1000 (consumer setting)
auto.commit.delta.bytes: e.g. 1048576
group.initial.rebalance.delay.ms: e.g. 0
partition.assignment.strategy: e.g. org.apache.kafka.clients.consumer.RangeAssignor (consumer setting)
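The per-broker settings (broker.id, listeners, advertised.listeners, zookeeper.connect) are the ones that must differ or point at the cluster, so it can help to generate each file rather than edit it by hand. The sketch below is one way to do that, assuming the stock server.properties layout where these keys appear as key=value lines, possibly commented out with #. The function name make_broker_config is hypothetical; port 9092 and the zookeeper1..3 hostnames come from the examples above.

```shell
# make_broker_config is a hypothetical helper, not part of Kafka.
# Usage: make_broker_config <broker.id> <host-ip> <source.properties> <dest.properties>
make_broker_config() {
  id="$1"; host="$2"; src="$3"; dst="$4"
  [ -f "$src" ] || { echo "missing source file: $src" >&2; return 1; }
  # Copy everything except existing (or commented-out) copies of the keys set below.
  # grep exits non-zero when it prints nothing, so ignore its status here.
  grep -v -E '^#?(broker\.id|listeners|advertised\.listeners|zookeeper\.connect)=' \
    "$src" > "$dst" || true
  # Append this broker's own values.
  {
    echo "broker.id=$id"
    echo "listeners=PLAINTEXT://$host:9092"
    echo "advertised.listeners=PLAINTEXT://$host:9092"
    echo "zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
  } >> "$dst"
}

# Example, run from the Kafka directory (the IP is the one used above):
#   make_broker_config 1 192.168.1.100 config/server.properties config/server-1.properties
```

Appending the values at the end keeps the rest of the stock file, including its comments, untouched.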
Start Kafka:
bin/kafka-server-start.sh config/
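The start command takes the path of one properties file. Assuming the intended argument is one of the server-N.properties files created earlier, a small wrapper such as the following could start a given broker in the background. KAFKA_HOME and start_broker are illustrative names; -daemon is the script's own option for running detached.

```shell
# KAFKA_HOME and start_broker are illustrative names, not part of Kafka.
# By default the current directory is assumed to be the unpacked Kafka directory.
KAFKA_HOME="${KAFKA_HOME:-.}"

start_broker() {
  n="$1"   # 1, 2 or 3: selects config/server-<n>.properties
  conf="$KAFKA_HOME/config/server-$n.properties"
  if [ ! -f "$conf" ]; then
    echo "missing config: $conf" >&2
    return 1
  fi
  # -daemon detaches the broker so the shell stays free for other commands
  "$KAFKA_HOME/bin/kafka-server-start.sh" -daemon "$conf"
}

# Example: on the machine that holds server-1.properties
#   start_broker 1
```

Each broker should be started only after the ZooKeeper nodes listed in zookeeper.connect are up.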