第一步 下载 apache/kafka:4.2.0镜像
# Pull the apache/kafka image tagged 4.2.0 to the local Docker host.
docker pull apache/kafka:4.2.0
第二步 新建文件夹kafka
# Create the working directory; -p keeps the step re-runnable when it already exists.
mkdir -p kafka
进入文件夹kafka
# Switch into the kafka working directory so the following files are created there.
cd kafka
第三步 新建docker-compose.yml文件,内容如下(注意保持YAML的层级缩进):
# Single-node Kafka 4.2.0 in KRaft mode: the one node acts as both broker and
# controller (see KAFKA_PROCESS_ROLES below).
services:
  kafka:
    image: apache/kafka:4.2.0
    container_name: kafka-4.2.0
    ports:
      # Client traffic (PLAINTEXT listener).
      - "9092:9092"
      # Controller listener. Previously mapped as 9003, a port no listener
      # in KAFKA_LISTENERS binds to.
      - "9093:9093"
    environment:
      - TZ=Asia/Shanghai
      - LANG=C.UTF-8
      # Must match the id used in KAFKA_CONTROLLER_QUORUM_VOTERS.
      - KAFKA_NODE_ID=1
      # NOTE(review): KRaft cluster ids are normally a base64-encoded UUID
      # (e.g. generated with `kafka-storage.sh random-uuid`); confirm the
      # image accepts this custom value.
      - CLUSTER_ID=kafka-cluster
      - KAFKA_PROCESS_ROLES=broker,controller
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # NOTE(review): clients are told to connect to localhost:9092, so
      # presumably only clients on the Docker host can use this broker;
      # change the host name if remote clients are expected.
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093
    volumes:
      # Paths are relative to the directory containing this file.
      - ./data:/var/lib/kafka/data
      - ./config:/mnt/shared/config
      - ./secrets:/etc/kafka/secrets
    # NOTE(review): privileged mode grants the container broad access to the
    # host; confirm it is really required.
    privileged: true
    network_mode: "bridge"
第四步 启动容器
# Create and start the services from docker-compose.yml in the background (-d = detached).
docker-compose up -d
可以通过docker ps查看kafka容器是否运行正常
服务器安装java
# Install the OpenJDK 17 JDK package.
sudo apt-get install openjdk-17-jdk
# Verify the installation
java -version
# A version string means success, e.g.: openjdk version "17.0.15" 2025-04-15
添加JAVA_HOME环境变量
# Show where the JDK is installed. --list only prints the registered java
# paths; --config would open an interactive chooser and needs root.
update-alternatives --list java
# Edit the system-wide environment file
sudo vim /etc/environment
# Add the following JAVA_HOME line to /etc/environment
JAVA_HOME="/usr/lib/jvm/java-17-openjdk-amd64"
# Apply the setting to the current shell. /etc/environment holds plain
# KEY="value" lines without `export`, so enable auto-export (set -a) while
# sourcing it; otherwise child processes would not see JAVA_HOME.
set -a
source /etc/environment
set +a
# Check JAVA_HOME
echo "$JAVA_HOME" # should print the configured path
上面步骤已经安装好kafka的服务端
下面介绍PHP端使用
1、安装rdkafka扩展
# 1. Install librdkafka (the C library)
## Ubuntu/Debian
sudo apt-get install librdkafka-dev
## CentOS/RHEL
sudo yum install librdkafka-devel
## macOS
brew install librdkafka
# 2. Install the PHP extension
pecl install rdkafka
# 3. Enable the extension
# A plain `>>` redirect is opened by the calling (unprivileged) shell and fails
# when php.ini is only writable by root, so append through `sudo tee -a`.
# NOTE(review): the path assumes the PHP 8.1 CLI layout of Debian/Ubuntu —
# adjust it to the php.ini reported by `php --ini`.
echo "extension=rdkafka.so" | sudo tee -a /etc/php/8.1/cli/php.ini > /dev/null
PHP 开发生产者和消费者代码
生产者代码
<?php
/**
 * Example producer: queues ten messages ("Message 0" .. "Message 9") on the
 * "wu" topic of the broker at localhost:9092, then waits for delivery.
 *
 * Exits with status 1 when the messages could not be flushed, instead of
 * reporting success unconditionally.
 */
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');

// Print client-level errors reported by the library.
$conf->setErrorCb(function ($kafka, $err, $reason) {
    echo "Kafka error:".rd_kafka_err2str($err)."(reason:".$reason.")".PHP_EOL;
});

// NOTE(review): librdkafka only emits statistics when statistics.interval.ms
// is set to a non-zero value; with this configuration the callback is
// presumably never invoked — confirm whether stats are wanted.
$conf->setStatsCb(function ($kafka, $json, $len) {
    echo "Kafka stats:".$json.PHP_EOL;
});

$producer = new \RdKafka\Producer($conf);
$topic = $producer->newTopic("wu");

for ($i = 0; $i < 10; $i++) {
    // RD_KAFKA_PARTITION_UA leaves the partition choice to the partitioner.
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    // Serve queued callbacks without blocking; the blocking wait is done once
    // after the loop instead of after every message.
    $producer->poll(0);
}

// Wait for outstanding messages to be delivered, retrying a bounded number of
// times, and keep the last result so a failure is not silently ignored.
$flushResult = RD_KAFKA_RESP_ERR_NO_ERROR;
for ($attempt = 0; $attempt < 10; $attempt++) {
    $flushResult = $producer->flush(1000);
    if ($flushResult === RD_KAFKA_RESP_ERR_NO_ERROR) {
        break;
    }
}

if ($flushResult !== RD_KAFKA_RESP_ERR_NO_ERROR) {
    fwrite(STDERR, "Unable to flush, messages might be lost: ".rd_kafka_err2str($flushResult).PHP_EOL);
    exit(1);
}

echo "message sent successfully".PHP_EOL;
消费者代码
<?php
/**
 * Example consumer: reads messages from partition 0 of the "wu" topic on the
 * broker at localhost:9092 as part of group "my-consumer-group" and prints
 * each payload. Runs until interrupted or until an unexpected error is thrown.
 *
 * NOTE(review): only partition 0 is assigned; if the topic has more than one
 * partition, messages written to the others are not read by this script.
 */
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
$conf->set('group.id', 'my-consumer-group');
// Start from the oldest message when the group has no committed offset.
$conf->set('auto.offset.reset', 'earliest');
// librdkafka only reports RD_KAFKA_RESP_ERR__PARTITION_EOF when this is
// enabled; without it the EOF branch below can never run.
$conf->set('enable.partition.eof', 'true');

$consumer = new \RdKafka\KafkaConsumer($conf);
$topicPartition = new \RdKafka\TopicPartition('wu', 0);
$consumer->assign([$topicPartition]);

echo "等待消息中...\n";

while (true) {
    // Block for up to 5 seconds waiting for the next message or event.
    $message = $consumer->consume(5000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo "收到".$message->payload."\n";
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            // Reached the current end of the partition.
            echo "等待新消息...\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            // Nothing arrived within the timeout; keep polling.
            echo "Timed out\n";
            break;
        default:
            // Any other error is fatal for this example; the throw leaves
            // the loop, so no break is needed afterwards.
            throw new Exception($message->errstr(), $message->err);
    }
}