mqtt-plus 架构解析(七):动态订阅与重连恢复,为什么能走同一条协调路径
mqtt-plus 架构解析七动态订阅与重连恢复为什么能走同一条协调路径摘要在很多 MQTT 项目里动态订阅和重连恢复往往是两套逻辑一套负责运行时变化一套负责断线后的重新订阅。mqtt-plus当前的做法更收敛一些它把静态 listener 订阅和运行时动态订阅都纳入统一协调模型里动态变更先通过MqttSubscriptionRefreshEventListener写入MqttSubscriptionManager并立即作用到当前 adapter而MqttSubscriptionReconciler则在连接建立时统一回放静态和动态订阅。本文会结合真实源码拆解这条链路为什么成立以及它当前的边界在哪里。项目地址项目地址https://github.com/mqttplus/mqtt-plus配套的示例工程https://github.com/mqttplus/mqtt-plus-examples如果你对这个方向感兴趣欢迎关注、试用也欢迎一起交流 issue 和 PR。如果这篇文章对你有帮助欢迎点赞、收藏也欢迎给项目一个 Star。到了第 7 篇问题开始进入“运行态稳定性”这一层。前面第 6 篇讲的是一个应用如何同时持有多个 broker 连接实例。但只要连接是真实存在的就一定会碰到三类变化应用启动首次建立连接运行过程中新增或删除订阅连接中断后重新建立需要恢复订阅很多项目在这里会慢慢长成三套分离逻辑启动时订阅一套动态刷新一套重连再补一套而 mqtt-plus 当前的设计更克制它没有把这三件事完全拆开而是让“动态变化的订阅集合”与“连接建立时的统一回放”协作从而收敛成一条协调路径。一、这篇文章到底想回答什么这一篇只回答三个问题动态订阅变更是如何被记录并立即应用的重连之后框架如何恢复静态 listener 订阅和运行时新增订阅为什么首次连接、运行时刷新和重连恢复本质上可以共享同一个协调模型如果只记住一句话那就是mqtt-plus 把“订阅变化”存进MqttSubscriptionManager把“连接建立后的恢复”交给MqttSubscriptionReconciler于是运行时变化和重连恢复自然被串到了一起。二、先看整条协调链路先把三个阶段放在一张图里看清楚。Application startup or reconnectAdapter connectedMqttSubscriptionReconciler.onConnected(brokerId)Replay static listener subscriptionsReplay dynamic subscriptions from MqttSubscriptionManagerAdapter.subscribe(...)Running stateMqttSubscriptionRefreshEventMqttSubscriptionRefreshEventListenerUpdate MqttSubscriptionManagerApply subscribe/unsubscribe to active adapter immediately这张图里最关键的是两条线连接建立时统一做一次“恢复式回放”运行时变化时既更新内存中的动态订阅集合也立即作用到当前 adapter于是系统的行为就会变得很稳定当前连接能立刻生效将来重连还能自动恢复这正是 README 里那句非常关键的话背后的实现含义动态订阅刷新会立即应用到当前 adapter同时也会保留下来用于未来重连恢复。三、动态订阅变更为什么不是直接改 adapter 就结束这一层由MqttSubscriptionRefreshEventListener负责。它监听的是MqttSubscriptionRefreshEvent事件里包含actionbrokerIdtopicqos当前支持两种动作SUBSCRIBEUNSUBSCRIBE它的处理逻辑非常直接如果是SUBSCRIBE调用subscriptionManager.addSubscription(brokerId, topic, qos)尝试从adapterRegistry找到当前 broker 对应的 adapter如果当前 adapter 存在立即执行adapter.subscribe(topic, qos)如果是UNSUBSCRIBE调用subscriptionManager.removeSubscription(brokerId, topic)尝试找到当前 adapter如果当前 adapter 存在立即执行adapter.unsubscribe(topic)这一层最重要的设计不是“用 Spring 事件做桥接”而是动态变更同时更新两个对象当前运行态 adapter持久在内存里的动态订阅集合如果它只改 adapter不写MqttSubscriptionManager那一旦重连这些运行时新增的 topic 就丢了。如果它只写MqttSubscriptionManager不立即改 adapter那当前连接又不会马上生效。也就是说这里必须同时兼顾“现在”和“未来”。设计决策动态订阅刷新不是直接对 adapter 做一次 subscribe/unsubscribe 就结束而是必须同时更新MqttSubscriptionManager。这样做的重点是让运行时变更既能立即生效也能进入未来重连时的恢复集合。四、MqttSubscriptionManager真正保存了什么当前默认实现DefaultMqttSubscriptionManager比很多人想象得更简单。它内部维护的是MapString, SetString subscriptionsByBroker也就是说它按brokerId维护一个 topic 集合。接口也非常克制addSubscription(String brokerId, String topic, int qos)removeSubscription(String brokerId, String topic)getSubscriptions(String brokerId)这里有一个特别值得注意的现实边界虽然addSubscription(...)接口里接收了qos但默认实现当前只保存 topic并没有保存每个动态订阅对应的 qos。这也直接影响了后面的恢复逻辑动态订阅在运行时添加时可以用事件里的 qos 立即订阅但未来重连时Reconciler当前只能把这些动态 topic 以qos0再次回放所以这套设计已经把“动态 topic 会不会丢”解决了但还没有把“动态 qos 是否完整恢复”做到完全闭环。这类细节特别值得写出来因为它体现的不是 bug而是当前实现的真实取舍。五、Reconciler为什么是这篇的真正主角真正把“首次连接”和“重连恢复”统一起来的是MqttSubscriptionReconciler。它本身就是一个MqttConnectionListener也就是说它不是运行在业务事件层而是直接挂在 broker 连接生命周期上。当前实现里它在onConnected(String brokerId)做了两件事从MqttListenerRegistry.resolveByBroker(brokerId)取出所有该 broker 应有的静态 listener 定义从MqttSubscriptionManager.getSubscriptions(brokerId)取出运行时累积的动态 topic然后分别回放到 adapter 上。也就是说它恢复的是两类订阅静态订阅来自MqttListener或注册表中的 listener 定义动态订阅来自运行时事件驱动写入的subscriptionManageronConnected(brokerId)adapterRegistry.find(brokerId)listenerRegistry.resolveByBroker(brokerId)subscriptionManager.getSubscriptions(brokerId)subscribe static topics with definition.getQos()subscribe dynamic topics with qos 0Recovered adapter state这张图其实就是第 7 篇最重要的逻辑骨架。因为它说明首次连接时静态订阅会被统一建立重连时同样的逻辑会再次执行动态订阅不需要单独写一套“重连补偿逻辑”因为已经提前被收进subscriptionManager这就是为什么第 7 篇的题目里会强调“同一条协调路径”。六、为什么说首次连接、动态刷新和重连恢复本质上是同一个模型从实现上看这三件事当然不是同一个方法里完成的。首次连接和重连恢复走MqttSubscriptionReconciler.onConnected()运行时变更走MqttSubscriptionRefreshEventListener.onApplicationEvent(...)但从架构模型上看它们其实共享同一个核心逻辑当前应该有哪些静态订阅当前还应该额外保留哪些动态订阅一旦连接建立就把两者统一回放到 adapter所以真正共享的不是“代码入口”而是“恢复模型”。可以换个更直白的说法动态刷新负责维护“未来要恢复什么”Reconciler 负责在“连接恢复时把它们重新落到 adapter 上”这就让系统避免了一个很常见的问题动态变更是动态变更重连恢复是重连恢复两边维护两份订阅真相mqtt-plus 当前选择的是订阅真相尽量收敛恢复动作统一触发。设计决策mqtt-plus 没有为“首次连接”“动态新增 topic”“断线重连”分别设计三套互相独立的订阅恢复机制而是让动态变更先沉淀到MqttSubscriptionManager再由MqttSubscriptionReconciler在连接建立时统一回放。这样做的重点是减少状态分叉让恢复路径保持单一。七、broker*这种 listener 定义会怎么参与恢复这里还有一个很容易被忽略、但很值得讲的点。MqttListenerRegistry.resolveByBroker(brokerId)的逻辑不是只找definition.getBroker().equals(brokerId)而是broker 等于当前 brokerId或者broker 等于*这意味着某个 listener 明确绑定primary只会在primary恢复时参与回放某个 listener 如果写成broker*那它会在每个 broker 的恢复时都参与回放这个细节和第 6 篇的多 broker 设计是直接衔接的。因为多 broker 不是简单“多几条连接”而是会把 broker 维度带进整个 listener 匹配和恢复模型里。所以第 7 篇其实也补了一层认知broker 维度不只是消息路由时的过滤条件它也决定了连接恢复时哪些静态订阅应该被重新建立八、当前实现的边界在哪里这一篇最值得诚实写出来的边界有两个。1. 动态订阅当前只持久化了 topic没有持久化 qos这意味着运行时新增订阅时当前连接能拿到事件里的 qos但重连恢复时默认实现只能按qos0回放动态 topic这不是“设计不存在”而是“恢复模型已经成立但动态 qos 还没有被完整保留下来”。2. 恢复目前只关心订阅集合不处理更复杂的订阅编排比如当前没有继续抽象动态订阅优先级条件订阅批次部分 broker 的差异化恢复策略去重之外更复杂的订阅冲突解决这说明当前Reconciler的目标非常明确把静态订阅和动态订阅统一回放保证首次连接和重连恢复的基本一致性它是一个“恢复协调器”但还不是一个“高级订阅编排引擎”。九、小结第 7 篇真正想讲清楚的是mqtt-plus 没有把动态订阅和重连恢复拆成两套互相独立的真相来源而是做了一个非常清楚的分工MqttSubscriptionRefreshEventListener负责记录动态变化并立即作用到当前 adapterMqttSubscriptionManager负责保存未来需要恢复的动态 topic 集合MqttSubscriptionReconciler负责在连接建立时统一回放静态和动态订阅于是系统的行为就很稳定当前变化可以立即生效将来重连也能恢复回来首次连接、动态变更、重连恢复都围绕同一个订阅模型展开这也是为什么第 7 篇看起来讲的是“恢复”其实真正讲的是另一件事如何让运行时变化和连接生命周期共享同一套状态模型。下一篇会继续往上走一层进入 Spring Boot 自动装配本身这些核心零件到底是怎么被 starter 粘合起来的。系列导航本文是mqtt-plus 架构解析系列的第 7/10 篇。#主题链接1总览分层架构与设计哲学链接2消息路由一条 MQTT 消息如何到达你的MqttListener链接3Payload 序列化与反序列化双链设计的取舍链接4拦截器链MqttMessageInterceptor的扩展点设计链接5错误处理ErrorAction聚合策略的设计逻辑链接6多 Broker 管理如何让一个应用同时连接多个 MQTT 服务链接7动态订阅与重连恢复Reconciler的协调机制本文8Spring Boot 自动装配零件是怎么被粘合起来的链接9测试体系MqttTestTemplate与EmbeddedBroker的设计链接10从内部项目到开源框架mqtt-plus 的抽取过程与决策链接上一篇多 Broker 管理如何让一个应用同时连接多个 MQTT 服务下一篇Spring Boot 自动装配零件是怎么被粘合起来的