Kafka生产者配置是确保消息可靠、高效传输的关键。以下是一些主要的配置要点:
必选配置项
- bootstrap.servers:指定broker的地址清单,以host:port形式。建议至少指定两个broker,以便一个宕机时仍可连接。
- key.serializer 和 value.serializer:指定发送消息的key和value的序列化类型。必须写全类名。
关键基础配置的含义
- acks:指定有多少个分区副本收到消息,生产者才认为消息发送成功。对消息丢失的可能性有重要影响。
acks=0
:生产者发送消息后,不会等待broker的响应,可能导致消息丢失。
acks=1
:只要broker集群的首领节点收到消息,生产者就会收到成功响应,可能存在消息丢失风险。
acks=all
:只有当所有参与分区复制的节点都收到消息时,生产者才会收到成功响应,最安全但延迟最高。
- buffer.memory:配置生产者内存缓冲区的大小,用于缓冲要发送到broker的消息。
- compression.type:指定消息压缩方式,如snappy、gzip等,可以降低网络传输开销和存储开销。
- retries:生产者从broker收到可重试的错误时的重试次数。
- batch.size:发送到同一个主题、同一个分区的消息,生产者会将其放到同一个批次中,此参数指定了一个批次可以使用的内存大小。
- max.in.flight.requests.per.connection:此参数指定了生产者在收到broker响应之前可以发送多少个小希,值越高吞吐量越大但会占用越多的内存。
- max.request.size:指定了生产者发送请求的大小上限。
其他重要配置
- linger.ms:指定生产者发送ProducerBatch之前等待ProducerRecord加入ProducerBatch的时间,增大该参数会增加消息延迟,但是能提升一定的吞吐量。
- receive.buffer.bytes 和 send.buffer.bytes:设置socket接收和发送消息缓冲区的大小。
- request.timeout.ms:设置Producer等待请求响应的最长时间。
- metadata.fetch.timeout.ms:Producer在获取元数据时等待服务器返回响应的时间。
生产者发送消息方式
- 发送并忘记:生产者把消息发送给broker,但不关注消息是否正常达到。
- 同步发送:使用send()方法发送消息,会返回一个Future对象,通过其get方法可同步等待broker的响应。
- 异步发送:使用send()方法发送消息,指定一个回调函数,broker在返回时会调用其回调函数。
以上配置要点涵盖了Kafka生产者的核心配置,根据具体的应用场景和需求,可能还需要进行更多的调优和配置。