在Ubuntu上搭建Kafka集群的步骤如下:
sudo apt update
sudo apt install openjdk-8-jdk
java -version
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.13-2.5.0.tgz
tar -xvf kafka_2.13-2.5.0.tgz
cd kafka_2.13-2.5.0
sudo mkdir -p /opt/zookeeper/data
sudo mkdir -p /opt/zookeeper/log
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
nano /opt/zookeeper/conf/zoo.cfg
修改以下配置:dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/log
clientPort=2181
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888
bin/zookeeper-server-start.sh config/zoo.cfg
修改Kafka配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
cp config/server.properties config/server-3.properties
修改以下配置:
broker.id:每台机器不同listeners:例如 listeners=PLAINTEXT://192.168.1.100:9092advertised.listeners:例如 advertised.listeners=PLAINTEXT://192.168.1.100:9092zookeeper.connect:例如 zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181log.dirs:例如 /tmp/kafka-logsnum.network.threads:例如 3num.io.threads:例如 8socket.send.buffer.bytes:例如 1048576socket.receive.buffer.bytes:例如 1048576socket.request.max.bytes:例如 104857600log.flush.interval.messages:例如 1000log.flush.interval.ms:例如 1000log.segment.bytes:例如 1073741824log.retention.hours:例如 168log.retention.check.interval.ms:例如 300000log.cleaner.min.compaction.lag.ms:例如 10000log.cleaner.max.compaction.lag.ms:例如 900000log.cleaner.花椒.max.size:例如 10737418240num.partitions:例如 1default.replication.factor:例如 1min.insync.replicas:例如 1replica.lag.time.max.ms:例如 10000replica.lag.max.messages:例如 100000message.max.bytes:例如 1000000replica.fetch.max.bytes:例如 1000000fetch.min.bytes:例如 1048576fetch.max.wait.ms:例如 100max.partition.fetch.bytes:例如 1048576num.recovery.threads.per.data.dir:例如 1transaction.timeout.ms:例如 180000transaction.state.log.replication.factor:例如 1transaction.state.log.min.isr:例如 1transaction.state.log.max.size.mb:例如 100transactional.id.max.bytes:例如 900000000auto.commit.interval.ms:例如 1000auto.commit.delta.bytes:例如 1048576group.initial.rebalance.delay.ms:例如 0partition.assignment.strategy:例如 org.apache.kafka.common.cluster.assign.RangeAssignornum.network.threads:例如 3num.io.threads:例如 8socket.send.buffer.bytes:例如 1048576socket.receive.buffer.bytes:例如 1048576socket.request.max.bytes:例如 104857600log.flush.interval.messages:例如 1000log.flush.interval.ms:例如 1000log.segment.bytes:例如 1073741824log.retention.hours:例如 168log.retention.check.interval.ms:例如 300000log.cleaner.min.compaction.lag.ms:例如 10000log.cleaner.max.compaction.lag.ms:例如 900000log.cleaner.花椒.max.size:例如 10737418240num.partitions:例如 1default.replication.factor:例如 1min.insync.replicas:例如 1replica.lag.time.max.ms:例如 10000replica.lag.max.messages:例如 100000message.max.bytes:例如 1000000replica.fetch.max.bytes:例如 1000000fetch.min.bytes:例如 1048576fetch.max.wait.ms:例如 100max.partition.fetch.bytes:例如 1048576num.recovery.threads.per.data.dir:例如 1transaction.timeout.ms:例如 180000transaction.state.log.replication.factor:例如 1transaction.state.log.min.isr:例如 1transaction.state.log.max.size.mb:例如 100transactional.id.max.bytes:例如 900000000auto.commit.interval.ms:例如 1000auto.commit.delta.bytes:例如 1048576group.initial.rebalance.delay.ms:例如 0partition.assignment.strategy:例如 org.apache.kafka.common.cluster.assign.RangeAssignor启动Kafka:
bin/kafka-server-start.sh config/