监听Canal
一、前言为什么要监听 Canal在上一篇《认识 Canal》中我们了解到 Canal 能实时捕获 MySQL 的 Binlog 变更。但如何消费这些变更事件这就是“监听 Canal”的核心任务✅监听 Canal 编写客户端程序订阅 Canal Server 发送的数据库变更事件并基于这些事件执行业务逻辑如更新缓存、写入 ES、记录审计日志等。本文将带你编写最简 Java 客户端解析变更事件结构集成 Spring Boot通过消息队列解耦RabbitMQ/Kafka二、前置条件确保已完成MySQL 已开启 Binlogbinlog-formatROWCanal Server 已启动默认端口11111JDK 8/11 环境 参考《安装 Canal》 | 《认识 Canal》三、方式 1原生 Java 客户端监听最基础3.1 添加 Maven 依赖dependency groupIdcom.alibaba.otter/groupId artifactIdcanal.client/artifactId version1.1.8/version /dependency3.2 编写监听代码import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.*; import java.net.InetSocketAddress; import java.util.List; public class SimpleCanalClient { public static void main(String[] args) { // 1. 创建连接器IP:CanalServer, Port, destination, username, password CanalConnector connector CanalConnectors.newSingleConnector( new InetSocketAddress(127.0.0.1, 11111), example, , ); try { // 2. 连接 订阅支持正则匹配表 connector.connect(); connector.subscribe(.*\\\\..*); // 订阅所有库.所有表 while (true) { // 3. 获取消息阻塞等待超时1秒 Message message connector.get(1000); if (message ! null message.getEntries().size() 0) { printSummary(message); // 处理事件 } } } finally { connector.disconnect(); } } private static void printSummary(Message message) { for (Entry entry : message.getEntries()) { if (entry.getEntryType() EntryType.ROWDATA) { RowChange rowChange null; try { rowChange RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException(解析失败, e); } // 打印基本信息 System.out.println(表名: entry.getHeader().getTableName()); System.out.println(操作类型: rowChange.getEventType()); // 遍历每一行变更 for (RowData rowData : rowChange.getRowDatasList()) { if (rowChange.getEventType() EventType.DELETE) { printColumns(rowData.getBeforeColumnsList(), 删除前); } else if (rowChange.getEventType() EventType.INSERT) { printColumns(rowData.getAfterColumnsList(), 插入后); } else { printColumns(rowData.getBeforeColumnsList(), 修改前); printColumns(rowData.getAfterColumnsList(), 修改后); } } } } } private static void printColumns(ListColumn columns, String prefix) { StringBuilder sb new StringBuilder(prefix : ); for (Column column : columns) { sb.append(column.getName()).append().append(column.getValue()).append(, ); } System.out.println(sb.toString()); } }3.3 测试效果启动上述 Java 程序在 MySQL 执行INSERT INTO user(name, age) VALUES(Alice, 25);观察控制台输出表名: user 操作类型: INSERT 插入后: id1, nameAlice, age25,关键点destination必须与 Canal Server 的instance名称一致默认examplesubscribe()支持正则过滤如test_db\\\\.user四、方式 2Spring Boot 集成生产推荐4.1 添加依赖dependency groupIdcom.alibaba.otter/groupId artifactIdcanal.client/artifactId version1.1.8/version /dependency4.2 配置文件 (application.yml)canal: server: host: 127.0.0.1 port: 11111 destination: example batch-size: 1000 timeout: 10004.3 创建配置类Configuration ConfigurationProperties(prefix canal) Data public class CanalConfig { private String serverHost; private int serverPort; private String destination; private int batchSize 1000; private int timeout 1000; }4.4 编写监听服务Service Slf4j public class CanalListenerService implements CommandLineRunner { Autowired private CanalConfig canalConfig; Override public void run(String... args) throws Exception { // 启动后台线程监听 new Thread(this::listen).start(); } private void listen() { CanalConnector connector CanalConnectors.newSingleConnector( new InetSocketAddress(canalConfig.getServerHost(), canalConfig.getServerPort()), canalConfig.getDestination(), , ); try { connector.connect(); connector.subscribe(.*\\\\..*); while (!Thread.interrupted()) { Message message connector.get(canalConfig.getTimeout()); if (message ! null message.getEntries().size() 0) { handleEvent(message); // 自定义处理逻辑 } } } finally { connector.disconnect(); } } private void handleEvent(Message message) { // 示例只处理 user 表 for (Entry entry : message.getEntries()) { if (user.equals(entry.getHeader().getTableName())) { // TODO: 更新 Redis / 发送 MQ / 写审计日志 log.info(捕获 user 表变更: {}, entry.toString()); } } } }✅优势与 Spring 生态无缝集成支持配置中心动态调整易于单元测试五、方式 3通过消息队列解耦高可用架构直接监听 Canal 存在风险❌ 客户端宕机导致事件丢失❌ 无法支持多消费者解决方案Canal → MQ → 多个消费者5.1 Canal 配置发送到 RabbitMQ修改conf/canal.properties# 启用 RabbitMQ canal.mq.servers127.0.0.1:5672 canal.mq.exchangecanal.exchange canal.mq.usernameguest canal.mq.passwordguest canal.mq.vhost/5.2 Spring Boot 消费 RabbitMQComponent Slf4j public class CanalRabbitConsumer { RabbitListener(queues canal.queue) public void handleCanalMessage(String message) { // 解析 JSON 格式的 Canal 事件 log.info(收到 Canal 事件: {}, message); // 执行业务逻辑... } }架构优势削峰填谷MQ 缓冲突发流量多播能力一个事件可被多个系统消费缓存数仓审计可靠性MQ 持久化保证不丢消息六、关键注意事项6.1 位点管理避免重复消费Canal Client 默认自动提交位点若需手动控制如事务成功后再提交connector.ack(message.getId()); // 手动确认6.2 幂等性设计网络抖动可能导致重复事件消费端必须实现幂等如根据主键 upsert6.3 性能调优参数建议值说明batchSize500~2000单次拉取事件数timeout1000ms空轮询超时多线程消费按表分片避免单线程瓶颈七、常见问题排查问题原因解决方案收不到事件Canal instance 未启动检查logs/example/example.log连接拒绝Canal Server 未监听 11111netstat -tunlp | grep 11111解析异常MySQL 字段类型不兼容升级 Canal 版本或简化表结构CPU 100%空轮询太频繁增加timeout到 1000ms八、结语感谢您的阅读如果你有任何疑问或想要分享的经验请在评论区留言交流