renxue
2022-10-24 0b055a3f554da3a934e79e88c4781705cbab5a21
修改包下的类名
15 文件已重命名
1个文件已删除
1个文件已添加
3个文件已修改
1930 ■■■■ 已修改文件
README.md 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/annotation/CanalDeleteListener.java 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/annotation/CanalInsertListener.java 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/annotation/CanalListener.java 82 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/annotation/CanalUpdateListener.java 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/annotation/EnableCanalListener.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/configuration/CanalAutoConfigurationProperties.java 354 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/configuration/CanalBootstrapConfiguration.java 70 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/configuration/CanalConfigurationSelector.java 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java 176 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/configuration/CanalListenerEndpointRegistrar.java 260 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/container/AbstractCanalTransponderContainer.java 148 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java 278 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/factory/CanalConnectorFactory.java 100 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/factory/TransponderContainerFactory.java 134 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/listener/ApplicationReadyListener.java 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hz/canal/starter/mode/CanalMessage.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/META-INF/spring-configuration-metadata.json 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/META-INF/spring.factories 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md
@@ -28,11 +28,11 @@
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.duxinglangzi.canal.starter.annotation.CanalInsertListener;
import com.duxinglangzi.canal.starter.annotation.CanalListener;
import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
import com.duxinglangzi.canal.starter.annotation.EnableCanalListener;
import com.duxinglangzi.canal.starter.mode.CanalMessage;
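import com.hz.canal.starter.annotation.CanalInsertListener;
import com.hz.canal.starter.annotation.CanalListener;
import com.hz.canal.starter.annotation.CanalUpdateListener;
import com.hz.canal.starter.annotation.EnableCanalListener;
import com.hz.canal.starter.mode.CanalMessage;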
import org.springframework.stereotype.Service;
import java.util.stream.Collectors;
@@ -48,7 +48,7 @@
    /**
     * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
     *
     * 目前 Listener 方法的参数必须为 com.duxinglangzi.canal.starter.mode.CanalMessage
     * 目前 Listener 方法的参数必须为 com.hz.canal.starter.mode.CanalMessage
     * 程序在启动过程中会做检查
     */
@@ -107,9 +107,9 @@
        CanalEntry.RowData rowData = message.getRowData();
        System.out.println(" >>>>>>>>>>>>>[当前数据库: "+message.getDataBaseName()+" ," +
        System.out.println(" >>>>>>>>>>>>>[当前数据库: " + message.getDataBaseName() + " ," +
                "数据库表名: " + message.getTableName() + " , " +
                "方法: " + method );
                "方法: " + method);
        if (eventType == CanalEntry.EventType.DELETE) {
            rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java
File was deleted
src/main/java/com/hz/canal/starter/annotation/CanalDeleteListener.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalDeleteListener.java
@@ -1,29 +1,29 @@
package com.duxinglangzi.canal.starter.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Method-level shortcut for a {@link CanalListener} whose event type is fixed to
 * {@code CanalEntry.EventType.DELETE} through the meta-annotation below.
 *
 * @author wuqiong 2022/4/11
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@CanalListener(eventType = {CanalEntry.EventType.DELETE})
public @interface CanalDeleteListener {
    /** Alias for the {@code destination} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String destination();
    /** Alias for the {@code database} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String database();
    /** Alias for the {@code table} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String[] table();
}
package com.hz.canal.starter.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Method-level shortcut for a {@link CanalListener} whose event type is fixed to
 * {@code CanalEntry.EventType.DELETE} through the meta-annotation below.
 *
 * @author wuqiong 2022/4/11
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@CanalListener(eventType = {CanalEntry.EventType.DELETE})
public @interface CanalDeleteListener {
    /** Alias for the {@code destination} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String destination();
    /** Alias for the {@code database} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String database();
    /** Alias for the {@code table} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String[] table();
}
src/main/java/com/hz/canal/starter/annotation/CanalInsertListener.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalInsertListener.java
@@ -1,29 +1,29 @@
package com.duxinglangzi.canal.starter.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Method-level shortcut for a {@link CanalListener} whose event type is fixed to
 * {@code CanalEntry.EventType.INSERT} through the meta-annotation below.
 *
 * @author wuqiong 2022/4/11
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@CanalListener(eventType = CanalEntry.EventType.INSERT)
public @interface CanalInsertListener {
    /** Alias for the {@code destination} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String destination();
    /** Alias for the {@code database} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String database();
    /** Alias for the {@code table} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String[] table();
}
package com.hz.canal.starter.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Method-level shortcut for a {@link CanalListener} whose event type is fixed to
 * {@code CanalEntry.EventType.INSERT} through the meta-annotation below.
 *
 * @author wuqiong 2022/4/11
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@CanalListener(eventType = CanalEntry.EventType.INSERT)
public @interface CanalInsertListener {
    /** Alias for the {@code destination} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String destination();
    /** Alias for the {@code database} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String database();
    /** Alias for the {@code table} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String[] table();
}
src/main/java/com/hz/canal/starter/annotation/CanalListener.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalListener.java
@@ -1,41 +1,41 @@
package com.duxinglangzi.canal.starter.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a method as a canal listener. May be placed directly on a method or used as a
 * meta-annotation on another annotation type (see the {@code @Target} below).
 *
 * @author wuqiong 2022/4/11
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface CanalListener {
    /**
     * Destination name. If not configured, all destinations from the configuration file are used.
     */
    String destination() default "";
    /**
     * Database name.
     */
    String database() default "";
    /**
     * Table name(s).
     */
    String[] table() default "";
    /**
     * Data change type(s). Note: DDL is not included by default.
     */
    CanalEntry.EventType[] eventType();
}
package com.hz.canal.starter.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a method as a canal listener. May be placed directly on a method or used as a
 * meta-annotation on another annotation type (see the {@code @Target} below).
 *
 * @author wuqiong 2022/4/11
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface CanalListener {
    /**
     * Destination name. If not configured, all destinations from the configuration file are used.
     */
    String destination() default "";
    /**
     * Database name.
     */
    String database() default "";
    /**
     * Table name(s).
     */
    String[] table() default "";
    /**
     * Data change type(s). Note: DDL is not included by default.
     */
    CanalEntry.EventType[] eventType();
}
src/main/java/com/hz/canal/starter/annotation/CanalUpdateListener.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalUpdateListener.java
@@ -1,29 +1,29 @@
package com.duxinglangzi.canal.starter.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Method-level shortcut for a {@link CanalListener} whose event type is fixed to
 * {@code CanalEntry.EventType.UPDATE} through the meta-annotation below.
 *
 * @author wuqiong 2022/4/11
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@CanalListener(eventType = CanalEntry.EventType.UPDATE)
public @interface CanalUpdateListener {
    /** Alias for the {@code destination} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String destination();
    /** Alias for the {@code database} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String database();
    /** Alias for the {@code table} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String[] table();
}
package com.hz.canal.starter.annotation;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Method-level shortcut for a {@link CanalListener} whose event type is fixed to
 * {@code CanalEntry.EventType.UPDATE} through the meta-annotation below.
 *
 * @author wuqiong 2022/4/11
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@CanalListener(eventType = CanalEntry.EventType.UPDATE)
public @interface CanalUpdateListener {
    /** Alias for the {@code destination} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String destination();
    /** Alias for the {@code database} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String database();
    /** Alias for the {@code table} attribute of {@link CanalListener}; no default is declared here. */
    @AliasFor(annotation = CanalListener.class)
    String[] table();
}
src/main/java/com/hz/canal/starter/annotation/EnableCanalListener.java
New file
@@ -0,0 +1,21 @@
package com.hz.canal.starter.annotation;
import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties;
import com.hz.canal.starter.configuration.CanalConfigurationSelector;
import com.hz.canal.starter.listener.ApplicationReadyListener;
import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
/**
 * Enables canal listeners.
 * <p>
 * Placing this annotation on a type imports {@link CanalAutoConfigurationProperties},
 * {@link CanalConfigurationSelector} and {@link ApplicationReadyListener} via {@code @Import}.
 *
 * @author wuqiong 2022/8/4
 */
@Documented
@Inherited
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import({CanalAutoConfigurationProperties.class, CanalConfigurationSelector.class, ApplicationReadyListener.class})
public @interface EnableCanalListener {
}
src/main/java/com/hz/canal/starter/configuration/CanalAutoConfigurationProperties.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalAutoConfigurationProperties.java
@@ -1,177 +1,177 @@
package com.duxinglangzi.canal.starter.configuration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import java.util.LinkedHashMap;
import java.util.Map;
/**
 * Configuration properties for Canal connections, bound from the {@code spring.canal} prefix.
 *
 * @author wuqiong 2022/4/11
 */
@Order(Ordered.HIGHEST_PRECEDENCE)
@ConfigurationProperties(prefix = "spring.canal")
public class CanalAutoConfigurationProperties {
    /**
     * Per-endpoint settings in insertion order.
     * NOTE(review): keys are presumably the canal destination names — confirm against callers.
     */
    private Map<String, EndpointInstance> instances = new LinkedHashMap<>();
    /**
     * Connection settings of a single Canal endpoint.
     */
    public static class EndpointInstance {
        /**
         * Whether cluster mode is enabled.
         */
        private boolean clusterEnabled;
        /**
         * ZooKeeper address, e.g. 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
         */
        private String zookeeperAddress;
        /**
         * Host; default: 127.0.0.1
         */
        private String host = "127.0.0.1";
        /**
         * Port; default: 11111
         */
        private int port = 11111;
        /**
         * User name.
         */
        private String userName = "";
        /**
         * Password.
         */
        private String password = "";
        /**
         * Number of records fetched per batch; default: 200
         */
        private int batchSize = 200;
        /**
         * Number of retries when an error occurs; default: 5
         */
        private int retryCount = 5;
        /**
         * Tables that MySQL data parsing should watch, given as Perl regular expressions.
         * <p>
         * <p>
         * Multiple expressions are separated by commas (,); the escape character must be
         * written as a double backslash (\\)
         * <p>
         * <p>
         * Common examples: <p>
         * 1.  All databases and tables: .*   or  .*\\..* <p>
         * 2.  All tables in canal_db:    canal_db\\..* <p>
         * 3.  Tables in canal_db whose names start with canal:   canal_db\\.canal.* <p>
         * 4.  A single table in canal_db:  canal_db\\.test1 <p>
         * 5.  Several rules combined: canal_db\\..*,mysql_db.test1,mysql.test2 (comma separated) <p>
         * <p>
         * Default: all databases and all tables (.*\\..*)
         */
        private String subscribe = ".*\\..*";
        /**
         * Interval in milliseconds between fetch attempts when no message was pulled; default: 1000
         */
        private long acquireInterval = 1000;
        public EndpointInstance() {
        }
        public boolean isClusterEnabled() {
            return clusterEnabled;
        }
        public void setClusterEnabled(boolean clusterEnabled) {
            this.clusterEnabled = clusterEnabled;
        }
        public String getZookeeperAddress() {
            return zookeeperAddress;
        }
        public void setZookeeperAddress(String zookeeperAddress) {
            this.zookeeperAddress = zookeeperAddress;
        }
        public String getHost() {
            return host;
        }
        public void setHost(String host) {
            this.host = host;
        }
        public int getPort() {
            return port;
        }
        public void setPort(int port) {
            this.port = port;
        }
        public String getUserName() {
            return userName;
        }
        public void setUserName(String userName) {
            this.userName = userName;
        }
        public String getPassword() {
            return password;
        }
        public void setPassword(String password) {
            this.password = password;
        }
        public int getBatchSize() {
            return batchSize;
        }
        public void setBatchSize(int batchSize) {
            this.batchSize = batchSize;
        }
        public int getRetryCount() {
            return retryCount;
        }
        public void setRetryCount(int retryCount) {
            this.retryCount = retryCount;
        }
        public long getAcquireInterval() {
            return acquireInterval;
        }
        public void setAcquireInterval(long acquireInterval) {
            this.acquireInterval = acquireInterval;
        }
        public String getSubscribe() {
            return subscribe;
        }
        public void setSubscribe(String subscribe) {
            this.subscribe = subscribe;
        }
    }
    public Map<String, EndpointInstance> getInstances() {
        return instances;
    }
    public void setInstances(Map<String, EndpointInstance> instances) {
        this.instances = instances;
    }
}
package com.hz.canal.starter.configuration;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import java.util.LinkedHashMap;
import java.util.Map;
/**
 * Configuration properties for Canal connections, bound from the {@code spring.canal} prefix.
 *
 * @author wuqiong 2022/4/11
 */
@Order(Ordered.HIGHEST_PRECEDENCE)
@ConfigurationProperties(prefix = "spring.canal")
public class CanalAutoConfigurationProperties {
    /**
     * Per-endpoint settings in insertion order.
     * NOTE(review): keys are presumably the canal destination names — confirm against callers.
     */
    private Map<String, EndpointInstance> instances = new LinkedHashMap<>();
    /**
     * Connection settings of a single Canal endpoint.
     */
    public static class EndpointInstance {
        /**
         * Whether cluster mode is enabled.
         */
        private boolean clusterEnabled;
        /**
         * ZooKeeper address, e.g. 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
         */
        private String zookeeperAddress;
        /**
         * Host; default: 127.0.0.1
         */
        private String host = "127.0.0.1";
        /**
         * Port; default: 11111
         */
        private int port = 11111;
        /**
         * User name.
         */
        private String userName = "";
        /**
         * Password.
         */
        private String password = "";
        /**
         * Number of records fetched per batch; default: 200
         */
        private int batchSize = 200;
        /**
         * Number of retries when an error occurs; default: 5
         */
        private int retryCount = 5;
        /**
         * Tables that MySQL data parsing should watch, given as Perl regular expressions.
         * <p>
         * <p>
         * Multiple expressions are separated by commas (,); the escape character must be
         * written as a double backslash (\\)
         * <p>
         * <p>
         * Common examples: <p>
         * 1.  All databases and tables: .*   or  .*\\..* <p>
         * 2.  All tables in canal_db:    canal_db\\..* <p>
         * 3.  Tables in canal_db whose names start with canal:   canal_db\\.canal.* <p>
         * 4.  A single table in canal_db:  canal_db\\.test1 <p>
         * 5.  Several rules combined: canal_db\\..*,mysql_db.test1,mysql.test2 (comma separated) <p>
         * <p>
         * Default: all databases and all tables (.*\\..*)
         */
        private String subscribe = ".*\\..*";
        /**
         * Interval in milliseconds between fetch attempts when no message was pulled; default: 1000
         */
        private long acquireInterval = 1000;
        public EndpointInstance() {
        }
        public boolean isClusterEnabled() {
            return clusterEnabled;
        }
        public void setClusterEnabled(boolean clusterEnabled) {
            this.clusterEnabled = clusterEnabled;
        }
        public String getZookeeperAddress() {
            return zookeeperAddress;
        }
        public void setZookeeperAddress(String zookeeperAddress) {
            this.zookeeperAddress = zookeeperAddress;
        }
        public String getHost() {
            return host;
        }
        public void setHost(String host) {
            this.host = host;
        }
        public int getPort() {
            return port;
        }
        public void setPort(int port) {
            this.port = port;
        }
        public String getUserName() {
            return userName;
        }
        public void setUserName(String userName) {
            this.userName = userName;
        }
        public String getPassword() {
            return password;
        }
        public void setPassword(String password) {
            this.password = password;
        }
        public int getBatchSize() {
            return batchSize;
        }
        public void setBatchSize(int batchSize) {
            this.batchSize = batchSize;
        }
        public int getRetryCount() {
            return retryCount;
        }
        public void setRetryCount(int retryCount) {
            this.retryCount = retryCount;
        }
        public long getAcquireInterval() {
            return acquireInterval;
        }
        public void setAcquireInterval(long acquireInterval) {
            this.acquireInterval = acquireInterval;
        }
        public String getSubscribe() {
            return subscribe;
        }
        public void setSubscribe(String subscribe) {
            this.subscribe = subscribe;
        }
    }
    public Map<String, EndpointInstance> getInstances() {
        return instances;
    }
    public void setInstances(Map<String, EndpointInstance> instances) {
        this.instances = instances;
    }
}
src/main/java/com/hz/canal/starter/configuration/CanalBootstrapConfiguration.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalBootstrapConfiguration.java
@@ -1,35 +1,35 @@
package com.duxinglangzi.canal.starter.configuration;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
/**
 * @author wuqiong 2022/4/12
 */
public class CanalBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
    public static final String CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME =
            "com.duxinglangzi.canal.starter.configuration.CanalListenerAnnotationBeanPostProcessor";
    /**
     * 注册 CanalListenerAnnotationBeanPostProcessor 到spring bean 容器内
     *
     * @param importingClassMetadata
     * @param registry
     * @return void
     * @author wuqiong 2022-04-23 20:21
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        if (!registry.containsBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
            registry.registerBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
                    new RootBeanDefinition(CanalListenerAnnotationBeanPostProcessor.class));
        }
    }
}
package com.hz.canal.starter.configuration;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.type.AnnotationMetadata;
/**
 * @author wuqiong 2022/4/12
 */
public class CanalBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
    public static final String CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME =
            "com.duxinglangzi.canal.starter.configuration.CanalListenerAnnotationBeanPostProcessor";
    /**
     * 注册 CanalListenerAnnotationBeanPostProcessor 到spring bean 容器内
     *
     * @param importingClassMetadata
     * @param registry
     * @return void
     * @author wuqiong 2022-04-23 20:21
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        if (!registry.containsBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
            registry.registerBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
                    new RootBeanDefinition(CanalListenerAnnotationBeanPostProcessor.class));
        }
    }
}
src/main/java/com/hz/canal/starter/configuration/CanalConfigurationSelector.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalConfigurationSelector.java
@@ -1,22 +1,22 @@
package com.duxinglangzi.canal.starter.configuration;
import org.springframework.context.annotation.DeferredImportSelector;
import org.springframework.core.type.AnnotationMetadata;
/**
 * @author wuqiong 2022/4/12
 */
public class CanalConfigurationSelector implements DeferredImportSelector {
    /**
     * 扫描器 导入指定类, 这里导入 CanalBootstrapConfiguration.class
     *
     * @param importingClassMetadata
     * @return java.lang.String[]
     * @author wuqiong 2022-04-23 20:20
     */
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[]{CanalBootstrapConfiguration.class.getName()};
    }
}
package com.hz.canal.starter.configuration;
import org.springframework.context.annotation.DeferredImportSelector;
import org.springframework.core.type.AnnotationMetadata;
/**
 * @author wuqiong 2022/4/12
 */
public class CanalConfigurationSelector implements DeferredImportSelector {
    /**
     * 扫描器 导入指定类, 这里导入 CanalBootstrapConfiguration.class
     *
     * @param importingClassMetadata
     * @return java.lang.String[]
     * @author wuqiong 2022-04-23 20:20
     */
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[]{CanalBootstrapConfiguration.class.getName()};
    }
}
src/main/java/com/hz/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java
@@ -1,88 +1,88 @@
package com.duxinglangzi.canal.starter.configuration;
import com.duxinglangzi.canal.starter.annotation.CanalListener;
import com.duxinglangzi.canal.starter.factory.TransponderContainerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
 * @author wuqiong 2022/4/11
 */
public class CanalListenerAnnotationBeanPostProcessor implements
        BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class);
    private final Set<Class<?>> notAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(256));
    private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private ConfigurableListableBeanFactory configurableListableBeanFactory;
    private CanalAutoConfigurationProperties canalAutoConfigurationProperties;
    private ApplicationContext applicationContext;
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (notAnnotatedClasses.contains(bean.getClass())) return bean;
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // 只扫描类的方法,目前 CanalListener 只支持在方法上
        Map<Method, CanalListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<CanalListener>) method -> findListenerAnnotations(method));
        if (annotatedMethods.isEmpty()) {
            this.notAnnotatedClasses.add(bean.getClass());
        } else {
            // 先加入到待注册里面
            annotatedMethods.entrySet().stream()
                    .filter(e -> e != null)
                    .forEach(ele -> registrars.add(createRegistrar(bean, ele)));
            logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName,
                    annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet()));
        }
        return bean;
    }
    private CanalListener findListenerAnnotations(Method method) {
        return AnnotatedElementUtils.findMergedAnnotation(method, CanalListener.class);
    }
    @Override
    public void afterSingletonsInstantiated() {
        canalAutoConfigurationProperties = configurableListableBeanFactory.getBean(CanalAutoConfigurationProperties.class);
        TransponderContainerFactory.registerListenerContainer(configurableListableBeanFactory, canalAutoConfigurationProperties, registrars);
    }
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
        this.configurableListableBeanFactory = configurableListableBeanFactory;
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
    private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
        return new CanalListenerEndpointRegistrar(bean, entry.getKey(),
                applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().destination()),
                applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().database()),
                entry.getValue().table(), entry.getValue().eventType());
    }
}
package com.hz.canal.starter.configuration;
import com.hz.canal.starter.annotation.CanalListener;
import com.hz.canal.starter.factory.TransponderContainerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
 * @author wuqiong 2022/4/11
 */
public class CanalListenerAnnotationBeanPostProcessor implements
        BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class);
    private final Set<Class<?>> notAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(256));
    private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private ConfigurableListableBeanFactory configurableListableBeanFactory;
    private CanalAutoConfigurationProperties canalAutoConfigurationProperties;
    private ApplicationContext applicationContext;
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (notAnnotatedClasses.contains(bean.getClass())) return bean;
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // 只扫描类的方法,目前 CanalListener 只支持在方法上
        Map<Method, CanalListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<CanalListener>) method -> findListenerAnnotations(method));
        if (annotatedMethods.isEmpty()) {
            this.notAnnotatedClasses.add(bean.getClass());
        } else {
            // 先加入到待注册里面
            annotatedMethods.entrySet().stream()
                    .filter(e -> e != null)
                    .forEach(ele -> registrars.add(createRegistrar(bean, ele)));
            logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName,
                    annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet()));
        }
        return bean;
    }
    private CanalListener findListenerAnnotations(Method method) {
        return AnnotatedElementUtils.findMergedAnnotation(method, CanalListener.class);
    }
    @Override
    public void afterSingletonsInstantiated() {
        canalAutoConfigurationProperties = configurableListableBeanFactory.getBean(CanalAutoConfigurationProperties.class);
        TransponderContainerFactory.registerListenerContainer(configurableListableBeanFactory, canalAutoConfigurationProperties, registrars);
    }
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
        this.configurableListableBeanFactory = configurableListableBeanFactory;
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
    private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
        return new CanalListenerEndpointRegistrar(bean, entry.getKey(),
                applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().destination()),
                applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().database()),
                entry.getValue().table(), entry.getValue().eventType());
    }
}
src/main/java/com/hz/canal/starter/configuration/CanalListenerEndpointRegistrar.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
@@ -1,130 +1,130 @@
package com.duxinglangzi.canal.starter.configuration;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.duxinglangzi.canal.starter.mode.CanalMessage;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
 * 监听的终端注册器
 *
 * @author wuqiong 2022/4/11
 */
public class CanalListenerEndpointRegistrar {
    private Object bean;
    private Method method;
    /**
     * 如果未进行配置,则使用配置文件里全部 destination
     */
    private String destination;
    /**
     * 数据库名
     */
    private String database;
    /**
     * 数据表名
     */
    private String[] table;
    /**
     * 数据变动类型,此处请注意,默认不包含 DDL
     */
    private CanalEntry.EventType[] eventType;
    /**
     * 1、目前实现的 DML 解析器仅支持1个参数, 该参数对象内包含了: 库名、表名、事件类型、变更的数据 <p>
     * 2、方法参数必须为: CanalMessage  <p>
     * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
     *
     * @param sets
     * @return void
     * @author wuqiong 2022-04-23 20:27
     */
    public void checkParameter(Set<String> sets) {
        List<Class<?>> classes = parameterTypes();
        if (classes.size() != 1 || classes.get(0) != CanalMessage.class)
            throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
                    "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check ");
        if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
            throw new CanalClientException("@CanalListener Illegal destination  " + getDestination() + ", please check ");
    }
    public List<Class<?>> parameterTypes() {
        return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList());
    }
    public boolean isContainDestination(String destination) {
        if (StringUtils.isBlank(getDestination())) return true;
        return getDestination().equals(destination);
    }
    /**
     * 过滤参数
     *
     * @param database
     * @param tableName
     * @param eventType
     * @return java.util.function.Predicate
     * @author wuqiong 2022-04-23 20:47
     */
    public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
            String database, String tableName, CanalEntry.EventType eventType) {
        Predicate<CanalListenerEndpointRegistrar> databases =
                e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database);
        Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0
                || (e.getTable().length == 1 && "".equals(e.getTable()[0]))
                || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName));
        Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0
                || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType);
        return databases.and(table).and(eventTypes);
    }
    public CanalListenerEndpointRegistrar(
            Object bean, Method method, String destination,
            String database, String[] tables, CanalEntry.EventType[] eventTypes) {
        this.bean = bean;
        this.method = method;
        this.destination = destination;
        this.database = database;
        this.table = tables;
        this.eventType = eventTypes;
    }
    public Object getBean() {
        return bean;
    }
    public Method getMethod() {
        return method;
    }
    public String getDestination() {
        return destination;
    }
    public String getDatabase() {
        return database;
    }
    public String[] getTable() {
        return table;
    }
    public CanalEntry.EventType[] getEventType() {
        return eventType;
    }
}
package com.hz.canal.starter.configuration;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.hz.canal.starter.mode.CanalMessage;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
 * 监听的终端注册器
 *
 * @author wuqiong 2022/4/11
 */
public class CanalListenerEndpointRegistrar {
    private Object bean;
    private Method method;
    /**
     * 如果未进行配置,则使用配置文件里全部 destination
     */
    private String destination;
    /**
     * 数据库名
     */
    private String database;
    /**
     * 数据表名
     */
    private String[] table;
    /**
     * 数据变动类型,此处请注意,默认不包含 DDL
     */
    private CanalEntry.EventType[] eventType;
    /**
     * 1、目前实现的 DML 解析器仅支持1个参数, 该参数对象内包含了: 库名、表名、事件类型、变更的数据 <p>
     * 2、方法参数必须为: CanalMessage  <p>
     * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
     *
     * @param sets
     * @return void
     * @author wuqiong 2022-04-23 20:27
     */
    public void checkParameter(Set<String> sets) {
        List<Class<?>> classes = parameterTypes();
        if (classes.size() != 1 || classes.get(0) != CanalMessage.class)
            throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
                    "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check ");
        if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
            throw new CanalClientException("@CanalListener Illegal destination  " + getDestination() + ", please check ");
    }
    public List<Class<?>> parameterTypes() {
        return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList());
    }
    public boolean isContainDestination(String destination) {
        if (StringUtils.isBlank(getDestination())) return true;
        return getDestination().equals(destination);
    }
    /**
     * 过滤参数
     *
     * @param database
     * @param tableName
     * @param eventType
     * @return java.util.function.Predicate
     * @author wuqiong 2022-04-23 20:47
     */
    public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
            String database, String tableName, CanalEntry.EventType eventType) {
        Predicate<CanalListenerEndpointRegistrar> databases =
                e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database);
        Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0
                || (e.getTable().length == 1 && "".equals(e.getTable()[0]))
                || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName));
        Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0
                || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType);
        return databases.and(table).and(eventTypes);
    }
    public CanalListenerEndpointRegistrar(
            Object bean, Method method, String destination,
            String database, String[] tables, CanalEntry.EventType[] eventTypes) {
        this.bean = bean;
        this.method = method;
        this.destination = destination;
        this.database = database;
        this.table = tables;
        this.eventType = eventTypes;
    }
    public Object getBean() {
        return bean;
    }
    public Method getMethod() {
        return method;
    }
    public String getDestination() {
        return destination;
    }
    public String getDatabase() {
        return database;
    }
    public String[] getTable() {
        return table;
    }
    public CanalEntry.EventType[] getEventType() {
        return eventType;
    }
}
src/main/java/com/hz/canal/starter/container/AbstractCanalTransponderContainer.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java
@@ -1,74 +1,74 @@
package com.duxinglangzi.canal.starter.container;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener;
import org.springframework.context.SmartLifecycle;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * 抽象的canal transponder ,实现SmartLifecycle接口,声明周期由spring进行管理
 *
 * @author wuqiong 2022/4/11
 */
public abstract class AbstractCanalTransponderContainer implements SmartLifecycle {
    protected boolean isRunning = false;
    protected final Long SLEEP_TIME_MILLI_SECONDS = 1000L;
    protected List<CanalEntry.EntryType> IGNORE_ENTRY_TYPES =
            Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN,
                    CanalEntry.EntryType.TRANSACTIONEND,
                    CanalEntry.EntryType.HEARTBEAT);
    protected abstract void doStart();
    protected abstract void initConnect();
    protected abstract void disconnect();
    @Override
    public void start() {
        new Thread(() -> {
            // spring 启动后 才会进行canal数据拉取
            while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get())
                sleep(5L * SLEEP_TIME_MILLI_SECONDS);
            initConnect();
            while (isRunning() && !Thread.currentThread().isInterrupted()) doStart();
            disconnect(); // 线程被终止或者容器已经停止,需要关闭连接
        }).start();
        setRunning(true);
    }
    @Override
    public void stop() {
        setRunning(false);
    }
    @Override
    public void stop(Runnable callback) {
        callback.run();
        setRunning(false);
        sleep(SLEEP_TIME_MILLI_SECONDS);
    }
    @Override
    public boolean isRunning() {
        return isRunning;
    }
    protected void setRunning(boolean bool) {
        isRunning = bool;
    }
    protected void sleep(long sleepTimeMilliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSeconds);
            if (!isRunning()) Thread.currentThread().interrupt();
        } catch (InterruptedException e) {
        }
    }
}
package com.hz.canal.starter.container;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.hz.canal.starter.listener.ApplicationReadyListener;
import org.springframework.context.SmartLifecycle;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * 抽象的canal transponder ,实现SmartLifecycle接口,声明周期由spring进行管理
 *
 * @author wuqiong 2022/4/11
 */
public abstract class AbstractCanalTransponderContainer implements SmartLifecycle {
    protected boolean isRunning = false;
    protected final Long SLEEP_TIME_MILLI_SECONDS = 1000L;
    protected List<CanalEntry.EntryType> IGNORE_ENTRY_TYPES =
            Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN,
                    CanalEntry.EntryType.TRANSACTIONEND,
                    CanalEntry.EntryType.HEARTBEAT);
    protected abstract void doStart();
    protected abstract void initConnect();
    protected abstract void disconnect();
    @Override
    public void start() {
        new Thread(() -> {
            // spring 启动后 才会进行canal数据拉取
            while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get())
                sleep(5L * SLEEP_TIME_MILLI_SECONDS);
            initConnect();
            while (isRunning() && !Thread.currentThread().isInterrupted()) doStart();
            disconnect(); // 线程被终止或者容器已经停止,需要关闭连接
        }).start();
        setRunning(true);
    }
    @Override
    public void stop() {
        setRunning(false);
    }
    @Override
    public void stop(Runnable callback) {
        callback.run();
        setRunning(false);
        sleep(SLEEP_TIME_MILLI_SECONDS);
    }
    @Override
    public boolean isRunning() {
        return isRunning;
    }
    protected void setRunning(boolean bool) {
        isRunning = bool;
    }
    protected void sleep(long sleepTimeMilliSeconds) {
        try {
            TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSeconds);
            if (!isRunning()) Thread.currentThread().interrupt();
        } catch (InterruptedException e) {
        }
    }
}
src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -1,139 +1,139 @@
package com.duxinglangzi.canal.starter.container;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
import com.duxinglangzi.canal.starter.mode.CanalMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
/**
 * DML 数据拉取、解析
 *
 * @author wuqiong 2022/4/11
 */
public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
    private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
    private CanalConnector connector;
    private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
    private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
    private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
    private Integer local_retry_count;
    public void initConnect() {
        try {
            // init supportAllTypes
            registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
            connector.connect();
            connector.subscribe(endpointInstance.getSubscribe());
            connector.rollback();
            // 初始化本地重试次数
            local_retry_count = endpointInstance.getRetryCount();
        } catch (Exception e) {
            logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
            setRunning(false);
        }
    }
    public void disconnect() {
        // 关闭连接
        connector.disconnect();
    }
    public void doStart() {
        try {
            Message message = null;
            try {
                message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
            } catch (Exception clientException) {
                logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
                if (local_retry_count > 0) {
                    // 重试次数减一
                    local_retry_count = local_retry_count - 1;
                    sleep(endpointInstance.getAcquireInterval());
                } else {
                    // 重试次数 <= 0 时,直接终止线程
                    logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
                                    "thread interrupt , current connector host: {} , port: {} ",
                            endpointInstance.getHost(), endpointInstance.getPort());
                    Thread.currentThread().interrupt();
                }
                return;
            }
            // 如果重试次数小于设置的,则修改
            if (local_retry_count < endpointInstance.getRetryCount())
                local_retry_count = endpointInstance.getRetryCount();
            List<CanalEntry.Entry> entries = message.getEntries();
            if (message.getId() == -1 || entries.isEmpty()) {
                sleep(endpointInstance.getAcquireInterval());
                return;
            }
            for (CanalEntry.Entry entry : entries) {
                try {
                    consumer(entry);
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
                    // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
                    // return;
                }
            }
            connector.ack(message.getId()); // 提交确认
        } catch (Exception exc) {
            logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
            // 防止删除消息时发生错误,或者拉取消息失败等情况
            exc.printStackTrace();
        }
    }
    public DmlMessageTransponderContainer(
            CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
            CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
        this.connector = connector;
        this.registrars.addAll(registrars);
        this.endpointInstance = endpointInstance;
    }
    private void consumer(CanalEntry.Entry entry) {
        if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
        CanalEntry.RowChange rowChange = null;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
            throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
        }
        // 忽略 ddl 语句
        if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (!support_all_types.contains(eventType)) return;
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            registrars
                    .stream()
                    .filter(CanalListenerEndpointRegistrar.filterArgs(
                            entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(),
                            eventType))
                    .forEach(element -> {
                        try {
                            element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
                        } catch (IllegalAccessException | InvocationTargetException e) {
                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                            throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
                        }
                    });
        }
    }
}
package com.hz.canal.starter.container;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties;
import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar;
import com.hz.canal.starter.mode.CanalMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
/**
 * DML 数据拉取、解析
 *
 * @author wuqiong 2022/4/11
 */
public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
    private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
    private CanalConnector connector;
    private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
    private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
    private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
    private Integer local_retry_count;
    public void initConnect() {
        try {
            // init supportAllTypes
            registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
            connector.connect();
            connector.subscribe(endpointInstance.getSubscribe());
            connector.rollback();
            // 初始化本地重试次数
            local_retry_count = endpointInstance.getRetryCount();
        } catch (Exception e) {
            logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
            setRunning(false);
        }
    }
    public void disconnect() {
        // 关闭连接
        connector.disconnect();
    }
    public void doStart() {
        try {
            Message message = null;
            try {
                message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
            } catch (Exception clientException) {
                logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
                if (local_retry_count > 0) {
                    // 重试次数减一
                    local_retry_count = local_retry_count - 1;
                    sleep(endpointInstance.getAcquireInterval());
                } else {
                    // 重试次数 <= 0 时,直接终止线程
                    logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
                                    "thread interrupt , current connector host: {} , port: {} ",
                            endpointInstance.getHost(), endpointInstance.getPort());
                    Thread.currentThread().interrupt();
                }
                return;
            }
            // 如果重试次数小于设置的,则修改
            if (local_retry_count < endpointInstance.getRetryCount())
                local_retry_count = endpointInstance.getRetryCount();
            List<CanalEntry.Entry> entries = message.getEntries();
            if (message.getId() == -1 || entries.isEmpty()) {
                sleep(endpointInstance.getAcquireInterval());
                return;
            }
            for (CanalEntry.Entry entry : entries) {
                try {
                    consumer(entry);
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
                    // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
                    // return;
                }
            }
            connector.ack(message.getId()); // 提交确认
        } catch (Exception exc) {
            logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
            // 防止删除消息时发生错误,或者拉取消息失败等情况
            exc.printStackTrace();
        }
    }
    public DmlMessageTransponderContainer(
            CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
            CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
        this.connector = connector;
        this.registrars.addAll(registrars);
        this.endpointInstance = endpointInstance;
    }
    private void consumer(CanalEntry.Entry entry) {
        if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
        CanalEntry.RowChange rowChange = null;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
            throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
        }
        // 忽略 ddl 语句
        if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (!support_all_types.contains(eventType)) return;
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            registrars
                    .stream()
                    .filter(CanalListenerEndpointRegistrar.filterArgs(
                            entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(),
                            eventType))
                    .forEach(element -> {
                        try {
                            element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
                        } catch (IllegalAccessException | InvocationTargetException e) {
                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                            throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
                        }
                    });
        }
    }
}
src/main/java/com/hz/canal/starter/factory/CanalConnectorFactory.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/factory/CanalConnectorFactory.java
@@ -1,50 +1,50 @@
package com.duxinglangzi.canal.starter.factory;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
public class CanalConnectorFactory {
    /**
     * 创建 CanalConnector
     *
     * @param destination
     * @param endpointInstance
     * @return CanalConnector
     * @author wuqiong 2022-04-23 20:36
     */
    public static synchronized CanalConnector createConnector(
            String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
        Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check ");
        Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check ");
        CanalConnector connector;
        if (endpointInstance.isClusterEnabled()) {
            if (!StringUtils.hasText(endpointInstance.getZookeeperAddress()))
                throw new CanalClientException("zookeeper address is null");
            List<SocketAddress> addresses = new ArrayList<>();
            for (String s : endpointInstance.getZookeeperAddress().split(",")) {
                String[] split = s.split(":");
                if (split.length != 2)
                    throw new CanalClientException("error parsing zookeeper address:" + s);
                addresses.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
            }
            connector = CanalConnectors.newClusterConnector(
                    addresses, destination, endpointInstance.getUserName(), endpointInstance.getPassword());
        } else {
            connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()),
                    destination, endpointInstance.getUserName(), endpointInstance.getPassword());
        }
        return connector;
    }
}
package com.hz.canal.starter.factory;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
public class CanalConnectorFactory {
    /**
     * 创建 CanalConnector
     *
     * @param destination
     * @param endpointInstance
     * @return CanalConnector
     * @author wuqiong 2022-04-23 20:36
     */
    public static synchronized CanalConnector createConnector(
            String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
        Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check ");
        Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check ");
        CanalConnector connector;
        if (endpointInstance.isClusterEnabled()) {
            if (!StringUtils.hasText(endpointInstance.getZookeeperAddress()))
                throw new CanalClientException("zookeeper address is null");
            List<SocketAddress> addresses = new ArrayList<>();
            for (String s : endpointInstance.getZookeeperAddress().split(",")) {
                String[] split = s.split(":");
                if (split.length != 2)
                    throw new CanalClientException("error parsing zookeeper address:" + s);
                addresses.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
            }
            connector = CanalConnectors.newClusterConnector(
                    addresses, destination, endpointInstance.getUserName(), endpointInstance.getPassword());
        } else {
            connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()),
                    destination, endpointInstance.getUserName(), endpointInstance.getPassword());
        }
        return connector;
    }
}
src/main/java/com/hz/canal/starter/factory/TransponderContainerFactory.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/factory/TransponderContainerFactory.java
@@ -1,67 +1,67 @@
package com.duxinglangzi.canal.starter.factory;
import com.alibaba.otter.canal.client.CanalConnector;
import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
import com.duxinglangzi.canal.starter.container.DmlMessageTransponderContainer;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * @author wuqiong 2022/4/18
 * @description
 */
public class TransponderContainerFactory {
    private static final String CONTAINER_ID_PREFIX = "com.duxinglangzi.canal.starter.container.MessageTransponderContainer#";
    /**
     * 将所有待注册的端点,注册到spring中
     *
     * @param beanFactory
     * @param canalConfig
     * @param registrars
     * @return void
     * @author wuqiong 2022-04-23 20:34
     */
    public static void registerListenerContainer(
            ConfigurableListableBeanFactory beanFactory, CanalAutoConfigurationProperties canalConfig,
            Set<CanalListenerEndpointRegistrar> registrars) {
        if (registrars == null || registrars.isEmpty()) return;
        if (canalConfig == null || canalConfig.getInstances().isEmpty()) return;
        for (Map.Entry<String, CanalAutoConfigurationProperties.EndpointInstance> endpointInstance : canalConfig.getInstances().entrySet()) {
            if (beanFactory.containsBean(getContainerID(endpointInstance.getKey()))) continue; // 如果已经存在则不在创建
            List<CanalListenerEndpointRegistrar> registrarList = new ArrayList<>();
            for (CanalListenerEndpointRegistrar registrar : registrars) {
                registrar.checkParameter(canalConfig.getInstances().keySet());
                if (!registrar.isContainDestination(endpointInstance.getKey())) continue;
                registrarList.add(registrar);
            }
            if (registrarList.isEmpty()) continue;
            registerTransponderContainer(
                    endpointInstance.getKey(), endpointInstance.getValue(), beanFactory, registrarList);
        }
    }
    private static void registerTransponderContainer(
            String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance,
            ConfigurableListableBeanFactory beanFactory, List<CanalListenerEndpointRegistrar> registrarList) {
        CanalConnector connector = CanalConnectorFactory.createConnector(destination, endpointInstance);
        beanFactory.registerSingleton(getContainerID(destination),
                new DmlMessageTransponderContainer(connector, registrarList, endpointInstance));
    }
    private static String getContainerID(String destination) {
        return CONTAINER_ID_PREFIX + "#" + destination;
    }
}
package com.hz.canal.starter.factory;
import com.alibaba.otter.canal.client.CanalConnector;
import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties;
import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar;
import com.hz.canal.starter.container.DmlMessageTransponderContainer;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * @author wuqiong 2022/4/18
 * @description
 */
public class TransponderContainerFactory {
    private static final String CONTAINER_ID_PREFIX = "com.duxinglangzi.canal.starter.container.MessageTransponderContainer#";
    /**
     * 将所有待注册的端点,注册到spring中
     *
     * @param beanFactory
     * @param canalConfig
     * @param registrars
     * @return void
     * @author wuqiong 2022-04-23 20:34
     */
    public static void registerListenerContainer(
            ConfigurableListableBeanFactory beanFactory, CanalAutoConfigurationProperties canalConfig,
            Set<CanalListenerEndpointRegistrar> registrars) {
        if (registrars == null || registrars.isEmpty()) return;
        if (canalConfig == null || canalConfig.getInstances().isEmpty()) return;
        for (Map.Entry<String, CanalAutoConfigurationProperties.EndpointInstance> endpointInstance : canalConfig.getInstances().entrySet()) {
            if (beanFactory.containsBean(getContainerID(endpointInstance.getKey()))) continue; // 如果已经存在则不在创建
            List<CanalListenerEndpointRegistrar> registrarList = new ArrayList<>();
            for (CanalListenerEndpointRegistrar registrar : registrars) {
                registrar.checkParameter(canalConfig.getInstances().keySet());
                if (!registrar.isContainDestination(endpointInstance.getKey())) continue;
                registrarList.add(registrar);
            }
            if (registrarList.isEmpty()) continue;
            registerTransponderContainer(
                    endpointInstance.getKey(), endpointInstance.getValue(), beanFactory, registrarList);
        }
    }
    private static void registerTransponderContainer(
            String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance,
            ConfigurableListableBeanFactory beanFactory, List<CanalListenerEndpointRegistrar> registrarList) {
        CanalConnector connector = CanalConnectorFactory.createConnector(destination, endpointInstance);
        beanFactory.registerSingleton(getContainerID(destination),
                new DmlMessageTransponderContainer(connector, registrarList, endpointInstance));
    }
    private static String getContainerID(String destination) {
        return CONTAINER_ID_PREFIX + "#" + destination;
    }
}
src/main/java/com/hz/canal/starter/listener/ApplicationReadyListener.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/listener/ApplicationReadyListener.java
@@ -1,20 +1,20 @@
package com.duxinglangzi.canal.starter.listener;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * @author wuqiong 2022/4/16
 */
public class ApplicationReadyListener implements ApplicationListener<ApplicationReadyEvent> {
    public static final AtomicBoolean START_LISTENER_CONTAINER = new AtomicBoolean(false);
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // 确保程序启动之后,再放行所有的 canal transponder
        if (!START_LISTENER_CONTAINER.get()) START_LISTENER_CONTAINER.set(true);
    }
}
package com.hz.canal.starter.listener;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * @author wuqiong 2022/4/16
 */
public class ApplicationReadyListener implements ApplicationListener<ApplicationReadyEvent> {
    public static final AtomicBoolean START_LISTENER_CONTAINER = new AtomicBoolean(false);
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // 确保程序启动之后,再放行所有的 canal transponder
        if (!START_LISTENER_CONTAINER.get()) START_LISTENER_CONTAINER.set(true);
    }
}
src/main/java/com/hz/canal/starter/mode/CanalMessage.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java
@@ -1,4 +1,4 @@
package com.duxinglangzi.canal.starter.mode;
package com.hz.canal.starter.mode;
import com.alibaba.otter.canal.protocol.CanalEntry;
src/main/resources/META-INF/spring-configuration-metadata.json
@@ -1,9 +1,9 @@
{
  "properties": [
    {
      "sourceType": "com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties",
      "sourceType": "com.hz.canal.starter.configuration.CanalAutoConfigurationProperties",
      "name": "spring.canal.instances",
      "type": "java.util.Map<java.lang.String,com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties.EndpointInstance>",
      "type": "java.util.Map<java.lang.String,com.hz.canal.starter.configuration.CanalAutoConfigurationProperties.EndpointInstance>",
      "description": "Canal instance configuration, keyed by destination name."
    }
  ]
src/main/resources/META-INF/spring.factories
@@ -1,9 +1,9 @@
# Auto Configure
# org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
# com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\
# com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector
# com.hz.canal.starter.configuration.CanalAutoConfigurationProperties,\
# com.hz.canal.starter.configuration.CanalConfigurationSelector
# Application Listeners
# org.springframework.context.ApplicationListener=\
# com.duxinglangzi.canal.starter.listener.ApplicationReadyListener
# com.hz.canal.starter.listener.ApplicationReadyListener