首页 资源列表 文章列表

python3写kafka4.2的消费者和生产者基础代码

一、先在 Python 环境中安装 confluent-kafka 库(代码中的导入名为 confluent_kafka)

      pip install confluent-kafka

      备注:记得在实际运行代码的虚拟环境中执行上面的安装命令

python3 生产者代码

from confluent_kafka import Producer
import json

# Client settings for the producer: the broker to bootstrap from and the
# protocol used to talk to it.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'security.protocol': 'PLAINTEXT',
}

# The settings are handed over as one configuration dict.
producer = Producer(conf)

def delivery_report(err, msg):
    """Print the outcome of a single produce request.

    Args:
        err: The delivery error, or None when the message was delivered.
        msg: The message object; its topic() and partition() are read
            only on the success path.
    """
    if err is None:
        print(f'Message delivered to {msg.topic()}[{msg.partition()}]')
        return
    print(f'Message delivery failed: {err}')

# Build the JSON payload and encode it to UTF-8 bytes before sending.
data = {'mo': 'morrisonwu'}
payload = json.dumps(data).encode('utf-8')

# Queue the message for topic 'p_topic'; delivery_report is registered as the
# delivery callback for this message.
producer.produce('p_topic', payload, callback=delivery_report)

# Block until queued messages have been handled so the script does not exit
# with the message still pending.
producer.flush()

python3消费者代码

from confluent_kafka import Consumer, KafkaError,TopicPartition
import json

# Client settings for the consumer. The broker address matches the one used
# by the producer script ('localhost:9092', with no surrounding whitespace).
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'p_group',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'PLAINTEXT',
}
consumer = Consumer(conf)

# Topic and partition that the consume loop reads from.
topic = "p_topic"
partition = 0

# Read messages from one explicitly assigned partition until interrupted
# with Ctrl-C, printing each message's position and decoded value.
try:
    # Assign the partition directly, starting at offset 0.
    consumer.assign([TopicPartition(topic=topic, partition=partition, offset=0)])
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing was returned by this poll; print a marker and retry.
            print('1')
            continue

        err = msg.error()
        if err:
            # Report the actual error instead of a bare 'error' marker so
            # the failure can be diagnosed.
            print(f'error: {err}')
            continue

        # Fetch the payload once; an empty or missing value is shown as None.
        raw = msg.value()
        text = raw.decode("utf-8") if raw else None
        print(f'收到消息:partition={msg.partition()},offset={msg.offset()}')
        print(f'value:{text}')
except KeyboardInterrupt:
    # Ctrl-C is the intended way to stop the loop.
    pass
finally:
    # Always close the consumer, however the loop ended.
    consumer.close()