README.md
@@ -41,6 +41,16 @@ * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData * 程序在启动过程中会做检查 */ /** * 监控更新操作 * 支持动态参数配置,配置项需在 yml 或 properties 进行配置 * 目标是 ${prod.example} 的 ${prod.database} 库 users表 */ @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"}) public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleBooksUsers",eventType, rowData); } /** * 监控更新操作 ,目标是 example的 books库 users表 src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java
@@ -11,6 +11,8 @@ import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.core.MethodIntrospector; import org.springframework.core.annotation.AnnotatedElementUtils; @@ -25,7 +27,7 @@ * @author wuqiong 2022/4/11 */ public class CanalListenerAnnotationBeanPostProcessor implements BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor { BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class); @@ -33,6 +35,7 @@ private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>()); private ConfigurableListableBeanFactory configurableListableBeanFactory; private CanalAutoConfigurationProperties canalAutoConfigurationProperties; private ApplicationContext applicationContext; @Override public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { @@ -47,7 +50,7 @@ // 先加入到待注册里面 annotatedMethods.entrySet().stream() .filter(e -> e != null) .forEach(ele -> registrars.add(new CanalListenerEndpointRegistrar(bean, ele))); .forEach(ele -> registrars.add(createRegistrar(bean, ele))); logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName, annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet())); } @@ -71,5 +74,15 @@ this.configurableListableBeanFactory = configurableListableBeanFactory; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) { return new CanalListenerEndpointRegistrar(bean, entry.getKey(), applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().destination()), applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().database()), entry.getValue().table(), entry.getValue().eventType()); } } src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
@@ -2,13 +2,11 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.exception.CanalClientException; import com.duxinglangzi.canal.starter.annotation.CanalListener; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -21,7 +19,27 @@ public class CanalListenerEndpointRegistrar { private Object bean; private Map.Entry<Method, CanalListener> listenerEntry; private Method method; /** * 如果未进行配置,则使用配置文件里全部 destination */ private String destination; /** * 数据库名 */ private String database; /** * 数据表名 */ private String[] table; /** * 数据变动类型,此处请注意,默认不包含 DDL */ private CanalEntry.EventType[] eventType; /** * 1、目前实现的 DML 解析器仅支持两个参数 <p> @@ -39,20 +57,19 @@ || classes.get(0) != CanalEntry.EventType.class) throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check "); if (StringUtils.isNotBlank(getListenerEntry().getValue().destination()) && !sets.contains(getListenerEntry().getValue().destination())) if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination())) throw new CanalClientException("@CanalListener Illegal destination , please check "); } public List<Class<?>> parameterTypes() { return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList()); return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList()); } public boolean isContainDestination(String destination) { if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true; return getListenerEntry().getValue().destination().equals(destination); if (StringUtils.isBlank(getDestination())) return true; return getDestination().equals(destination); } /** @@ -66,27 +83,49 @@ */ public static Predicate<CanalListenerEndpointRegistrar> filterArgs( String database, String tableName, CanalEntry.EventType eventType) { Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database()) || e.getListenerEntry().getValue().database().equals(database); Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0 || (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0])) || Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName)); Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0 || Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType); Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database); Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0 || (e.getTable().length == 1 && "".equals(e.getTable()[0])) || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName)); Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0 || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType); return databases.and(table).and(eventTypes); } public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) { public CanalListenerEndpointRegistrar( Object bean, Method method, String destination, String database, String[] tables, CanalEntry.EventType[] eventTypes) { this.bean = bean; this.listenerEntry = entry; } public Map.Entry<Method, CanalListener> getListenerEntry() { return listenerEntry; this.method = method; this.destination = destination; this.database = database; this.table = tables; this.eventType = eventTypes; } public Object getBean() { return bean; } public Method getMethod() { return method; } public String getDestination() { return destination; } public String getDatabase() { return database; } public String[] getTable() { return table; } public CanalEntry.EventType[] getEventType() { return eventType; } } src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -29,8 +29,7 @@ public void initConnect() { try { // init supportAllTypes registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll( Arrays.asList(e.getListenerEntry().getValue().eventType()))); registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType()))); connector.connect(); connector.subscribe(endpointInstance.getSubscribe()); connector.rollback(); @@ -114,7 +113,7 @@ eventType)) .forEach(element -> { try { element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData); element.getMethod().invoke(element.getBean(), eventType, rowData); } catch (IllegalAccessException | InvocationTargetException e) { logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);