README.md | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
README.md
@@ -16,9 +16,9 @@ zookeeper-address: # zookeeper 地址(开启集群的情况下生效), 例: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 acquire-interval: 1000 # 未拉取到消息情况下,获取消息的时间间隔毫秒值 subscribe: .*\\..* # 默认情况下拉取所有库、所有表 prod: example: example1 database: books prod: example: example database: books ``` @@ -28,9 +28,11 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.duxinglangzi.canal.starter.annotation.CanalInsertListener; import com.duxinglangzi.canal.starter.annotation.CanalListener; import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener; import com.duxinglangzi.canal.starter.annotation.EnableCanalListener; import com.duxinglangzi.canal.starter.mode.CanalMessage; import org.springframework.stereotype.Service; import java.util.stream.Collectors; @@ -44,9 +46,9 @@ public class CanalListenerTest { /** * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener * * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener * * 目前 Listener 方法的参数必须为 com.duxinglangzi.canal.starter.mode.CanalMessage * 程序在启动过程中会做检查 */ @@ -56,51 +58,59 @@ * 目标是 ${prod.example} 的 ${prod.database} 库 users表 */ @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"}) public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleBooksUsers", eventType, rowData); public void listenerExampleBooksUsers(CanalMessage message) { printChange("listenerExampleBooksUsers", message); } /** * 监控更新操作 ,目标是 example的 books库 users表 */ @CanalUpdateListener(destination = "example", database = "books", table = {"users"}) public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleBooksUsers", eventType, rowData); @CanalInsertListener(destination = "example", database = "books", table = {"users"}) public void listenerExampleBooksUser(CanalMessage message) { printChange("listenerExampleBooksUsers", message); } /** * 监控更新操作 ,目标是 example的 books库 books表 */ @CanalUpdateListener(destination = "example", database = "books", table = {"books"}) public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleBooksBooks", eventType, rowData); public void listenerExampleBooksBooks(CanalMessage message) { printChange("listenerExampleBooksBooks", message); } /** * 监控更新操作 ,目标是 example的 books库的所有表 */ @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE) public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleBooksAll", eventType, rowData); public void listenerExampleBooksAll(CanalMessage message) { printChange("listenerExampleBooksAll", message); } /** * 监控更新操作 ,目标是 example的 所有库的所有表 */ @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE) public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleAll", eventType, rowData); public void listenerExampleAll(CanalMessage message) { printChange("listenerExampleAll", message); } /** * 监控更新、删除、新增操作 ,所有配置的目标下的所有库的所有表 */ @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE}) public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerAllDml", eventType, rowData); public void listenerAllDml(CanalMessage message) { printChange("listenerAllDml", message); } public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { public void printChange(String method, CanalMessage message) { CanalEntry.EventType eventType = message.getEventType(); CanalEntry.RowData rowData = message.getRowData(); System.out.println(" >>>>>>>>>>>>>[当前数据库: "+message.getDataBaseName()+" ," + "数据库表名: " + message.getTableName() + " , " + "方法: " + method ); if (eventType == CanalEntry.EventType.DELETE) { rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { System.out.println("[方法: " + method + " , delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue()); src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
@@ -2,6 +2,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.exception.CanalClientException; import com.duxinglangzi.canal.starter.mode.CanalMessage; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Method; @@ -12,7 +13,7 @@ import java.util.stream.Collectors; /** * 登记员 * 监听的终端注册器 * * @author wuqiong 2022/4/11 */ @@ -42,8 +43,8 @@ private CanalEntry.EventType[] eventType; /** * 1、目前实现的 DML 解析器仅支持两个参数 <p> * 2、且顺序必须为: CanalEntry.EventType 、 CanalEntry.RowData <p> * 1、目前实现的 DML 解析器仅支持1个参数, 该参数对象内包含了: 库名、表名、事件类型、变更的数据 <p> * 2、方法参数必须为: CanalMessage <p> * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p> * * @param sets @@ -52,11 +53,9 @@ */ public void checkParameter(Set<String> sets) { List<Class<?>> classes = parameterTypes(); if (classes.size() > 2 || classes.get(1) != CanalEntry.RowData.class || classes.get(0) != CanalEntry.EventType.class) if (classes.size() != 1 || classes.get(0) != CanalMessage.class) throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check "); "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check "); if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination())) throw new CanalClientException("@CanalListener Illegal destination , please check "); src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -5,6 +5,7 @@ import com.alibaba.otter.canal.protocol.Message; import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar; import com.duxinglangzi.canal.starter.mode.CanalMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,7 +120,7 @@ eventType)) .forEach(element -> { try { element.getMethod().invoke(element.getBean(), eventType, rowData); element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); } catch (IllegalAccessException | InvocationTargetException e) { logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e); src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java
New file @@ -0,0 +1,76 @@ package com.duxinglangzi.canal.starter.mode; import com.alibaba.otter.canal.protocol.CanalEntry; import java.io.Serializable; /** * 监听事件的 返回信息, 主要将 CanalEntry 里不同层级的常用数据组装到同一层级使用.<p> * headers 是为了方便获取其他数据,比如 * * @author wuqiong 2022/8/15 */ public class CanalMessage implements Serializable { private static final long serialVersionUID = 730485362580815032L; /** * 数据库名 */ private String dataBaseName; /** * 表名 */ private String tableName; /** * 发生变化的 事件类型 */ private CanalEntry.EventType eventType; /** * 发生变化的数据 */ private CanalEntry.RowData rowData; /** * 头信息, 包含: sql执行时间、数据库日志文件名、数据库日志文件偏移量 等信息 */ private CanalEntry.Header entryHeader; /** * 构造返回数据信息 * * @param entryHeader 头信息 * @param eventType 事件类型 * @param rowData 变化的数据 * @author wuqiong 2022/8/15 16:18 */ public CanalMessage(CanalEntry.Header entryHeader, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { this.entryHeader = entryHeader; this.eventType = eventType; this.rowData = rowData; this.dataBaseName = getEntryHeader().getSchemaName(); this.tableName = getEntryHeader().getTableName(); } public String getDataBaseName() { return dataBaseName; } public String getTableName() { return tableName; } public CanalEntry.EventType getEventType() { return eventType; } public CanalEntry.RowData getRowData() { return rowData; } public CanalEntry.Header getEntryHeader() { return entryHeader; } }