duxinglangzi
2022-08-15 c83c1b4ee7ff9c01a7a67855863c281589f39c72
修改了listener方法的参数
3个文件已修改
1个文件已添加
142 ■■■■ 已修改文件
README.md 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md
@@ -16,9 +16,9 @@
        zookeeper-address:      # zookeeper 地址(开启集群的情况下生效), 例: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
        acquire-interval: 1000  # 未拉取到消息情况下,获取消息的时间间隔毫秒值
        subscribe: .*\\..*      # 默认情况下拉取所有库、所有表
  prod:
    example: example1
    database: books
prod:
  example: example
  database: books
```
@@ -28,9 +28,11 @@
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.duxinglangzi.canal.starter.annotation.CanalInsertListener;
import com.duxinglangzi.canal.starter.annotation.CanalListener;
import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
import com.duxinglangzi.canal.starter.annotation.EnableCanalListener;
import com.duxinglangzi.canal.starter.mode.CanalMessage;
import org.springframework.stereotype.Service;
import java.util.stream.Collectors;
@@ -44,9 +46,9 @@
public class CanalListenerTest {
    /**
     * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
     *
     * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData
     * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
     *
     * 目前 Listener 方法的参数必须为 com.duxinglangzi.canal.starter.mode.CanalMessage
     * 程序在启动过程中会做检查
     */
@@ -56,51 +58,59 @@
     * 目标是 ${prod.example} 的  ${prod.database} 库  users表
     */
    @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
    public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksUsers", eventType, rowData);
    public void listenerExampleBooksUsers(CanalMessage message) {
        printChange("listenerExampleBooksUsers", message);
    }
    /**
     * 监控更新操作 ,目标是 example的  books库  users表
     */
    @CanalUpdateListener(destination = "example", database = "books", table = {"users"})
    public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksUsers", eventType, rowData);
    @CanalInsertListener(destination = "example", database = "books", table = {"users"})
    public void listenerExampleBooksUser(CanalMessage message) {
        printChange("listenerExampleBooksUsers", message);
    }
    /**
     * 监控更新操作 ,目标是 example的  books库  books表
     */
    @CanalUpdateListener(destination = "example", database = "books", table = {"books"})
    public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksBooks", eventType, rowData);
    public void listenerExampleBooksBooks(CanalMessage message) {
        printChange("listenerExampleBooksBooks", message);
    }
    /**
     * 监控更新操作 ,目标是 example的  books库的所有表
     */
    @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
    public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksAll", eventType, rowData);
    public void listenerExampleBooksAll(CanalMessage message) {
        printChange("listenerExampleBooksAll", message);
    }
    /**
     * 监控更新操作 ,目标是 example的  所有库的所有表
     */
    @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
    public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleAll", eventType, rowData);
    public void listenerExampleAll(CanalMessage message) {
        printChange("listenerExampleAll", message);
    }
    /**
     * 监控更新、删除、新增操作 ,所有配置的目标下的所有库的所有表
     */
    @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
    public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerAllDml", eventType, rowData);
    public void listenerAllDml(CanalMessage message) {
        printChange("listenerAllDml", message);
    }
    public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
    public void printChange(String method, CanalMessage message) {
        CanalEntry.EventType eventType = message.getEventType();
        CanalEntry.RowData rowData = message.getRowData();
        System.out.println(" >>>>>>>>>>>>>[当前数据库: "+message.getDataBaseName()+" ," +
                "数据库表名: " + message.getTableName() + " , " +
                "方法: " + method );
        if (eventType == CanalEntry.EventType.DELETE) {
            rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
                System.out.println("[方法: " + method + " ,  delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue());
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
@@ -2,6 +2,7 @@
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.duxinglangzi.canal.starter.mode.CanalMessage;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
@@ -12,7 +13,7 @@
import java.util.stream.Collectors;
/**
 * 登记员
 * 监听的终端注册器
 *
 * @author wuqiong 2022/4/11
 */
@@ -42,8 +43,8 @@
    private CanalEntry.EventType[] eventType;
    /**
     * 1、目前实现的 DML 解析器仅支持两个参数 <p>
     * 2、且顺序必须为: CanalEntry.EventType 、 CanalEntry.RowData  <p>
     * 1、目前实现的 DML 解析器仅支持1个参数, 该参数对象内包含了: 库名、表名、事件类型、变更的数据 <p>
     * 2、方法参数必须为: CanalMessage  <p>
     * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
     *
     * @param sets
@@ -52,11 +53,9 @@
     */
    public void checkParameter(Set<String> sets) {
        List<Class<?>> classes = parameterTypes();
        if (classes.size() > 2
                || classes.get(1) != CanalEntry.RowData.class
                || classes.get(0) != CanalEntry.EventType.class)
        if (classes.size() != 1 || classes.get(0) != CanalMessage.class)
            throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
                    "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check ");
                    "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check ");
        if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
            throw new CanalClientException("@CanalListener Illegal destination , please check ");
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -5,6 +5,7 @@
import com.alibaba.otter.canal.protocol.Message;
import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
import com.duxinglangzi.canal.starter.mode.CanalMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -119,7 +120,7 @@
                            eventType))
                    .forEach(element -> {
                        try {
                            element.getMethod().invoke(element.getBean(), eventType, rowData);
                            element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
                        } catch (IllegalAccessException | InvocationTargetException e) {
                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                            throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java
New file
@@ -0,0 +1,76 @@
package com.duxinglangzi.canal.starter.mode;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.io.Serializable;
/**
 * 监听事件的 返回信息, 主要将 CanalEntry 里不同层级的常用数据组装到同一层级使用.<p>
 * headers 是为了方便获取其他数据,比如
 *
 * @author wuqiong 2022/8/15
 */
public class CanalMessage implements Serializable {
    private static final long serialVersionUID = 730485362580815032L;
    /**
     * 数据库名
     */
    private String dataBaseName;
    /**
     * 表名
     */
    private String tableName;
    /**
     * 发生变化的 事件类型
     */
    private CanalEntry.EventType eventType;
    /**
     * 发生变化的数据
     */
    private CanalEntry.RowData rowData;
    /**
     * 头信息, 包含: sql执行时间、数据库日志文件名、数据库日志文件偏移量 等信息
     */
    private CanalEntry.Header entryHeader;
    /**
     * 构造返回数据信息
     *
     * @param entryHeader 头信息
     * @param eventType   事件类型
     * @param rowData     变化的数据
     * @author wuqiong 2022/8/15 16:18
     */
    public CanalMessage(CanalEntry.Header entryHeader, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        this.entryHeader = entryHeader;
        this.eventType = eventType;
        this.rowData = rowData;
        this.dataBaseName = getEntryHeader().getSchemaName();
        this.tableName = getEntryHeader().getTableName();
    }
    public String getDataBaseName() {
        return dataBaseName;
    }
    public String getTableName() {
        return tableName;
    }
    public CanalEntry.EventType getEventType() {
        return eventType;
    }
    public CanalEntry.RowData getRowData() {
        return rowData;
    }
    public CanalEntry.Header getEntryHeader() {
        return entryHeader;
    }
}