Configuring Kafka on Ubuntu for your business needs involves several steps: installing and configuring Kafka and ZooKeeper, and then tuning Kafka's configuration parameters for your specific workload. The following is a detailed guide.
First, make sure Java and ZooKeeper are installed on your Ubuntu system. (The commands below fetch ZooKeeper 3.4.6 as in the original guide; note that recent Kafka releases such as 3.5.x are built and tested against the newer ZooKeeper 3.6 line, so consider using a matching version.)
sudo apt update
sudo apt install openjdk-8-jdk
java -version
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
tar xvf zookeeper-3.4.6.tar.gz
sudo mv zookeeper-3.4.6 /usr/local/zookeeper
Configure ZooKeeper. Note that "sudo cat > file" does not work as intended, because the shell performs the redirection as the unprivileged user before sudo runs; use tee instead, and create the data directory first:
sudo mkdir -p /var/lib/zookeeper
sudo tee /usr/local/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
EOF
Start ZooKeeper:
sudo /usr/local/zookeeper/bin/zkServer.sh start
Download and extract Kafka:
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
tar -zvxf kafka_2.12-3.5.2.tgz
sudo mv kafka_2.12-3.5.2 /usr/local/kafka
Configure Kafka:
Edit the file /usr/local/kafka/config/server.properties and set the following parameters:
broker.id: a unique identifier for each Kafka broker.
listeners: the network address and port the broker listens on.
log.dirs: the directory where the broker stores its message logs.
Example configuration:
broker.id=1
listeners=PLAINTEXT://your_server_ip:9092
log.dirs=/tmp/kafka-logs
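One caveat about the example above: log.dirs=/tmp/kafka-logs is fine for testing, but /tmp is typically cleared on reboot, so a production broker should point at persistent storage. Since server.properties uses the standard Java properties format, a quick sanity check of the values can be sketched with java.util.Properties (the file contents below are inlined for illustration; in practice you would read the real file):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class BrokerConfigCheck {
    // Load broker settings from server.properties-style text. In practice
    // you would pass a FileReader over the real config file instead.
    static Properties load(String config) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(config));
        return props;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical contents mirroring the example configuration above
        String config = String.join("\n",
                "broker.id=1",
                "listeners=PLAINTEXT://your_server_ip:9092",
                "log.dirs=/tmp/kafka-logs");
        Properties props = load(config);
        // Sanity-check the key settings before starting the broker
        System.out.println("broker.id = " + props.getProperty("broker.id"));
        System.out.println("listeners = " + props.getProperty("listeners"));
        System.out.println("log.dirs  = " + props.getProperty("log.dirs"));
    }
}
```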
Start Kafka (append the -daemon flag after the script name to run it in the background instead of the foreground):
sudo /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
Create a topic for producers and consumers to communicate through. Note that kafka-topics.sh dropped the --zookeeper option in Kafka 3.0, so connect via --bootstrap-server instead:
sudo /usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic your_topic_name
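The --partitions flag above controls how a topic's records are spread across brokers. For records with a key, Kafka's default partitioner hashes the serialized key (with murmur2) and takes the result modulo the partition count, so the same key always lands in the same partition. A simplified sketch of that idea, using Java's built-in hash as a stand-in for murmur2:

```java
public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner: Kafka really
    // hashes the serialized key with murmur2, but the shape is the same.
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the modulo result is non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        // The same key always maps to the same partition, which is what
        // keeps all records for one key in order on one partition
        System.out.println("order-42 -> partition " + partitionFor("order-42", partitions));
        System.out.println("order-42 -> partition " + partitionFor("order-42", partitions));
        System.out.println("order-7  -> partition " + partitionFor("order-7", partitions));
    }
}
```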
Depending on your business needs, you can configure a Kafka producer to send messages to the cluster. A simple producer example:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker(s) used for the initial connection to the cluster
        props.put("bootstrap.servers", "your_server_ip:9092");
        // Serializers convert keys and values to bytes on the wire
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // send() is asynchronous; close() flushes any buffered records
        producer.send(new ProducerRecord<>("your_topic_name", "key", "value"));
        producer.close();
    }
}
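The key.serializer and value.serializer settings above tell the producer how to turn keys and values into bytes before they go on the wire. At its core, Kafka's StringSerializer simply UTF-8-encodes the string (the real class implements Kafka's Serializer interface and allows the encoding to be configured); a minimal sketch of that behavior:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class StringSerializerSketch {
    // What StringSerializer does at its core: String -> UTF-8 bytes.
    // The real Kafka class implements Serializer<String> and supports
    // a configurable character encoding (UTF-8 by default).
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The broker only ever sees these bytes, never the String itself
        byte[] bytes = serialize("value");
        System.out.println(Arrays.toString(bytes));
    }
}
```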
Depending on your business needs, you can configure a Kafka consumer to receive messages from the cluster. A simple consumer example:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "your_server_ip:9092");
        // Consumers sharing a group.id divide the topic's partitions among themselves
        props.put("group.id", "your_group_id");
        // Deserializers mirror the producer's serializers, turning bytes back into Strings
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("your_topic_name"));
        // Poll in a loop; each call returns the records received since the last poll
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}
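The group.id setting above ties this consumer into a consumer group: Kafka tracks, per group and per partition, the committed offset, which is why a restarted consumer resumes roughly where it left off. A purely conceptual sketch of that bookkeeping (this is an illustration of the idea, not Kafka's implementation; Kafka actually persists committed offsets in the internal __consumer_offsets topic):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetTrackingSketch {
    // Committed offset per (group, topic-partition). Kafka stores this
    // durably in the __consumer_offsets topic, not in a broker-side map.
    private final Map<String, Long> committed = new HashMap<>();

    void commit(String group, String topicPartition, long nextOffset) {
        committed.put(group + "/" + topicPartition, nextOffset);
    }

    // Where a consumer in this group would resume reading from;
    // 0 here stands in for "no committed offset yet"
    long resumeFrom(String group, String topicPartition) {
        return committed.getOrDefault(group + "/" + topicPartition, 0L);
    }

    public static void main(String[] args) {
        OffsetTrackingSketch sketch = new OffsetTrackingSketch();
        sketch.commit("your_group_id", "your_topic_name-0", 42L);
        // A restarted consumer in the same group resumes at the committed offset
        System.out.println(sketch.resumeFrom("your_group_id", "your_topic_name-0"));
        // A different group has its own independent position
        System.out.println(sketch.resumeFrom("other_group", "your_topic_name-0"));
    }
}
```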
Depending on your specific workload, you may also need to tune the following producer parameters:
acks: how many acknowledgements the producer requires before considering a write complete; 0 (do not wait for acknowledgement), 1 (wait until the leader has written the record), or all (wait until all in-sync replicas have written it).
retries: how many times the producer retries when it hits a recoverable error.
batch.size: the maximum size, in bytes, of a batch of records sent together.
linger.ms: how long the producer waits for more records to join a batch before sending it.
buffer.memory: the maximum amount of memory the producer may use to buffer records.
To make Kafka start automatically after a system reboot, enable it at boot. Note that this requires systemd unit files named zookeeper.service and kafka.service, which a tarball install like the one above does not create:
sudo systemctl enable zookeeper
sudo systemctl enable kafka
sudo systemctl start zookeeper
sudo systemctl start kafka
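The systemctl commands above assume those unit files already exist. A minimal sketch of a kafka.service unit, using the install paths from this guide (the User and the exact stop behavior are assumptions to adapt to your environment), saved as /etc/systemd/system/kafka.service:

```ini
[Unit]
Description=Apache Kafka broker
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
# Paths match the tarball install locations used earlier in this guide
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

A similar unit wrapping /usr/local/zookeeper/bin/zkServer.sh would serve as zookeeper.service. After creating the unit files, run sudo systemctl daemon-reload before the enable/start commands above.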