python连接有sasl认证的kafka

一直以来都是在用kafka-python这个库连接kafka,但现在公司对kafka做了安全升级,加入了sasl认证,sasl.mechanisms用的是SCRAM-SHA-256,kafka-python并不支持,谷歌了一下,可以换成confluent_kafka。

pip install confluent_kafka

生产端示例代码

import json
from datetime import datetime
from confluent_kafka import Producer


topic_name = 'python_test'

conf = {
    'bootstrap.servers': '10.110.18.214:8911,10.110.16.96:8911,10.110.19.242:8911',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': '432425',
    'sasl.password': '534456'
}


def delivery_report(err, msg):
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


producer = Producer(**conf)

data = {
    'name': 'sheng',
    'time': str(datetime.now())
}

for i in range(10):
    producer.produce(topic_name, (json.dumps(data)).encode() , callback=delivery_report)
    
producer.flush()

消费端代码

from confluent_kafka import Consumer


topic_name = 'python_test'
KAFKA_BROKER_SERVERS = "10.110.18.214:8911,10.110.16.96:8911,10.110.19.242:8911"
consumer = Consumer({
    'bootstrap.servers': KAFKA_BROKER_SERVERS,
    'group.id': 'test_sasl',
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanisms': 'SCRAM-SHA-256',
    'sasl.username': '432425',
    'sasl.password': '534456'
})

consumer.subscribe([topic_name])

while True:
    msg = consumer.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

consumer.close()

扫描关注, 与我技术互动

QQ交流群: 211426309

加入知识星球, 每天收获更多精彩内容

分享日常研究的python技术和遇到的问题及解决方案