博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
php rdkafka扩展发送和接收消息
阅读量:6553 次
发布时间:2019-06-24

本文共 1266 字,大约阅读时间需要 4 分钟。

hot3.png

发送消息

set('group.id', 'test');    $cf = new RdKafka\TopicConf();    $cf->set('offset.store.method', 'broker');    $cf->set('auto.offset.reset', 'smallest');    $rk = new RdKafka\Producer($rcf);    $rk->setLogLevel(LOG_DEBUG);    $rk->addBrokers("127.0.0.1");    $topic = $rk->newTopic("test", $cf);    for($i = 0; $i < 1000; $i++) {        $topic->setMessage(0, 'test' . $i);    }} catch (Exception $e) {    echo $e->getMessage();}

接收消息

set('group.id', 'test'); $cf = new RdKafka\TopicConf();/* $cf->set('offset.store.method', 'file');*/ $cf->set('auto.offset.reset', 'smallest'); $cf->set('auto.commit.enable', true); $rk = new RdKafka\Consumer($rcf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $topic = $rk->newTopic("test", $cf); //$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); while (true) { $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); $msg = $topic->consume(0, 1000); var_dump($msg); if ($msg->err) { echo $msg->errstr(), "\n"; break; } else { echo $msg->payload, "\n"; } $topic->consumeStop(); sleep(1); }} catch (Exception $e) { echo $e->getMessage();}

 

转载于:https://my.oschina.net/u/589570/blog/781674

你可能感兴趣的文章
yarn磁盘监控参数
查看>>
jquery javascript获得网页的高度和宽度
查看>>
mysql设置自增id清零 auto_increment
查看>>
linux(Centos7)服务器硬件改动,进入Emergency模式
查看>>
Easyui Treegrid 分页解决方案
查看>>
hihoCoder1038
查看>>
hdu3976(Electric resistance) 高斯消元
查看>>
BZOJ3874:[AHOI2014&JSOI2014]宅男计划(爬山法)
查看>>
【模板】NTT
查看>>
413. Arithmetic Slices(数组中等差递增子区间的个数)
查看>>
unity调用打印机打印
查看>>
python爬虫知识点总结(二十七)Scrapy分布式原理以及Scrapy-Reids源码解析
查看>>
redux&&createStore
查看>>
开发文档学习英文
查看>>
c++ 类的继承与派生
查看>>
[转] Sublime Text3 配置 NodeJs 环境
查看>>
【leetcode】449. Serialize and Deserialize BST
查看>>
HTTP-web服务器接收到client请求后的处理过程(很详细)
查看>>
Mobile开发之meta篇
查看>>
Flutter-BLoC-第二讲
查看>>