在CentOS上集成Hadoop分布式文件系统(HDFS)与Apache Kafka,通常涉及将Kafka作为数据的生产者或消费者,并将数据写入HDFS或从HDFS读取数据。以下是一个简化的应用案例,展示如何使用Kafka将数据写入HDFS。
kafka-topics.sh --create --topic order-created-topic --partitions 12 --replication-factor 3 --bootstrap-server localhost:9092
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("order-created-topic", orderId, orderJson));
producer.close();
SparkConf conf = new SparkConf().setAppName("Kafka to HDFS");
conf.setMaster("local[*]");
JavaPairRDD<String, String> lines = KafkaUtils.createDirectStream(
conf,
"order-created-topic",
new StringDeserializer(),
new StringDeserializer()
).mapToPair(record -> new Tuple2<>(record.value(), record.key()));
lines.saveAsHadoopFile("/path/to/hdfs/directory",
new TextOutputFormat<String, String>(),
"org.apache.hadoop.mapred.lib.MultipleTextOutputFormat",
new Configuration(false)
);
请注意,上述代码示例和配置可能需要根据实际环境进行调整。在实际应用中,还需要考虑数据的序列化方式、错误处理、资源管理等因素。此外,对于生产环境,还需要考虑安全性配置,如SSL/TLS加密和身份验证。