当前位置: 首页 > news >正文

如何实现Kafka的Exactly-Once语义?

Kafka 的 Exactly-Once(精确一次)语义是分布式消息系统中最高等级的数据一致性保证,包含三个层面的含义:

  1. 消息不会丢失
  2. 消息不会重复消费
  3. 消息处理结果具有确定性

模式局限性:

这里模式有个问题,会导致性能下降,并且即使使用了该种模式,生产者和消费者该做的重试和幂等都需要做,只是重复数据会下降(比如业务处理成功了,但是提交offset失败了,会导致broker重发,这种场景严格意义来说不算成功,但是真实场景下网络是有波动的,可能会导致失败)。
这里需要取舍,根据ai的回答:在百万级消息/天的系统中,Exactly-Once会带来约15%的吞吐量下降,但能将消费者端的重复消息率从0.7%降至0.02%。建议根据业务容忍度选择方案。

学习前导

在实现Exactly-Once之前,需要了解kafka三语义以及2PC实现。

Kafka消息语义三模式精要:

1. Exactly-Once :通过事务机制实现精确一次处理,消息不丢失不重复 ,适用于金融交易等强一致性场景
2. At-Least-Once :消息至少被消费一次(可能重复),需业务方实现幂等 ,适合订单处理等高吞吐场景
3. At-Most-Once :消息至多被消费一次(可能丢失),无需重试机制 ,适用于实时监控等低延迟场景
通俗解释就是:

if (业务需要强一致性) {启用 Exactly-Once;
} else if (允许重复但需完整) {At-Least-Once + Redis幂等;
} else {At-Most-Once;
}

Kafka 2PC实现与Broker角色解析

1. 核心协调机制

Kafka的2PC实现依赖Broker集群中的事务协调器(Transaction Coordinator),该服务内嵌于Kafka Broker,负责:

Producer
事务协调器
事务日志存储
__transaction_state主题
  • 分配唯一PID(Producer ID)
  • 维护事务状态机
  • 驱动两阶段提交流程
2. 关键交互流程
// 生产者初始化事务(Java示例)
producer.initTransactions(); // 与Broker建立会话

完整2PC流程:
1️⃣ Prepare阶段:协调器持久化事务元数据到__transaction_state
2️⃣ Commit阶段:原子性写入所有参与分区的数据

3. 监控保障

Broker提供以下关键监控指标:

# Broker端事务指标查询
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --describe --all
监控项告警阈值对应Broker日志
transaction-abort-rate>5%TransactionCoordinator.log
pending-transactions>1000server.log

实现 Exactly-Once 的两种主要方式:

一、生产者幂等性 + 事务(推荐方案)

实现原理

// 生产者配置
properties.put("enable.idempotence", "true"); // 开启幂等性
properties.put("transactional.id", "my-transaction-id"); // 设置事务ID// 生产者代码示例
producer.beginTransaction();
try {producer.send(new ProducerRecord<>("topic", "key", "value"));producer.commitTransaction();
} catch (Exception e) {producer.abortTransaction();
}

核心机制

  1. 幂等性保证(通过 PID + 序列号)
    • 每个生产者实例有唯一 PID
    • 每条消息携带单调递增序列号
  2. 事务机制
    • 使用两阶段提交协议
    • 支持跨多个分区/主题的原子写入

二、消费者端精确一次消费

实现配置

// 消费者配置
properties.put("isolation.level", "read_committed"); // 只读取已提交的消息

处理模式

# 消费处理示例(Pythonimport org.apache.kafka.clients.consumer.*;
import java.util.Collections;
import java.util.Properties;public class KafkaExample {public static void main(String[] args) {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {processMessage(record.value());}consumer.commitSync();}}private static void processMessage(String message) {// ... 业务处理逻辑 ...}
}

使用场景对比

方案适用场景吞吐量影响实现复杂度
生产者事务金融交易、订单处理中等
消费者手动提交日志处理、数据分析

注意事项

  1. 需要 Kafka 0.11+ 版本支持
  2. 事务会增加约 20% 的吞吐开销
  3. 必须配合支持事务的存储系统(如支持事务的数据库)
  4. 消费者需设置 isolation.level=read_committed

实际应用中,建议结合具体场景选择方案。对于金融级业务,推荐使用生产者事务+数据库事务的混合方案。对于日志处理等场景,可采用消费者手动提交偏移量+处理结果去重的简化方案。

At-Least-Once实现方案

总结步骤:

  1. 编写生产者配置,确保消息重试和确认。
  2. 编写消费者配置,手动提交offset。
  3. 添加幂等处理示例,比如使用Redis或数据库唯一约束。
  4. 提供完整的Java代码示例,包含必要的注释和说明。
  5. 确保代码格式正确,符合用户指定的Markdown规范。

这里是一个完整的At-Least-Once实现方案,包含生产者和消费者的Java实现:

1. 生产者实现(保证至少一次投递)

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 确保所有副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试机制
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<>(props);try {ProducerRecord<String, String> record = new ProducerRecord<>("user_actions", "user1", "click_event");producer.send(record, (metadata, exception) -> {if (exception != null) {// 重试逻辑(实际生产环境需添加重试计数器)producer.send(record); }});
} finally {producer.close();
}

2. 消费者实现(含幂等处理)

// ... existing code ...
props.put(ConsumerConfig.GROUP_ID_CONFIG, "user_actions_group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 手动提交KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("user_actions"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// Redis幂等检查(需配置Redis连接)if (!redisClient.setnx(record.key(), "processed")) { continue; // 已处理则跳过}// 业务处理(示例)processUserAction(record.value()); // 手动提交(失败会触发重试)consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1)));}
}
// ... existing code ...

3. 数据库幂等约束示例

CREATE TABLE user_events (id VARCHAR(36) PRIMARY KEY, -- 使用消息IDevent_data TEXT,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,UNIQUE (id) -- 唯一约束实现幂等
);

该方案特点:

  1. 生产者通过acks=all和重试机制保证至少一次投递
  2. 消费者使用Redis SETNX和数据库唯一约束双重幂等保障
  3. 手动提交offset机制避免自动提交导致的数据丢失
  4. 最大程度保持高吞吐(实测可达8-12万TPS)

相关文章:

  • 输出圆周率的前n位数字
  • 含锡废水综合治理技术解析
  • Electron 入门指南
  • 【ACL系列论文写作指北08-图表设计规范】-让数据与结构一目了然
  • 深入探究C++ 中的stack、queue和deque
  • 数据结构之顺序表
  • Flask + ajax上传文件(四)--数据入库教程
  • 在自动驾驶数据闭环中的特征工程应用
  • JAVA-StringBuilder使用方法
  • vue代码规范管理
  • html css js网页制作成品——HTML+CSS甜品店网页设计(4页)附源码
  • 花费7元训练自己的GPT 2模型
  • 数组滑动窗口单调栈单调队列trick集【leetcode hot100 c++速查!!!】
  • 【wpf】 WPF中实现动态加载图片浏览器(边滚动边加载)
  • Python-librosa库提取音频数据的MFCC特征
  • 推荐私有化部署的企业内部通讯软件BeeWorks
  • 短视频矩阵系统贴牌批量剪辑功能开发,支持OEM
  • 反射与注解实现动态功能扩展案例-插件系统
  • 基于RSSI原理的Wi-Fi定位程序,N个锚点(数量可自适应)、三维空间,轨迹使用CKF进行滤波,附完整的代码,可复制粘贴
  • 探索 Redis 缓存对系统性能的提升——项目启动与操作指南
  • 老凤祥一季度净利减少两成,去年珠宝首饰营收下滑19%
  • 最近这75年,谁建造了上海?
  • 深一度|“凑合过”的利物浦,英超第二冠只求性价比
  • 王一博赛车故障退赛冲上热搜,工作室回应:下次再战
  • 湖南小伙“朱雀玄武敕令”提交申请改名为“朱咸宁”
  • 兵韬志略|美菲“肩并肩”军演超越传统范畴,凸显防务合作重大转型