kafka--基础知识点--16--最多一次、至少一次、精确一次
个人理解仅供参考。一个消息的传递可以分两个过程a) producer发送消息到 brokerb) consumer从broker读消息并发送。因此对于三种消息的传递策略要分两个阶段来看:a) producer发送消息到 broker对于producer来说有自己的最多一次、至少一次、精确一次策略在生产者发送消息过程中broker崩溃、网络抖动、生产者崩溃会用到这些策略、分区发生变化。b) consumer从broker读消息并发送对于consumer来说也有自己的最多一次、至少一次、精确一次策略在消费者消费消息过程中如果消费者发生崩溃会用到这些策略。1 producer端1.1 ACK策略Kafka 通过acks参数控制消息确认机制可实现三种消息传递语义至多一次At-Most-Once、至少一次At-Least-Once和恰好一次Exactly-Once。以下是详细对应关系kafka–基础知识点–5.1–ACK机制1.1.1 至多一次At-Most-Once语义消息可能丢失但绝不会重复。ACK策略acks0生产者不等待 Broker 确认发送后立即继续下一条消息。若 Broker 未收到消息或崩溃消息丢失且不会重试。配置要求生产者acks0retries0禁用重试。适用场景实时性要求高但允许少量数据丢失的场景如日志采集。1.1.2 至少一次At-Least-Once语义消息绝不会丢失但可能重复。ACK策略acksallacksall生产者等待所有 ISR 副本确认确保消息持久化结合重试可避免丢失。配置要求生产者acksallretriesInteger.MAX_VALUE无限重试。适用场景允许重复但不容忍丢失的场景如支付状态更新。1.1.3 精确一次Exactly-Once语义消息不丢失、不重复。ACK策略acksall 幂等性 事务 重试幂等性通过enable.idempotencetrue确保重复消息不会导致数据不一致。kafka–基础知识点–5.2–producer幂等性事务生产者跨生产者跨分区原子写生产者参数事务id:transactional.idxxx重试参数:retriesx配合消费者参数隔离级别isolation.levelread_committed。kafka–基础知识点–5.3–producer事务配置要求生产者acksallretriesInteger.MAX_VALUEenable.idempotencetrue,transactional.idxxx,retriesx。适用场景要求严格一致性的场景如金融交易、流处理。2 consumer端kafka–基础知识点–9.1–consumer 至多一次、至少一次、精确一次2.1 读取策略isolation.levelread_committed当消费者设置该参数时表示消费者仅消费已提交的事务消息。该参数只有当生产者使用事务时消费者设置该参数才有效。2.2 提交策略2.2.1 自动提交如果消费者设置为自动提交enable.auto.commit参数设置为trueKafka 消费者后台线程每隔 auto.commit.interval.ms 自动提交最近一次 poll() 的 offset而不管消费者是否对消息是否已经成功进行了业务处理。可能会导致消息丢失[先自动提交但消息还没处理完成消费者崩溃]也可能导致重复消费[先消费完成但提交前消费者崩溃重启消费者后再次消费同一条消息]。2.2.2 手动提交2.2.2.1. 至多一次 (At-Most-Once):行为: 消费者读取消息后先提交位移然后再进行业务处理。风险: 如果业务处理逻辑在提交位移之后失败这条消息就永远不会被再次处理了因为位移已经前移。导致消息丢失。2.2.2.2 至少一次 (At-Least-Once):行为: 消费者读取消息后先进行业务处理处理成功后再提交位移。风险: 如果业务处理成功但在提交位移之前消费者崩溃了当消费者恢复后它会从上一次提交的位移重新消费导致消息被重复处理。导致消息重复。2.2.2.3 精确一次 Exactly-Once2.2.2.3.1 幂等性消费 (Idempotent Consumption) - 推荐且最常用思路承认消息可能会被重复传递但从业务逻辑上保证重复执行不会产生负面效果。做法在消费者的处理逻辑中设计幂等性。例如为每条消息生成一个唯一 ID可以是消息key或自定义UUID。在处理前先检查这个 ID 是否已经被处理过比如在数据库里查一下。如果已处理就直接跳过并提交位移视为成功如果未处理则执行业务逻辑。这是最有效、最通用的方法因为它不依赖于任何特定技术而是从业务设计上根本性地解决问题。例如a) 对于流程中的消息每条消息中包含唯一id比如业务id在数据库中将业务id作为Unique key插入重复时会报duplicate key异常不会导致数据库中出现脏数据b) Redis中使用set存储业务id天然幂等性c) 如果不是上面两个场景需要让生产者发送每条数据的时候里面加一个全局唯一的 id然后你这里消费到了之后先根据这个 id 去比如 Redis 里查一下消费过吗如果没有消费过就执行相应业务进行处理然后这个 id 写 Redis最后提交偏移。如果消费过了那如果消费过了那就别处理了保证不重复处理相同的消息即可2.2.2.3.2 事务性输出 (Transactional Output) / 两阶段提交 (2PC) - 复杂且受限思路将消费者的“业务处理”和“位移提交”绑定为一个分布式事务。做法例如使用 Kafka 的事务性生产者将处理结果和位移提交到外部系统如另一个Kafka主题的操作放在一个事务里。但这通常需要外部系统如数据库也支持参与 Kafka 事务通过 Kafka Connect实现复杂度非常高性能和可用性也会受影响。不推荐普通应用使用。3 一条消息从发送到bocker再被消费者从bocker消费整个过程保证精确一次需要producer 使用精确一次策略即1.1.3consumerconsumer 设置’isolation.level’: ‘read_committed’该参数使得consumer只会消费producer事务已提交事务的消息。consumer使用精确一次策略即 2.2.2.3