duxinglangzi
2022-08-15 c83c1b4ee7ff9c01a7a67855863c281589f39c72
README.md
@@ -16,9 +16,9 @@
        zookeeper-address:      # zookeeper 地址(开启集群的情况下生效), 例: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
        acquire-interval: 1000  # 未拉取到消息情况下,获取消息的时间间隔毫秒值
        subscribe: .*\\..*      # 默认情况下拉取所有库、所有表
  prod:
    example: example1
    database: books
prod:
  example: example
  database: books
```
@@ -28,9 +28,11 @@
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.duxinglangzi.canal.starter.annotation.CanalInsertListener;
import com.duxinglangzi.canal.starter.annotation.CanalListener;
import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
import com.duxinglangzi.canal.starter.annotation.EnableCanalListener;
import com.duxinglangzi.canal.starter.mode.CanalMessage;
import org.springframework.stereotype.Service;
import java.util.stream.Collectors;
@@ -44,9 +46,9 @@
public class CanalListenerTest {
    /**
     * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
     *
     * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData
     * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
     *
     * 目前 Listener 方法的参数必须为 com.duxinglangzi.canal.starter.mode.CanalMessage
     * 程序在启动过程中会做检查
     */
@@ -56,51 +58,59 @@
     * 目标是 ${prod.example} 的  ${prod.database} 库  users表
     */
    @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
    public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksUsers", eventType, rowData);
    public void listenerExampleBooksUsers(CanalMessage message) {
        printChange("listenerExampleBooksUsers", message);
    }
    /**
     * 监控更新操作 ,目标是 example的  books库  users表
     */
    @CanalUpdateListener(destination = "example", database = "books", table = {"users"})
    public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksUsers", eventType, rowData);
    @CanalInsertListener(destination = "example", database = "books", table = {"users"})
    public void listenerExampleBooksUser(CanalMessage message) {
        printChange("listenerExampleBooksUsers", message);
    }
    /**
     * 监控更新操作 ,目标是 example的  books库  books表
     */
    @CanalUpdateListener(destination = "example", database = "books", table = {"books"})
    public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksBooks", eventType, rowData);
    public void listenerExampleBooksBooks(CanalMessage message) {
        printChange("listenerExampleBooksBooks", message);
    }
    /**
     * 监控更新操作 ,目标是 example的  books库的所有表
     */
    @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
    public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksAll", eventType, rowData);
    public void listenerExampleBooksAll(CanalMessage message) {
        printChange("listenerExampleBooksAll", message);
    }
    /**
     * 监控更新操作 ,目标是 example的  所有库的所有表
     */
    @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
    public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleAll", eventType, rowData);
    public void listenerExampleAll(CanalMessage message) {
        printChange("listenerExampleAll", message);
    }
    /**
     * 监控更新、删除、新增操作 ,所有配置的目标下的所有库的所有表
     */
    @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
    public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerAllDml", eventType, rowData);
    public void listenerAllDml(CanalMessage message) {
        printChange("listenerAllDml", message);
    }
    public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
    public void printChange(String method, CanalMessage message) {
        CanalEntry.EventType eventType = message.getEventType();
        CanalEntry.RowData rowData = message.getRowData();
        System.out.println(" >>>>>>>>>>>>>[当前数据库: "+message.getDataBaseName()+" ," +
                "数据库表名: " + message.getTableName() + " , " +
                "方法: " + method );
        if (eventType == CanalEntry.EventType.DELETE) {
            rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
                System.out.println("[方法: " + method + " ,  delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue());