本文讲述的是kafka4.2服务端对应的windows系统php的rdkafka的安装
一、下载对应的rdkafka文件
下载网址
https://pecl.php.net/package/rdkafka/6.0.5/windows
根据自己PHP 版本 ts或NTS,x64 等信息下载对应的dll文件,否则扩展安装不成功
注意里面的librdkafka.dll文件版本需用最新的,我用1.6的不支持kafka4.2,可以下载php8.5里面的librdkafka.dll
二、将保存的文件放入对应的文件夹
php --ini
看看php.ini文件在哪里
将librdkafka.dll放入php的根目录,将php_rdkafka.dll 放入ext文件夹
如果是phpstuy,可以开启rdkafka扩展
用phpinfo()或者 php -m查看是否存在rdkafka扩展,如果存在说明扩展安装成功
php 生产者 kafka_producer.php
<?php
$conf = new \RdKafka\Conf();
// 配置Kafka broker地址
$conf->set('metadata.broker.list', 'localhost:9092');
// 设置错误回调函数
$conf->setErrorCb(function ($kafka, $err, $reason) {
echo "Kafka error: " . rd_kafka_err2str($err) . " (reason: " . $reason . ")" . PHP_EOL;
});
// 设置统计回调函数 (可选)
$conf->setStatsCb(function ($kafka, $json, $len) {
echo "Kafka Stats: " . $json . PHP_EOL;
});
// 创建Producer对象
$producer = new \RdKafka\Producer($conf);
// 选择Topic
$topic = $producer->newTopic("my_topic");
// 发送消息
for ($i = 0; $i < 10; $i++) {
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
$producer->flush(1000); // 等待1秒
}
echo "Messages sent successfully!" . PHP_EOL;
php消费者 kafka_consumer.php
<?php
// 1. 创建配置对象
$conf = new \RdKafka\Conf();
// 设置 Kafka 服务器地址
$conf->set('metadata.broker.list', 'localhost:9092');
// 设置消费者组ID,这是非常重要的配置
$conf->set('group.id', 'my-consumer-group');
// 如果没有初始偏移量,则从最早的消息开始消费
$conf->set('auto.offset.reset', 'earliest');
// 【关键配置】禁用自动偏移量存储,以兼容旧版 Broker
$conf->set('enable.auto.offset.store', 'false');
// 2. 创建消费者实例
$consumer = new \RdKafka\KafkaConsumer($conf);
// 3. 订阅一个或多个主题
$consumer->subscribe(['my_topic']); // 可以传入数组订阅多个主题,如 ['topic1', 'topic2']
echo "等待消息中... (按 Ctrl+C 停止)\n";
// 4. 循环消费消息
while (true) {
// consume() 方法的参数是超时时间(毫秒)
$message = $consumer->consume(5000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
// 成功接收到消息
echo "收到消息: " . $message->payload . "\n";
// 处理你的业务逻辑...
// 如果关闭了自动提交,需要在这里手动提交偏移量
// $consumer->commit($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
// 已到达分区的末尾,暂时没有新消息
echo "等待新消息...\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
// 等待消息超时,这是正常现象,循环会继续
echo 'Timed out\n';
break;
default:
// 处理其他错误
throw new Exception($message->errstr(), $message->err);
break;
}
}
?>启动生产
php kafka_producer.php
启动消费
php kafka_consumer.php