首页 资源列表 文章列表

ubuntu docker安装kafka4.2,及php的consumer和producer代码

第一步 下载 apache/kafka:4.2.0镜像

docker pull apache/kafka:4.2.0

第二步 新建文件夹kafka

mkdir kafka

进入文件夹kafka

cd kafka

第三步 新建docker-compose.yml文件

services:
  kafka:
    image: apache/kafka:4.2.0
    container_name: kafka-4.2.0
    ports:
      - "9092:9092"
      - "9003:9003"
    environment:
      - TZ=Asia/Shanghai
      - LANG=C.UTF-8
      - KAFKA_NODE_ID=1
      - CLUSTER_ID=kafka-cluster
      - KAFKA_PROCESS_ROLES=broker,controller
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
      - KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093
    volumes:
      - ./data:/var/lib/kafka/data
      - ./config:/mnt/shared/config
      - ./secrets:/etc/kafka/secrets
    privileged: true
    network_mode: "bridge"

第四步 启动容器

docker-compose up -d

可以通过docker ps查看kafka容器是否运行正常

服务器安装java

sudo apt-get install openjdk-17-jdk

#验证是否成功

java -version

# 返回版本号,就是成功,openjdk version "17.0.15" 2025-04-15

添加JAVA_HOME环境变量

#查看jdk安装目录

sudo update-alternatives --config java

#编辑环境

sudo vim /etc/environment

#添加JAVA_HOME

JAVA_HOME="/usr/lib/jvm/java-17-openjdk-amd64"

#生效配置

source /etc/environment

#检查 JAVA_HOME

echo $JAVA_HOME  # 应返回配置的路径

上面步骤已经安装好kafka的服务端

下面介绍PHP端使用

1、安装rdkafka扩展

# 1. 安装 librdkafka(C 库)

## Ubuntu/Debian

sudo apt-get install librdkafka-dev


## CentOS/RHEL

sudo yum install librdkafka-devel


## macOS

brew install librdkafka


# 2. 安装 PHP 扩展

pecl install rdkafka


# 3. 启用扩展

echo "extension=rdkafka.so" >> /etc/php/8.1/cli/php.ini


PHP 开发生产者和消费者代码

生产者代码

<?php
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list','localhost:9092');
$conf->setErrorCb(function($kafka,$err,$reason){
        echo "Kafka error:".rd_kafka_err2str($err)."(reason:".$reason.")".PHP_EOL;
});
$conf->setStatsCb(function($kafka,$json,$len){
        echo "Kafka stats:".$json.PHP_EOL;
});

$producer = new \RdKafka\Producer($conf);
$topic = $producer->newTopic("wu");

for($i=0;$i<10;$i++){
        $topic->produce(RD_KAFKA_PARTITION_UA,0,"Message $i");
        $producer->flush(1000);
}
echo "message sent successfully".PHP_EOL;

消费者代码

<?php
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list','localhost:9092');
$conf->set('group.id','my-consumer-group');
$conf->set('auto.offset.reset','earliest');

$consumer = new \RdKafka\KafkaConsumer($conf);
$topicPartition = new \RdKafka\TopicPartition('wu',0);
$consumer->assign([$topicPartition]);
echo "等待消息中...\n";
while(true){
        $message = $consumer->consume(5000);
        switch($message->err){
        case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo "收到".$message->payload."\n";
                break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "等待新消息...\n";
                break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
        default:
                throw new Exception($message->errstr(),$message->err);
                break;
        }
}