在PHP中使用rdkafka处理消息持久化的方法如下:
extension=rdkafka.so
<?php
require_once 'vendor/autoload.php';
use RdKafka\Producer;
use RdKafka\Conf;
$conf = new Conf();
$conf->set('bootstrap.servers', 'localhost:9092');
$conf->set('message.timeout.ms', '30000');
$conf->set('delivery.timeout.ms', '120000');
$conf->set('retries', '3');
$conf->set('offset.store.type', 'file'); // 设置offset存储类型为文件
$conf->set('offset.store.path', '/tmp/kafka-consumer-offsets'); // 设置offset存储路径
$producer = new Producer($conf);
$producer->start(true);
在这个示例中,我们设置了offset.store.type
为file
,并将offset.store.path
设置为/tmp/kafka-consumer-offsets
。这将使得Kafka将消费者的offset持久化到本地文件系统中。
<?php
$producer->send([
[
'topic' => 'test_topic',
'value' => 'Hello, Kafka!',
'key' => '',
],
]);
$producer->stop();
通过以上步骤,你已经成功配置了PHP rdkafka以持久化消息。当消费者消费消息时,它们的offset将被存储在指定的路径中,以便在程序崩溃或重启后能够继续消费。