您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Java代码之Kafka生产者简单介绍
## 一、Kafka生产者概述
Apache Kafka是一个分布式流处理平台,以其高吞吐量、低延迟和可扩展性著称。在Kafka生态系统中,**生产者(Producer)** 是负责向Kafka主题(Topic)发布消息的客户端组件。Java作为Kafka的主要开发语言,提供了完善的客户端API供开发者使用。
### 1.1 生产者核心功能
- 消息序列化:将Java对象转换为字节数组
- 分区选择:决定消息发送到Topic的哪个分区
- 消息缓冲:批量发送提高吞吐量
- 错误处理:自动重试和错误恢复机制
- 负载均衡:自动感知集群节点变化
### 1.2 生产者工作流程
1. 初始化生产者实例
2. 构建待发送的消息对象
3. 序列化消息键值对
4. 选择目标分区
5. 将消息添加到缓冲区
6. 后台线程批量发送消息
7. 接收服务器响应并处理结果
## 二、基础生产者实现
### 2.1 必要依赖
```xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
// 1. 配置生产者参数
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 3. 创建消息记录
ProducerRecord<String, String> record =
new ProducerRecord<>("test-topic", "message-key", "Hello Kafka!");
// 4. 发送消息(异步)
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Message sent to partition %d, offset %d%n",
metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
// 5. 关闭生产者
producer.close();
}
}
表示要发送的单个消息记录,包含: - topic:目标主题名称 - partition:可选的目标分区号 - key:消息键(用于分区计算) - value:实际消息内容 - timestamp:可选的时间戳
异步发送的回调接口,当消息被确认收到或发送失败时触发:
public interface Callback {
void onCompletion(RecordMetadata metadata, Exception exception);
}
配置项 | 说明 | 示例值 |
---|---|---|
bootstrap.servers | Kafka集群地址列表 | “host1:9092,host2:9092” |
key.serializer | 键序列化类 | StringSerializer.class |
value.serializer | 值序列化类 | ByteArraySerializer.class |
// 消息确认模式
props.put("acks", "all"); // "0", "1", "all"
// 重试次数
props.put("retries", 3);
// 批量发送大小(字节)
props.put("batch.size", 16384);
// 发送等待时间(毫秒)
props.put("linger.ms", 100);
// 缓冲区内存大小
props.put("buffer.memory", 33554432);
// 最大请求大小
props.put("max.request.size", 1048576);
// 压缩类型
props.put("compression.type", "snappy"); // "gzip", "lz4", "zstd"
吞吐量优先:
batch.size
和linger.ms
compression.type
buffer.memory
低延迟优先:
linger.ms=1
batch.size
可靠性优先:
acks=all
retries
enable.idempotence=true
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 示例:按key的哈希值取模
return Math.abs(key.hashCode()) % numPartitions;
}
@Override public void close() {}
@Override public void configure(Map<String, ?> configs) {}
}
// 使用方式
props.put("partitioner.class", "com.example.CustomPartitioner");
props.put("enable.idempotence", "true");
props.put("transactional.id", "my-transactional-id");
// 事务使用示例
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic1", "value1"));
producer.send(new ProducerRecord<>("topic2", "value2"));
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
ProducerRecord<String, String> record =
new ProducerRecord<>("my-topic", "key", "value");
// 添加消息头
record.headers().add("trace-id", "12345".getBytes());
record.headers().add("source-system", "ERP".getBytes());
producer.send(record, (metadata, exception) -> {
if (exception != null) {
if (exception instanceof RetriableException) {
// 可重试异常处理
} else {
// 不可重试异常处理
logger.error("Send failed", exception);
}
}
});
Kafka生产者暴露的关键JMX指标:
- request-rate
:请求发送速率
- request-latency-avg
:平均请求延迟
- record-send-rate
:记录发送速率
- record-error-rate
:错误记录率
- batch-size-avg
:平均批次大小
确保正确关闭生产者释放资源:
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
producer.flush(); // 清空缓冲区
producer.close(Duration.ofSeconds(30)); // 优雅关闭
}));
max.in.flight.requests.per.connection=1
enable.idempotence=true
网络瓶颈:
request-latency-avg
CPU瓶颈:
磁盘瓶颈:
public class ProductionReadyProducer {
private static final Logger logger = LoggerFactory.getLogger(ProductionReadyProducer.class);
private static final String TOPIC = "orders";
private KafkaProducer<String, Order> producer;
public void init() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
this.producer = new KafkaProducer<>(props);
}
public void sendOrder(Order order) {
ProducerRecord<String, Order> record =
new ProducerRecord<>(TOPIC, order.getOrderId(), order);
producer.send(record, (metadata, ex) -> {
if (ex != null) {
logger.error("Failed to send order {}", order.getOrderId(), ex);
// 实际项目中应有重试或补偿逻辑
} else {
logger.info("Order {} sent to partition {} at offset {}",
order.getOrderId(), metadata.partition(), metadata.offset());
}
});
}
public void shutdown() {
try {
producer.flush();
producer.close(Duration.ofSeconds(30));
} catch (Exception e) {
logger.error("Exception during shutdown", e);
}
}
// 优雅关闭钩子
static {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (instance != null) instance.shutdown();
}));
}
}
通过本文的介绍,您应该已经掌握了Kafka生产者的核心概念和Java实现方法。实际应用中,需要根据具体业务场景调整配置参数,并建立完善的监控体系,才能充分发挥Kafka的高性能特性。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。