您好,登录后才能下订单哦!
1、首先安装kafka扩展
#安装librdkafka: 版本: https://github.com/edenhill/librdkafka/releases/tag/v0.9.2 $ git clone https://github.com/edenhill/librdkafka.git $ ./configure $ make $ sudo make install #安装 rdkafka.so 版本:https://github.com/arnaud-lb/php-rdkafka/releases/tag/3.0.1 $ git clone https://github.com/arnaud-lb/php-rdkafka.git $ cd php-rdkafka $ phpize $ ./configure $ make all -j 5 $ sudo make install
2、生产者代码示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test'); //topicname
$cf = new RdKafka\TopicConf();
$cf->set('offset.store.method', 'broker');
$cf->set('auto.offset.reset', 'smallest');
$rk = new RdKafka\Producer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname
for($i = 0; $i < 10; $i++) {
$topic->produce(0,0,'test' . $i);
}
3、消费者代码示例
$rcf = new RdKafka\Conf();
$rcf->set('group.id', 'test');
$rcf->set('broker.version.fallback', '0.8.2'); //brokername,kafkaversion
$cf = new RdKafka\TopicConf();
$cf->set('auto.offset.reset', 'smallest');
$cf->set('auto.commit.enable', true);
$rk = new RdKafka\Consumer($rcf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1"); //brokeraddr
$topic = $rk->newTopic("test", $cf); //topicname,topicobject
$topic->consumeStart(0,10); //partition,offset
$msg = $topic->consume(0, 1000); //partition,timeout
var_dump($msg);
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。