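To write Kafka data into Redis, create a Kafka consumer, subscribe it to the topic, and write each consumed record into Redis with a Redis client.
The following example code shows how to do this: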
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import redis.clients.jedis.Jedis;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaToRedisExample {
    private static final String KAFKA_BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String KAFKA_TOPIC = "your-kafka-topic";
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        // Configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BOOTSTRAP_SERVERS);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-redis-example-group");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit so offsets are committed only by the explicit call in the loop
        kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // Create the Redis client and the Kafka consumer; try-with-resources closes both on exit
        try (Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));

            // Continuously consume messages from Kafka and write them to Redis
            while (true) {
                // Wait up to 100 ms for new records
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // Parse the Kafka message
                    String key = record.key();
                    String value = record.value();
                    // Redis needs a non-null key and value; skip records that lack either
                    if (key == null || value == null) {
                        continue;
                    }
                    // Write to Redis
                    jedis.set(key, value);
                    System.out.println("Wrote to Redis: " + key + " - " + value);
                }
                // Commit the consumed offsets
                consumer.commitAsync();
            }
        }
    }
}
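A Kafka record produced without a key returns null from record.key(), and a null key cannot be used in Redis. Instead of dropping such records, one possible fallback is to derive a key from the record's topic, partition, and offset. The sketch below is only an illustration: RedisKeys and keyFor are hypothetical names that belong to neither Kafka nor Jedis, and the topic:partition:offset layout is an arbitrary convention chosen for this example.

```java
import java.util.Objects;

// Hypothetical helper for illustration; not part of Kafka or Jedis.
final class RedisKeys {
    private RedisKeys() {}

    /**
     * Returns the record key when it is present; otherwise builds
     * "topic:partition:offset", which is unique per record within a cluster.
     */
    static String keyFor(String topic, int partition, long offset, String recordKey) {
        Objects.requireNonNull(topic, "topic");
        if (recordKey != null) {
            return recordKey;
        }
        return topic + ":" + partition + ":" + offset;
    }

    public static void main(String[] args) {
        System.out.println(keyFor("orders", 0, 42L, "user-1")); // prints user-1
        System.out.println(keyFor("orders", 0, 42L, null));     // prints orders:0:42
    }
}
```

In the consumer loop, the helper could be called as RedisKeys.keyFor(record.topic(), record.partition(), record.offset(), record.key()) before the call to jedis.set.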
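Adjust the KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC, REDIS_HOST, and REDIS_PORT constants in KafkaToRedisExample to match your environment. The example uses the Kafka Java client and the Jedis library to connect to Kafka and Redis.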