RocketMQ客户端编程模型
RocketMQ的运行架构和消息模型可以总结为一个中心化的存储集群 一套标准化的发布订阅模型。它通过NameServer做服务发现Broker做消息存储再配合生产者和消费者这两大客户端构成了一个高吞吐、低延迟的分布式消息系统。一、核心架构组件NameServer轻量级路由中心。它负责管理Broker集群的元数据如Broker地址、Topic路由信息为生产者和消费者提供服务发现功能。。Broker消息存储核心。这是RocketMQ最关键的组件负责消息的接收、存储、查询和消费投递。Broker通常采用主从Master-Slave架构部署Master负责处理写入请求Slave负责高可用和部分读请求。单个Broker节点可以承载成千上万个Topic这得益于其优秀的存储设计。Producer生产者消息发布者。它是业务系统中负责创建并发送消息的客户端。生产者会定期从NameServer拉取Topic的路由信息然后与目标Broker建立长连接并将消息发送到该Topic下的某个队列中。Consumer消费者消息订阅者。它是业务系统中负责处理消息的客户端。消费者同样从NameServer获取路由然后向Broker发起拉取请求获取消息进行业务处理。同一个消费组ConsumerGroup内的多个消费者会共同分担消息实现水平扩展。二、核心消息模型主题Topic逻辑上的消息分类。它是消息的第一级容器用于区分不同的业务例如订单Topic、商品Topic。一个Topic下会包含多个消息队列。消息队列MessageQueue物理存储分片。Topic由多个Queue组成每个Queue内部的消息是严格有序的。通过增加Queue的数量可以水平提升Topic的吞吐能力。消息Message数据传输的最小单元。每条消息都归属于一个Topic和一个Queue拥有唯一的IDMessage ID和可选的Key业务键用于快速查找与Tag标签用于消息过滤。消费者分组ConsumerGroup消费身份逻辑分组。这是发布订阅模型的核心。同一个Group内的多个消费者实例共同消费Topic中的消息彼此是竞争关系一条消息只会被Group内的一个消费者处理而不同的Group订阅同一个Topic彼此独立都能收到全量消息从而实现一对多的广播效果。三、消息发送方式发送方式特点可靠性适用场景同步发送 (Sync)发送后阻塞等待Broker返回结果。最高。可通过SendResult判断消息是否成功。重要的业务通知、订单状态更新等对可靠性要求极高的场景。异步发送 (Async)发送后不阻塞通过回调接口处理结果。高。失败时可重试且不阻塞主线程吞吐量高。高并发、对响应时间敏感的业务如电商下单、秒杀系统。单向发送 (Oneway)只管发送不等待任何响应。低。消息可能丢失无任何重试机制。日志收集、监控数据上报等允许少量丢失、追求极致性能的场景。四、可靠性机制确认与重试生产者确认同步发送返回的 SendResult 包含 SendStatus如 SEND_OK、FLUSH_DISK_TIMEOUT 等告知发送结果。异步发送则通过 SendCallback 的 onSuccess 和 onException 方法来感知结果。消费者确认消息处理回调必须返回 ConsumeConcurrentlyStatus。CONSUME_SUCCESS消费成功Broker会更新消费位点。RECONSUME_LATER消费失败Broker会稍后重试推送该消息。重试与死信RocketMQ会自动为每个消费者组创建一个重试队列。消费失败的消息会被先移入重试队列避免阻塞正常消息。每条消息默认最多重试 16次。超过后消息会被移入死信队列DLQ需要人工介入处理。五、高级特性消息消费模式集群模式Clustering默认模式。同一个消费者组内的多个消费者共同消费队列中的消息每条消息只会被组内一个消费者实例消费。这是负载均衡的模式。广播模式Broadcasting每条消息都会被同一个消费者组内的所有消费者实例消费一次。即消息会“广播”给组内的每一个消费者。顺序消息要保证消息的全局或局部顺序需在生产者和消费者两端协作完成。生产者使用 MessageQueueSelector 将同一业务键如订单ID的消息都发送到同一个队列中。消费者使用 MessageListenerOrderly 进行消费它会确保一个队列同时只有一个线程在处理。事务消息RocketMQ解决分布式事务问题的核心特性它通过两阶段提交和事务回查机制保证了本地事务和消息发送的原子性——要么两者都成功要么都不成功。在保证消息可靠投递的同时实现了最终一致性。典型场景如“下单支付扣库存”生产者发送半消息到Broker消费者不可见生产者执行本地事务executeLocalTransaction()插入订单待支付成功→UNKNOW失败→ROLLBACKBroker回查本地事务checkLocalTransaction()查询订单支付状态已支付→COMMIT待支付→UNKNOW已取消→ROLLBACK最终本地事务状态COMMIT→ 消息对消费者可见多次UNKNOW后超时 →ROLLBACK消息删除通过两阶段提交回查机制将“支付结果”作为事务最终状态的判断依据。第一阶段生产者向Broker发送一条半消息Half Message消息成功写入Broker并持久化消费者不可见无法消费消息状态为UN_KNOW第二阶段生产者执行本地事务后根据结果向Broker发送提交Commit 或回滚Rollback 指令提交 → 半消息转为普通消息消费者可见回滚 → 半消息被删除消费者永远看不到消息过滤RocketMQ在Broker端提供两种过滤方式可有效减少无效网络传输Tag过滤简单、高效适合标签明确的场景。例如consumer.subscribe(TOPIC, TAG_A || TAG_B);SQL92过滤功能强大支持自定义属性过滤。需要Broker开启enablePropertyFilter配置。例如consumer.subscribe(TOPIC, MessageSelector.bySql(a 5 and b abc));延迟消息通过设定延迟级别让消息在一段时间后投递适用于订单超时未支付等场景。RocketMQ支持18个延迟级别如1s, 5s, 10s, ... 2h。设置方法message.setDelayTimeLevel(3); // 3级对应10秒延迟消费幂等控制在分布式消息系统中消费幂等是一个核心设计问题。由于网络抖动、消费者重启、Broker重试等原因消息可能会被重复投递因此消费者必须实现幂等处理确保重复消息不会导致业务数据异常。消息消费幂等的核心根据唯一标识判断消息是否已被处理过避免重复执行业务逻辑。Message IDRocketMQ为每条消息生成的全局唯一标识符主要用于消息的唯一性标识和链路追踪。Key业务层面的自定义标识符由生产者在发送消息时设置用于业务关联和消息过滤。不建议仅依赖Message ID做幂等因为极端情况下可能重复Broker重启、重试等。优先使用业务Key作为幂等IDMessage ID作为兜底。