duxinglangzi
2022-04-23 6274208525b7e80c208f614915ef973d63834101
修改部分注释内容
11个文件已修改
68 ■■■■ 已修改文件
README.md 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalAutoConfigurationProperties.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalBootstrapConfiguration.java 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalConfigurationSelector.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/factory/CanalConnectorFactory.java 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/factory/TransponderContainerFactory.java 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/listener/ApplicationReadyListener.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md
@@ -38,7 +38,7 @@
public class CanalListenerTest {
    /**
     * 目前 Listener 方法的参数必须为 CanalEntry.EventType eventType, CanalEntry.RowData rowData
     * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData
     * 程序在启动过程中会做检查
     */
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalAutoConfigurationProperties.java
@@ -8,8 +8,8 @@
import java.util.Map;
/**
 * Canal连接的配置类
 * @author wuqiong 2022/4/11
 * @description
 */
@Order(Ordered.HIGHEST_PRECEDENCE)
@ConfigurationProperties(prefix = "spring.canal")
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalBootstrapConfiguration.java
@@ -7,13 +7,19 @@
/**
 * @author wuqiong 2022/4/12
 * @description
 */
public class CanalBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
    public static final String CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME =
            "com.duxinglangzi.canal.starter.configuration.CanalListenerAnnotationBeanPostProcessor";
    /**
     * 注册 CanalListenerAnnotationBeanPostProcessor 到spring bean 容器内
     * @param importingClassMetadata
     * @param registry
     * @return void
     * @author wuqiong 2022-04-23 20:21
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        if (!registry.containsBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalConfigurationSelector.java
@@ -5,10 +5,15 @@
/**
 * @author wuqiong 2022/4/12
 * @description
 */
public class CanalConfigurationSelector implements DeferredImportSelector {
    /**
     * 扫描器 导入指定类, 这里导入 CanalBootstrapConfiguration.class
     * @param importingClassMetadata
     * @return java.lang.String[]
     * @author wuqiong 2022-04-23 20:20
     */
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[]{CanalBootstrapConfiguration.class.getName()};
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java
@@ -23,7 +23,6 @@
/**
 * @author wuqiong 2022/4/11
 * @description
 */
public class CanalListenerAnnotationBeanPostProcessor implements
        BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor {
@@ -39,15 +38,17 @@
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        if (notAnnotatedClasses.contains(bean.getClass())) return bean;
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // 只扫描类的方法,目前 CanalListener 只支持在方法上
        Map<Method, CanalListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<CanalListener>) method -> findListenerAnnotations(method));
        if (annotatedMethods.isEmpty()) {
            this.notAnnotatedClasses.add(bean.getClass());
        } else {
            // 先加入到待注册里面
            annotatedMethods.entrySet().stream()
                    .filter(e -> e != null)
                    .forEach(ele -> registrars.add(new CanalListenerEndpointRegistrar(bean, ele)));
            logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", bean.getClass().getName(),
            logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName,
                    annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet()));
        }
        return bean;
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
@@ -14,14 +14,22 @@
import java.util.stream.Collectors;
/**
 * 登记员
 * @author wuqiong 2022/4/11
 * @description
 */
public class CanalListenerEndpointRegistrar {
    private Object bean;
    private Map.Entry<Method, CanalListener> listenerEntry;
    /**
     * 1、目前实现的 DML 解析器仅支持两个参数 <p>
     * 2、且顺序必须为: CanalEntry.EventType 、 CanalEntry.RowData  <p>
     * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
     * @param sets
     * @return void
     * @author wuqiong 2022-04-23 20:27
     */
    public void checkParameter(Set<String> sets) {
        List<Class<?>> classes = parameterTypes();
        if (classes.size() > 2
@@ -45,6 +53,14 @@
        return getListenerEntry().getValue().destination().equals(destination);
    }
    /**
     * 过滤参数
     * @param database
     * @param tableName
     * @param eventType
     * @return java.util.function.Predicate
     * @author wuqiong 2022-04-23 20:47
     */
    public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
            String database, String tableName, CanalEntry.EventType eventType) {
        Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database())
src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java
@@ -9,8 +9,8 @@
import java.util.concurrent.TimeUnit;
/**
 * 抽象的canal transponder ,实现SmartLifecycle接口,生命周期由spring进行管理
 * @author wuqiong 2022/4/11
 * @description
 */
public abstract class AbstractCanalTransponderContainer implements SmartLifecycle {
    protected boolean isRunning = false;
@@ -26,13 +26,14 @@
    /**
     * Marks this container as running and launches a new thread that drives it.
     * <p>
     * The spawned thread first loops on
     * {@code ApplicationReadyListener.START_LISTENER_CONTAINER}, calling {@code sleep(...)}
     * between checks until the flag reads true. It then calls {@code initConnect()} once and
     * keeps invoking {@code doStart()} for as long as {@code isRunning()} returns true and the
     * thread has not been interrupted.
     */
    @Override
    public void start() {
        // NOTE(review): setRunning(true) is invoked both here and again after the thread is
        // started at the end of this method; the two calls may be the old and new position of
        // a single moved line — confirm which one is meant to stay.
        setRunning(true);
        new Thread(() -> {
            // Canal data is only pulled after Spring has finished starting up
            while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get())
                sleep(5L * SLEEP_TIME_MILLI_SECONDS);
            initConnect();
            while (isRunning() && !Thread.currentThread().isInterrupted()) doStart();
        }).start();
        setRunning(true);
    }
    @Override
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -12,8 +12,8 @@
import java.util.*;
/**
 * DML 数据拉取、解析
 * @author wuqiong 2022/4/11
 * @description
 */
public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
src/main/java/com/duxinglangzi/canal/starter/factory/CanalConnectorFactory.java
@@ -15,6 +15,13 @@
public class CanalConnectorFactory {
    /**
     * 创建 CanalConnector
     * @param destination
     * @param endpointInstance
     * @return CanalConnector
     * @author wuqiong 2022-04-23 20:36
     */
    public static synchronized CanalConnector createConnector(
            String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
        Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check ");
src/main/java/com/duxinglangzi/canal/starter/factory/TransponderContainerFactory.java
@@ -20,6 +20,14 @@
    private static final String CONTAINER_ID_PREFIX = "com.duxinglangzi.canal.starter.container.MessageTransponderContainer#";
    /**
     * 将所有待注册的端点,注册到spring中
     * @param beanFactory
     * @param canalConfig
     * @param registrars
     * @return void
     * @author wuqiong 2022-04-23 20:34
     */
    public static void registerListenerContainer(
            ConfigurableListableBeanFactory beanFactory, CanalAutoConfigurationProperties canalConfig,
            Set<CanalListenerEndpointRegistrar> registrars) {
@@ -30,8 +38,8 @@
            if (beanFactory.containsBean(getContainerID(endpointInstance.getKey()))) continue; // 如果已经存在则不在创建
            List<CanalListenerEndpointRegistrar> registrarList = new ArrayList<>();
            for (CanalListenerEndpointRegistrar registrar : registrars) {
                if (!registrar.isContainDestination(endpointInstance.getKey())) continue;
                registrar.checkParameter(canalConfig.getInstances().keySet());
                if (!registrar.isContainDestination(endpointInstance.getKey())) continue;
                registrarList.add(registrar);
            }
            if (registrarList.isEmpty()) continue;
src/main/java/com/duxinglangzi/canal/starter/listener/ApplicationReadyListener.java
@@ -7,7 +7,6 @@
/**
 * @author wuqiong 2022/4/16
 * @description
 */
public class ApplicationReadyListener implements ApplicationListener<ApplicationReadyEvent>{
@@ -15,6 +14,7 @@
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // 确保程序启动之后,再放行所有的 canal transponder
        if (!START_LISTENER_CONTAINER.get()) START_LISTENER_CONTAINER.set(true);
    }
}