一、安装依赖:在 Python 环境中安装 confluent_kafka 库
pip install confluent-kafka
备注:请先激活运行代码所用的虚拟环境,再执行上面的安装命令
Python 3 生产者代码
from confluent_kafka import Producer
import json
# Client settings for the producer: a single local broker reached over an
# unencrypted, unauthenticated (PLAINTEXT) connection.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'security.protocol': 'PLAINTEXT',
}

# Build the producer from the settings dict above.
producer = Producer(conf)
def delivery_report(err, msg):
    """Print the outcome of a single message delivery.

    Intended to be passed as the ``callback`` argument when producing a
    message.

    Args:
        err: The delivery error, or ``None`` when the message was delivered.
        msg: The message object; its ``topic()`` and ``partition()`` methods
            are read only on success.
    """
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()}[{msg.partition()}]')
# Sample record to publish.
data = {'mo': 'morrisonwu'}

# Serialize to JSON and encode as UTF-8 bytes before handing it to the client.
payload = json.dumps(data).encode('utf-8')
producer.produce('p_topic', payload, callback=delivery_report)

# Wait for outstanding messages to be sent before the script ends.
producer.flush()
Python 3 消费者代码
from confluent_kafka import Consumer, KafkaError,TopicPartition
import json
# Client settings for the consumer. The broker address must not carry stray
# whitespace; it is the same local broker the producer script writes to.
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'p_group',
    # Start from the earliest available offset when no valid offset exists.
    'auto.offset.reset': 'earliest',
    'security.protocol': 'PLAINTEXT',
}

# Build the consumer from the settings dict above.
consumer = Consumer(conf)
topic = "p_topic"
partition = 0

try:
    # Read one fixed partition starting at offset 0 by assigning it
    # explicitly.
    consumer.assign([TopicPartition(topic=topic, partition=partition, offset=0)])
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            # Nothing arrived within the poll timeout; keep waiting.
            print('1')
            continue

        err = msg.error()
        if err:
            # Report the actual error instead of a bare marker so failures
            # can be diagnosed, then keep polling.
            print(f'error: {err}')
            continue

        # Fetch the payload once; an empty or missing value is shown as None.
        value = msg.value()
        print(f'收到消息:partition={msg.partition()},offset={msg.offset()}')
        print(f'value:{value.decode("utf-8") if value else None}')
except KeyboardInterrupt:
    # Ctrl+C is the expected way to stop this script.
    pass
finally:
    # Always close the consumer, however the loop ended.
    consumer.close()