你好我是fengxin_rou这是我的个人主页fengxin_rou的主页❄️欢迎查看我的专栏我的专栏《Java后端学习》、《JAVASE基础》、《JUC并发》、《redis》、《JVM虚拟机》、《MYSQL》、《黑马点评》、《rabbitmq》、《JavaWebAI的talis学习系统》、《苍穹外卖》目录前言一、整体架构流程与核心原理1.1 架构拓扑链路1.2 关键基础概念解释二、Canal→Kafka 桥接器核心实现2.1 组件职责2.2 核心源码实现2.3 核心逻辑说明三、Canal Outbox 消费者实现3.1 组件职责3.2 核心源码实现3.3 设计亮点四、关系事件处理器幂等与业务处理4.1 核心职责4.2 幂等与业务逻辑源码4.3 关键设计解析结语前言在社交类系统用户关注 / 粉丝关系场景中直面数据强一致性、高并发解耦、缓存与数据库双写同步等痛点。传统同步落库方式耦合度高、吞吐瓶颈明显而Outbox 事件驱动结合Canal Binlog 增量订阅架构可实现业务解耦、数据异步同步、缓存自动维护是中大型社交系统用户关系模块的最优落地方案之一。本文从架构原理、核心组件源码、事件处理流程、幂等设计四大维度完整拆解实战落地细节。一、整体架构流程与核心原理1.1 架构拓扑链路整套架构采用MySQL 业务表 Outbox 事件表 Canal 监听 BinlogKafka 消息中转 消费业务处理的标准 CDC 事件驱动模型。 核心链路流转用户执行关注 / 取关操作写入following关注业务表同时写入Outbox 事件表生成业务事件Canal实时监听 MySQL Binlog 日志捕获 Outbox 表数据变更Canal 桥接器过滤无关表事件解析 Binlog 为标准 JSON 消息投递至 Kafka 指定 Topic消费者订阅 Kafka 消息反序列化为关系事件完成伪从表同步、Redis 缓存维护、用户计数更新。整个架构最大优势是业务代码无侵入通过 Binlog 增量订阅实现数据变更被动感知异步化解耦核心业务与缓存、统计、冗余表同步逻辑。1.2 关键基础概念解释空批次Empty BatchCanal 拉取的消息条目列表为空代表当前 MySQL 无任何数据变更事件无业务消息需要处理。心跳消息HeartbeatCanal 服务端推送batchId-1的特殊消息核心作用是维持客户端与服务端长连接避免长时间无数据传输导致连接超时断开同时做服务健康状态探活。当检测到空批次或心跳消息时架构采用间隔休眠轮询策略减少无效 CPU 空转保障服务资源合理利用。二、Canal→Kafka 桥接器核心实现2.1 组件职责CanalKafkaBridge作为架构中转核心负责连接 Canal 服务、监听 Binlog、过滤 Outbox 表事件、序列化为 JSON 并投递 Kafka。只监听配置过滤表达式指定的 Outbox 表忽略其他业务表变更减少消息冗余。2.2 核心源码实现/** * 启动桥接器消费 Canal 并投递到 Kafka。 */ Override public void start() { if (running) { log.info(Canal bridge start skipped: running{} enabled{} host{} port{} dest{} filter{}, running, enabled, host, port, destination, filter); return; } // 标记运行并使用全局线程池异步执行主循环 running true; taskExecutor.execute(() - { try { // 创建Canal单实例连接器并建立连接 connector CanalConnectors.newSingleConnector( new InetSocketAddress(host, port), destination, username, password); log.info(Canal connecting to {}:{} dest{} user{} filter{}, host, port, destination, username, filter); connector.connect(); // 订阅过滤表达式仅拉取关心的Outbox表 connector.subscribe(filter); // 回滚到上次确认位点保证消息消费一致性 connector.rollback(); log.info(Canal connected and subscribed: host{} port{} dest{} filter{} batchSize{} intervalMs{}ms, host, port, destination, filter, batchSize, intervalMs); while (running) { // 拉取一批未确认消息不自动ack Message message connector.getWithoutAck(batchSize); long batchId message.getId(); // 空批次或心跳消息休眠后继续轮询 if (batchId -1 || message.getEntries() null || message.getEntries().isEmpty()) { try { Thread.sleep(intervalMs); } catch (InterruptedException ignored) {} continue; } // 遍历解析行级数据变更事件 for (CanalEntry.Entry entry : message.getEntries()) { // 只处理行数据变更忽略事务、DDL等事件 if (entry.getEntryType() ! CanalEntry.EntryType.ROWDATA) { continue; } CanalEntry.RowChange rowChange; try { rowChange CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { continue; } // 仅转发新增、更新事件 CanalEntry.EventType eventType rowChange.getEventType(); if (eventType ! CanalEntry.EventType.INSERT eventType ! CanalEntry.EventType.UPDATE) { continue; } // 封装消息并投递Kafka ArrayNode dataArray objectMapper.createArrayNode(); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { ObjectNode rowNode objectMapper.createObjectNode(); for (CanalEntry.Column col : rowData.getAfterColumnsList()) { // 提取Outbox核心payload事件内容 if (payload.equalsIgnoreCase(col.getName())) { rowNode.put(payload, col.getValue()); } } dataArray.add(rowNode); } ObjectNode msgNode objectMapper.createObjectNode(); msgNode.put(table, entry.getHeader().getTableName()); msgNode.put(type, eventType CanalEntry.EventType.INSERT ? INSERT : UPDATE); msgNode.set(data, dataArray); // 发送至canal-outbox主题 String json objectMapper.writeValueAsString(msgNode); kafka.send(OutboxTopics.CANAL_OUTBOX, json); } // 手动确认批次推进消费位点避免消息重放 connector.ack(batchId); } } catch (Exception e) { log.error(Canal bridge error, e); } finally { // 资源释放断开Canal连接 if (connector ! null) { try { connector.disconnect(); log.info(Canal disconnected: dest{}, destination); } catch (Exception ex) { log.warn(Canal disconnect failed: dest{} err{}, destination, ex.getMessage()); } } } }); }2.3 核心逻辑说明异步循环监听通过线程池启动独立循环阻塞式拉取 Canal 消息事件过滤只保留行级 INSERT/UPDATE 事件过滤 DDL、删除、事务事件位点保障采用getWithoutAck手动拉取、处理完成后ack确认保证消息至少一次投递消息封装仅提取 Outbox 表payload核心字段精简消息体积提升下游消费效率。三、Canal Outbox 消费者实现3.1 组件职责CanalOutboxConsumer订阅 Kafka 的canal-outbox主题负责消费消息、解析 JSON 载荷、反序列化为业务事件并调用事件处理器完成后续业务逻辑采用手动 ACK保证消息消费可靠性。3.2 核心源码实现package com.tongji.relation.outbox; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.tongji.relation.event.RelationEvent; import com.tongji.relation.processor.RelationEventProcessor; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Service; import java.util.List; import com.tongji.common.util.OutboxMessageUtil; /** * Canal Outbox 消费者 * 消费Canal桥接消息解析payload为关系事件交由处理器处理 */ Service public class CanalOutboxConsumer { private final ObjectMapper objectMapper; private final RelationEventProcessor processor; public CanalOutboxConsumer(ObjectMapper objectMapper, RelationEventProcessor processor) { this.objectMapper objectMapper; this.processor processor; } KafkaListener(topics OutboxTopics.CANAL_OUTBOX, groupId relation-outbox-consumer) public void onMessage(String message, Acknowledgment ack) { try { // 工具类提取消息数据行 ListJsonNode rows OutboxMessageUtil.extractRows(objectMapper, message); if (rows.isEmpty()) { ack.acknowledge(); return; } // 遍历解析每一条关系事件 for (JsonNode row : rows) { JsonNode payloadNode row.get(payload); if (payloadNode null) { continue; } // 反序列化为业务事件 RelationEvent evt objectMapper.readValue(payloadNode.asText(), RelationEvent.class); processor.process(evt); } // 全部处理完成后手动提交位点 ack.acknowledge(); } catch (Exception ignored) {} } }3.3 设计亮点采用批量处理、统一手动 ACK机制只有所有事件处理完成后才提交 Kafka 位点。 若中途异常不会提交位点重启后可重新消费天然实现消息重试与业务幂等兜底。四、关系事件处理器幂等与业务处理4.1 核心职责RelationEventProcessor是业务逻辑核心负责事件幂等去重、粉丝伪从表同步、Redis ZSet 缓存维护、关注 / 粉丝计数原子更新同时设置缓存 TTL 规避缓存数据陈旧问题。4.2 幂等与业务逻辑源码/** * 关系事件处理器 * 职责事件去重防抖、幂等处理、落库同步、缓存维护、计数更新 */ Service public class RelationEventProcessor { private final RelationMapper mapper; private final StringRedisTemplate redis; private final UserCounterService userCounterService; public RelationEventProcessor(RelationMapper mapper, StringRedisTemplate redis, UserCounterService userCounterService) { this.mapper mapper; this.redis redis; this.userCounterService userCounterService; } public void process(RelationEvent evt) { // 构造幂等去重Key事件类型发起用户目标用户事件ID String dk dedup:rel: evt.type() : evt.fromUserId() : evt.toUserId() : (evt.id() null ? 0 : String.valueOf(evt.id())); // 10分钟幂等锁防止重复消费 Boolean first redis.opsForValue().setIfAbsent(dk, 1, Duration.ofMinutes(10)); if (first null || !first) { return; } // 关注创建事件 if (FollowCreated.equals(evt.type())) { mapper.insertFollower(evt.id(), evt.toUserId(), evt.fromUserId(), 1); long now System.currentTimeMillis(); // 维护关注、粉丝ZSet有序缓存 redis.opsForZSet().add(uf:flws: evt.fromUserId(), String.valueOf(evt.toUserId()), now); redis.opsForZSet().add(uf:fans: evt.toUserId(), String.valueOf(evt.fromUserId()), now); // 设置缓存2小时TTL redis.expire(uf:flws: evt.fromUserId(), Duration.ofHours(2)); redis.expire(uf:fans: evt.toUserId(), Duration.ofHours(2)); // 原子更新计数 userCounterService.incrementFollowings(evt.fromUserId(), 1); userCounterService.incrementFollowers(evt.toUserId(), 1); } // 取关取消事件 else if (FollowCanceled.equals(evt.type())) { mapper.cancelFollower(evt.toUserId(), evt.fromUserId()); // 移除缓存中对应关系 redis.opsForZSet().remove(uf:flws: evt.fromUserId(), String.valueOf(evt.toUserId())); redis.opsForZSet().remove(uf:fans: evt.toUserId(), String.valueOf(evt.fromUserId())); redis.expire(uf:flws: evt.fromUserId(), Duration.ofHours(2)); redis.expire(uf:fans: evt.toUserId(), Duration.ofHours(2)); // 扣减关注、粉丝计数 userCounterService.incrementFollowings(evt.fromUserId(), -1); userCounterService.incrementFollowers(evt.toUserId(), -1); } } }4.3 关键设计解析幂等去重基于 RedissetIfAbsent构造唯一去重键10 分钟有效期规避 Kafka 重复消费、消息重放导致的数据错乱ZSet 缓存设计采用时间戳作为分数有序维护关注 / 粉丝列表支持分页、排序查询缓存 TTL 策略设置 2 小时过期自动清理冷数据减少内存占用兜底缓存一致性问题计数原子更新通过独立计数服务增减关注数、粉丝数避免数据库频繁聚合查询提升接口响应速度。结语本文完整拆解了Outbox 事件驱动 Canal Binlog 增量订阅在用户关系模块的落地架构涵盖 Canal 桥接、Kafka 消息中转、消费者解析、事件幂等处理四大核心环节。该架构核心价值在于业务解耦、无侵入数据同步、天然支持高并发与削峰填谷同时通过手动位点、Redis 幂等锁、缓存 TTL 三重机制保障数据一致性。适用于社交平台、社区 APP 等需要维护关注 / 粉丝关系、异步缓存更新、数据冗余表同步的业务场景。进阶优化可从 Canal 集群化、Kafka 分区并行消费、本地消息表兜底、缓存预热等方向扩展进一步提升架构可用性与吞吐能力。