confluent-kafka入门教程
文章目录
- 官方文档
- 与kafka-python的对比
- 配置
- 文档
- 配置项
- Producer代码示例
- Consumer代码示例
官方文档
confluent_kafka API — confluent-kafka 2.8.0 documentation
Quick Start for Confluent Cloud | Confluent Documentation
与kafka-python的对比
对比维度 | confluent-kafka | kafka-python |
---|---|---|
性能表现 | 基于librdkafka 构建,处理大规模消息时,吞吐量高、延迟低,性能出色 | 纯Python实现,受GIL限制,处理大量并发任务时存在性能瓶颈,高负载下消息处理速度较慢 |
功能特性 | 具备丰富高级特性,如细粒度配置、复杂分区控制、消息压缩、安全认证等 | 功能相对基础,能满足常见Kafka使用场景,高级特性支持不足 |
易用性 | 功能丰富、配置选项多,学习曲线较陡,初学者上手难度大 | 接口设计简单直观,易于理解和使用,初学者能快速上手 |
社区支持 | 由Confluent公司维护,有专业团队和丰富资源,更新维护及时,社区活跃度高 | 开源社区项目,社区较活跃,但资源和支持力度相对较弱 |
兼容性 | 依赖librdkafka ,在不同操作系统和环境中可能存在兼容性问题,需额外配置安装 | 纯Python实现,兼容性好,在各种Python环境中可方便使用,无需额外依赖 |
适用场景 | 适用于对性能要求高、需高级特性的大规模生产环境,如金融交易系统、实时数据处理平台 | 适合开发和测试环境,以及对性能要求不高的小型项目,如简单日志收集系统、数据监控工具 |
配置
文档
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
配置项
字典类型, 配置字段如下:
bootstrap.servers: kafka服务地址, 以逗号分隔
statistics.interval.ms: 发送统计间隔, 需要注册一个统计毁掉函数, 默认值为0,表示禁用统计, 粒度为1000ms
security.protocol: 可选值为“plaintext, ssl, sasl_plaintext, sasl_ssl”, 默认值为“plaintext”, 交互协议
sasl.mechanisms: 用于认证的SASL机制, 支持“GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER”, 默认为“GSSAPI”
sasl.username: SASL用户名, 支持的认证机制为“PLAIN and SASL-SCRAM-…”
sasl.password: SASL密码, 支持的认证机制同sasl.username
group.id: 消费者组id
group.instance.id: 启用静态组成员关系, 静态组成员允许在配置的session.timeout.ms
时间范围内, 离开或重新加入组而不会引起组内再平衡。这个使用时候最好配置一个大点的session.timeout.ms
值, 可以避免组内再平衡引起的短暂的服务不可用。需要kafka server端版本号>=2.3.0
session.timeout.ms: 客户端组回话或检测失败超时时间, 默认值为45000ms
group.protocol: 使用的组协议, 可选值为“classic”和“consumer”, 当前默认值为classic, 以后版本更新默认值会改为consumer
max.poll.interval.ms: 默认值为30000, 高级消费者调用消费消息函数(例如 rd_kafka_consumer_poll ())之间允许的最大时间间隔。如果超过此时间间隔,消费者将被视为失败,并且消费者组将进行重新平衡,以便将分区重新分配给另一个消费者组成员。警告:此时可能无法进行偏移提交。注意:建议为长时间处理应用程序设置 enable.auto.offset.store=false,然后在消息处理后显式存储偏移(使用 offsets_store()),以确保在处理完成之前不会自动提交偏移。
enable.auto.commit: 默认值为true, 在后台自动并定期提交偏移量。注意:设置为false不会阻止使用者获取先前提交的起始偏移量。为了规避此行为,请在调用assign()时设置每个分区的特定起始偏移量。
auto.commit.interval.ms: 默认值为5000ms, 消费者偏移量被提交(写入)到偏移量存储的毫秒级频率。(0 = 禁用)。此设置由高级消费者使用。
enable.auto.offset.store: 默认值为true, 自动存储提供给应用程序的最后一条消息的偏移量。偏移量存储是每个分区下一个要(自动)提交的偏移量的内存存储。
linger.ms: 默认值为5ms, 在将消息发送前,等待生产者队列中的消息累积以构建消息批次(消息集)的毫秒级延迟。较高的值允许更大、更有效的(开销更小、压缩更好)消息批次累积,但会增加消息传递延迟。
retries: 默认值为2147483647, 消息发送失败的重试次数
retry.backoff.ms: 默认值为100,重试时间间隔, 指数级增长, 受 retry.backoff.max.ms 的限制。
batch.size: 默认值为1000000字节数, 消息批次的最大字节数
Producer代码示例
from random import choice
from confluent_kafka import Producerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username': '<CLUSTER API KEY>','sasl.password': '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms': 'PLAIN','acks': 'all'}# Create Producer instanceproducer = Producer(config)# Optional per-message delivery callback (triggered by poll() or flush())# when a message has been successfully delivered or permanently# failed delivery (after retries).def delivery_callback(err, msg):if err:print('ERROR: Message failed delivery: {}'.format(err))else:print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))# Produce data by selecting random values from these lists.topic = "purchases"user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']count = 0for _ in range(10):user_id = choice(user_ids)product = choice(products)producer.produce(topic, product, user_id, callback=delivery_callback)count += 1# Block until the messages are sent.producer.poll(10000)producer.flush()
Consumer代码示例
from confluent_kafka import Consumerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username': '<CLUSTER API KEY>','sasl.password': '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms': 'PLAIN','group.id': 'kafka-python-getting-started','auto.offset.reset': 'earliest'}# Create Consumer instanceconsumer = Consumer(config)# Subscribe to topictopic = "purchases"consumer.subscribe([topic])# Poll for new messages from Kafka and print them.try:while True:# 如果一次想拉取多个消息, 可以用consumer.consume方法, 该方法返回的是一个Message列表# msg_list = consumer.consume(num_messages=消息数量, timeout=如果没有消息, 最长等待的超时时间msg = consumer.poll(1.0)if msg is None:# Initial message consumption may take up to# `session.timeout.ms` for the consumer group to# rebalance and start consumingprint("Waiting...")elif msg.error():print("ERROR: %s".format(msg.error()))else:# Extract the (optional) key and value, and print.print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))except KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()