当前位置: 首页 > news >正文

confluent-kafka入门教程

文章目录

  • 官方文档
  • 与kafka-python的对比
  • 配置
    • 文档
    • 配置项
  • Producer代码示例
  • Consumer代码示例

官方文档

confluent_kafka API — confluent-kafka 2.8.0 documentation

Quick Start for Confluent Cloud | Confluent Documentation

与kafka-python的对比

对比维度confluent-kafkakafka-python
性能表现基于librdkafka构建,处理大规模消息时,吞吐量高、延迟低,性能出色纯Python实现,受GIL限制,处理大量并发任务时存在性能瓶颈,高负载下消息处理速度较慢
功能特性具备丰富高级特性,如细粒度配置、复杂分区控制、消息压缩、安全认证等功能相对基础,能满足常见Kafka使用场景,高级特性支持不足
易用性功能丰富、配置选项多,学习曲线较陡,初学者上手难度大接口设计简单直观,易于理解和使用,初学者能快速上手
社区支持由Confluent公司维护,有专业团队和丰富资源,更新维护及时,社区活跃度高开源社区项目,社区较活跃,但资源和支持力度相对较弱
兼容性依赖librdkafka,在不同操作系统和环境中可能存在兼容性问题,需额外配置安装纯Python实现,兼容性好,在各种Python环境中可方便使用,无需额外依赖
适用场景适用于对性能要求高、需高级特性的大规模生产环境,如金融交易系统、实时数据处理平台适合开发和测试环境,以及对性能要求不高的小型项目,如简单日志收集系统、数据监控工具

配置

文档

https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

配置项

字典类型, 配置字段如下:

bootstrap.servers: kafka服务地址, 以逗号分隔

statistics.interval.ms: 发送统计间隔, 需要注册一个统计毁掉函数, 默认值为0,表示禁用统计, 粒度为1000ms

security.protocol: 可选值为“plaintext, ssl, sasl_plaintext, sasl_ssl”, 默认值为“plaintext”, 交互协议

sasl.mechanisms: 用于认证的SASL机制, 支持“GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER”, 默认为“GSSAPI”

sasl.username: SASL用户名, 支持的认证机制为“PLAIN and SASL-SCRAM-…”

sasl.password: SASL密码, 支持的认证机制同sasl.username

group.id: 消费者组id

group.instance.id: 启用静态组成员关系, 静态组成员允许在配置的session.timeout.ms时间范围内, 离开或重新加入组而不会引起组内再平衡。这个使用时候最好配置一个大点的session.timeout.ms值, 可以避免组内再平衡引起的短暂的服务不可用。需要kafka server端版本号>=2.3.0

session.timeout.ms: 客户端组回话或检测失败超时时间, 默认值为45000ms

group.protocol: 使用的组协议, 可选值为“classic”和“consumer”, 当前默认值为classic, 以后版本更新默认值会改为consumer

max.poll.interval.ms: 默认值为30000, 高级消费者调用消费消息函数(例如 rd_kafka_consumer_poll ())之间允许的最大时间间隔。如果超过此时间间隔,消费者将被视为失败,并且消费者组将进行重新平衡,以便将分区重新分配给另一个消费者组成员。警告:此时可能无法进行偏移提交。注意:建议为长时间处理应用程序设置 enable.auto.offset.store=false,然后在消息处理后显式存储偏移(使用 offsets_store()),以确保在处理完成之前不会自动提交偏移。

enable.auto.commit: 默认值为true, 在后台自动并定期提交偏移量。注意:设置为false不会阻止使用者获取先前提交的起始偏移量。为了规避此行为,请在调用assign()时设置每个分区的特定起始偏移量。

auto.commit.interval.ms: 默认值为5000ms, 消费者偏移量被提交(写入)到偏移量存储的毫秒级频率。(0 = 禁用)。此设置由高级消费者使用。

enable.auto.offset.store: 默认值为true, 自动存储提供给应用程序的最后一条消息的偏移量。偏移量存储是每个分区下一个要(自动)提交的偏移量的内存存储。

linger.ms: 默认值为5ms, 在将消息发送前,等待生产者队列中的消息累积以构建消息批次(消息集)的毫秒级延迟。较高的值允许更大、更有效的(开销更小、压缩更好)消息批次累积,但会增加消息传递延迟。

retries: 默认值为2147483647, 消息发送失败的重试次数

retry.backoff.ms: 默认值为100,重试时间间隔, 指数级增长, 受 retry.backoff.max.ms 的限制。

batch.size: 默认值为1000000字节数, 消息批次的最大字节数

Producer代码示例

from random import choice
from confluent_kafka import Producerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username':     '<CLUSTER API KEY>','sasl.password':     '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms':   'PLAIN','acks':              'all'}# Create Producer instanceproducer = Producer(config)# Optional per-message delivery callback (triggered by poll() or flush())# when a message has been successfully delivered or permanently# failed delivery (after retries).def delivery_callback(err, msg):if err:print('ERROR: Message failed delivery: {}'.format(err))else:print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))# Produce data by selecting random values from these lists.topic = "purchases"user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']count = 0for _ in range(10):user_id = choice(user_ids)product = choice(products)producer.produce(topic, product, user_id, callback=delivery_callback)count += 1# Block until the messages are sent.producer.poll(10000)producer.flush()

Consumer代码示例

from confluent_kafka import Consumerif __name__ == '__main__':config = {# User-specific properties that you must set'bootstrap.servers': '<BOOTSTRAP SERVERS>','sasl.username':     '<CLUSTER API KEY>','sasl.password':     '<CLUSTER API SECRET>',# Fixed properties'security.protocol': 'SASL_SSL','sasl.mechanisms':   'PLAIN','group.id':          'kafka-python-getting-started','auto.offset.reset': 'earliest'}# Create Consumer instanceconsumer = Consumer(config)# Subscribe to topictopic = "purchases"consumer.subscribe([topic])# Poll for new messages from Kafka and print them.try:while True:# 如果一次想拉取多个消息, 可以用consumer.consume方法, 该方法返回的是一个Message列表# msg_list = consumer.consume(num_messages=消息数量, timeout=如果没有消息, 最长等待的超时时间msg = consumer.poll(1.0)if msg is None:# Initial message consumption may take up to# `session.timeout.ms` for the consumer group to# rebalance and start consumingprint("Waiting...")elif msg.error():print("ERROR: %s".format(msg.error()))else:# Extract the (optional) key and value, and print.print("Consumed event from topic {topic}: key = {key:12} value = {value:12}".format(topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))except KeyboardInterrupt:passfinally:# Leave group and commit final offsetsconsumer.close()

相关文章:

  • Windows 下 MongoDB ZIP 版本安装指南
  • 【Linux系统篇】:从匿名管道到命名管道--如何理解进程通信中的管道?
  • 《如何结合XMind和DeepSeek高效生成思维导图》
  • Obsidian 文件夹体系构建 -INKA
  • 华为OD机试真题—— 最少数量线段覆盖/多线段数据压缩(2025A卷:100分)Java/python/JavaScript/C++/C语言/GO六种最佳实现
  • 网工_传输层协议概述
  • 无感改造,完美监控:Docker 多阶段构建 Go 应用无侵入观测
  • 【ES6新特性】Proxy进阶实战
  • 第IV部分有效应用程序的设计模式
  • 驱动速腾雷达16线并用rviz显示点云
  • C++进程间通信开发实战:高效解决项目中的IPC问题
  • 【c语言基础学习】qsort快速排序函数介绍与使用
  • 3D开发工具HOOPS助力Hexagon智能制造突破技术瓶颈,重塑测量软件用户体验!
  • 算法——置换与排列【基础】
  • LVGL Video控件和Radiobtn控件详解
  • 【无标题】Spark-SQL编程(2)
  • 玩转Docker | 使用Docker部署Xnote笔记工具
  • 从Gradio App创建Discord Bot/Slack Bot/Website Widget(2)——从Gradio App创建Slack Bot
  • 智谱开源 9B/32B 系列模型,性价比超 DeepSeek-R1,Z.ai 平台上线
  • 疾控01-实验室信息管理系统需求分析
  • 文旅部副部长饶权出任国家文物局局长
  • “网红”谭媛去世三年:未停更的账号和困境中的家庭
  • 生于1982年,孙晋出任共青团广西壮族自治区委员会书记
  • 大理洱源4.8级地震致442户房屋受损,无人员伤亡
  • 山西省援疆前方指挥部总指挥刘鹓已任忻州市委副书记
  • “2025未来地球:科学与应用大会”在江西景德镇开幕