首页
文章
留言
首页
文章
留言
PHP使用Kafka
2017 年 02 月 12 日
后端
PHP
Kafka
Kafka 是一种高吞吐的分布式消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。 #### 安装 librdkafka 库 ```plaintext git clone https://github.com/edenhill/librdkafka.git ./configure make && make install ``` #### 安装 php-rdkafka 扩展 ```plaintext php -m | grep kafka rdkafka ``` #### 生产者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); // 从终端接收输入 $oInputHandler = fopen('php://stdin', 'r'); while (true) { echo "\nEnter messages:\n"; $sMsg = trim(fgets($oInputHandler)); // 空消息意味着退出 if (empty($sMsg)) { break; } // 发送消息 $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg); } echo "done\n"; ``` #### 消费者 ```php setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers('localhost:9092'); $oObjTopic = $objRdKafka->newTopic('test'); /** * consumeStart * 第一个参数标识分区,生产者是往分区0发送的消息,这里也从分区0拉取消息 * 第二个参数标识从什么位置开始拉取消息,可选值为 * RD_KAFKA_OFFSET_BEGINNING : 从开始拉取消息 * RD_KAFKA_OFFSET_END : 从当前位置开始拉取消息 * RD_KAFKA_OFFSET_STORED : 猜测跟RD_KAFKA_OFFSET_END一样 */ $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END); while (true) { // 第一个参数是分区,第二个参数是超时时间 $oMsg = $oObjTopic->consume(0, 1000); // 没拉取到消息时,返回NULL if (!$oMsg) { usleep(10000); continue; } if ($oMsg->err) { echo $oMsg->errstr(), "\n"; break; } else { echo $oMsg->payload, "\n"; } } ```
0
相关文章
PHP常用函数总结
Redis、MemCache、MongoDB比较
Python爬虫之Beautiful Soup的使用
PHP开发之字符串处理
PHP开发之PDO使用总结
全部分类
前端
后端
运维
架构
算法
数据库
移动应用
桌面应用
程序开发
热门标签
NoSQL
CentOS
MongoDB
JavaScript
OpenResty
MySQL
Redis
CSS
Shell
Linux
爬虫
PHP
Python
Elasticsearch
iOS
多线程
Kubernetes
Git
Sphinx
Nginx
macOS
Android
Lua
HTML
Kafka
Supervisor
Objective-C
Docker
GUI
C++
Qt
Composer
热门文章
PHP使用Kafka
HTML5常用特性总结
iOS开发之Touch ID指纹解锁实例
iOS开发之收集崩溃信息
10种常见的软件架构模式
JavaScript常用函数总结
CSS设置图片水平及垂直居中
iOS开发之面向对象
Kubernetes介绍
OpenResty+Lua+Kafka收集日志