duxinglangzi
2022-06-24 a87aa74a3af27960276ed02f4273386d25d2a231
支持动态参数配置
4个文件已修改
113 ■■■■ 已修改文件
README.md 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java 81 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md
@@ -41,6 +41,16 @@
     * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData 
     * 程序在启动过程中会做检查
     */
    /**
     * 监控更新操作
     * 支持动态参数配置,配置项需在 yml 或 properties 进行配置
     * 目标是 ${prod.example} 的  ${prod.database} 库  users表
     */
    @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
    public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksUsers",eventType, rowData);
    }
    /**
     * 监控更新操作 ,目标是 example的  books库  users表
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java
@@ -11,6 +11,8 @@
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
@@ -25,7 +27,7 @@
 * @author wuqiong 2022/4/11
 */
public class CanalListenerAnnotationBeanPostProcessor implements
        BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor {
        BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class);
@@ -33,6 +35,7 @@
    private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>());
    private ConfigurableListableBeanFactory configurableListableBeanFactory;
    private CanalAutoConfigurationProperties canalAutoConfigurationProperties;
    private ApplicationContext applicationContext;
    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
@@ -47,7 +50,7 @@
            // 先加入到待注册里面
            annotatedMethods.entrySet().stream()
                    .filter(e -> e != null)
                    .forEach(ele -> registrars.add(new CanalListenerEndpointRegistrar(bean, ele)));
                    .forEach(ele -> registrars.add(createRegistrar(bean, ele)));
            logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName,
                    annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet()));
        }
@@ -71,5 +74,15 @@
        this.configurableListableBeanFactory = configurableListableBeanFactory;
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
    private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
        return new CanalListenerEndpointRegistrar(bean, entry.getKey(),
                applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().destination()),
                applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().database()),
                entry.getValue().table(), entry.getValue().eventType());
    }
}
src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
@@ -2,13 +2,11 @@
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.duxinglangzi.canal.starter.annotation.CanalListener;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -21,7 +19,27 @@
public class CanalListenerEndpointRegistrar {
    private Object bean;
    private Map.Entry<Method, CanalListener> listenerEntry;
    private Method method;
    /**
     * 如果未进行配置,则使用配置文件里全部 destination
     */
    private String destination;
    /**
     * 数据库名
     */
    private String database;
    /**
     * 数据表名
     */
    private String[] table;
    /**
     * 数据变动类型,此处请注意,默认不包含 DDL
     */
    private CanalEntry.EventType[] eventType;
    /**
     * 1、目前实现的 DML 解析器仅支持两个参数 <p>
@@ -39,20 +57,19 @@
                || classes.get(0) != CanalEntry.EventType.class)
            throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
                    "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check ");
        if (StringUtils.isNotBlank(getListenerEntry().getValue().destination())
                && !sets.contains(getListenerEntry().getValue().destination()))
        if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
            throw new CanalClientException("@CanalListener Illegal destination , please check ");
    }
    public List<Class<?>> parameterTypes() {
        return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList());
        return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList());
    }
    public boolean isContainDestination(String destination) {
        if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true;
        return getListenerEntry().getValue().destination().equals(destination);
        if (StringUtils.isBlank(getDestination())) return true;
        return getDestination().equals(destination);
    }
    /**
@@ -66,27 +83,49 @@
     */
    public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
            String database, String tableName, CanalEntry.EventType eventType) {
        Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database())
                || e.getListenerEntry().getValue().database().equals(database);
        Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0
                || (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0]))
                || Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName));
        Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0
                || Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType);
        Predicate<CanalListenerEndpointRegistrar> databases =
                e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database);
        Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0
                || (e.getTable().length == 1 && "".equals(e.getTable()[0]))
                || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName));
        Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0
                || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType);
        return databases.and(table).and(eventTypes);
    }
    public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
    public CanalListenerEndpointRegistrar(
            Object bean, Method method, String destination,
            String database, String[] tables, CanalEntry.EventType[] eventTypes) {
        this.bean = bean;
        this.listenerEntry = entry;
    }
    public Map.Entry<Method, CanalListener> getListenerEntry() {
        return listenerEntry;
        this.method = method;
        this.destination = destination;
        this.database = database;
        this.table = tables;
        this.eventType = eventTypes;
    }
    public Object getBean() {
        return bean;
    }
    public Method getMethod() {
        return method;
    }
    public String getDestination() {
        return destination;
    }
    public String getDatabase() {
        return database;
    }
    public String[] getTable() {
        return table;
    }
    public CanalEntry.EventType[] getEventType() {
        return eventType;
    }
}
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -29,8 +29,7 @@
    public void initConnect() {
        try {
            // init supportAllTypes
            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
                    Arrays.asList(e.getListenerEntry().getValue().eventType())));
            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType())));
            connector.connect();
            connector.subscribe(endpointInstance.getSubscribe());
            connector.rollback();
@@ -114,7 +113,7 @@
                            eventType))
                    .forEach(element -> {
                        try {
                            element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData);
                            element.getMethod().invoke(element.getBean(), eventType, rowData);
                        } catch (IllegalAccessException | InvocationTargetException e) {
                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                            throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);