From a87aa74a3af27960276ed02f4273386d25d2a231 Mon Sep 17 00:00:00 2001 From: duxinglangzi <871364441@qq.com> Date: 星期五, 24 六月 2022 15:41:11 +0800 Subject: [PATCH] 支持动态参数配置 --- src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | 5 +- src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java | 17 +++++++- src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java | 81 ++++++++++++++++++++++++++++++---------- README.md | 10 +++++ 4 files changed, 87 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index e7eeab9..2d099e6 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,16 @@ * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 CanalEntry.EventType , CanalEntry.RowData * 绋嬪簭鍦ㄥ惎鍔ㄨ繃绋嬩腑浼氬仛妫�鏌� */ + + /** + * 鐩戞帶鏇存柊鎿嶄綔 + * 鏀寔鍔ㄦ�佸弬鏁伴厤缃紝閰嶇疆椤归渶鍦� yml 鎴� properties 杩涜閰嶇疆 + * 鐩爣鏄� ${prod.example} 鐨� ${prod.database} 搴� users琛� + */ + @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"}) + public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + printChange("listenerExampleBooksUsers",eventType, rowData); + } /** * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨� books搴� users琛� diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java index 02d401e..64915e4 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java +++ b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java @@ -11,6 +11,8 @@ import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; import org.springframework.core.MethodIntrospector; import org.springframework.core.annotation.AnnotatedElementUtils; @@ -25,7 +27,7 @@ * @author wuqiong 2022/4/11 */ public class CanalListenerAnnotationBeanPostProcessor implements - BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor { + BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware { private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class); @@ -33,6 +35,7 @@ private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>()); private ConfigurableListableBeanFactory configurableListableBeanFactory; private CanalAutoConfigurationProperties canalAutoConfigurationProperties; + private ApplicationContext applicationContext; @Override public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { @@ -47,7 +50,7 @@ // 鍏堝姞鍏ュ埌寰呮敞鍐岄噷闈� annotatedMethods.entrySet().stream() .filter(e -> e != null) - .forEach(ele -> registrars.add(new CanalListenerEndpointRegistrar(bean, ele))); + .forEach(ele -> registrars.add(createRegistrar(bean, ele))); logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName, annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet())); } @@ -71,5 +74,15 @@ this.configurableListableBeanFactory = configurableListableBeanFactory; } + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) { + return new CanalListenerEndpointRegistrar(bean, entry.getKey(), + applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().destination()), + applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().database()), + entry.getValue().table(), entry.getValue().eventType()); + } } diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java index aa2cc0b..c9e0c1d 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java +++ b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java @@ -2,13 +2,11 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.exception.CanalClientException; -import com.duxinglangzi.canal.starter.annotation.CanalListener; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -21,7 +19,27 @@ public class CanalListenerEndpointRegistrar { private Object bean; - private Map.Entry<Method, CanalListener> listenerEntry; + private Method method; + + /** + * 濡傛灉鏈繘琛岄厤缃紝鍒欎娇鐢ㄩ厤缃枃浠堕噷鍏ㄩ儴 destination + */ + private String destination; + + /** + * 鏁版嵁搴撳悕 + */ + private String database; + + /** + * 鏁版嵁琛ㄥ悕 + */ + private String[] table; + + /** + * 鏁版嵁鍙樺姩绫诲瀷锛屾澶勮娉ㄦ剰锛岄粯璁や笉鍖呭惈 DDL + */ + private CanalEntry.EventType[] eventType; /** * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔涓や釜鍙傛暟 <p> @@ -39,20 +57,19 @@ || classes.get(0) != CanalEntry.EventType.class) throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check "); - if (StringUtils.isNotBlank(getListenerEntry().getValue().destination()) - && !sets.contains(getListenerEntry().getValue().destination())) + if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination())) throw new CanalClientException("@CanalListener Illegal destination , please check "); } public List<Class<?>> parameterTypes() { - return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList()); + return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList()); } public boolean isContainDestination(String destination) { - if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true; - return getListenerEntry().getValue().destination().equals(destination); + if (StringUtils.isBlank(getDestination())) return true; + return getDestination().equals(destination); } /** @@ -66,27 +83,49 @@ */ public static Predicate<CanalListenerEndpointRegistrar> filterArgs( String database, String tableName, CanalEntry.EventType eventType) { - Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database()) - || e.getListenerEntry().getValue().database().equals(database); - Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0 - || (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0])) - || Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName)); - Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0 - || Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType); + Predicate<CanalListenerEndpointRegistrar> databases = + e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database); + Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0 + || (e.getTable().length == 1 && "".equals(e.getTable()[0])) + || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName)); + Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0 + || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType); return databases.and(table).and(eventTypes); } - public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) { + public CanalListenerEndpointRegistrar( + Object bean, Method method, String destination, + String database, String[] tables, CanalEntry.EventType[] eventTypes) { this.bean = bean; - this.listenerEntry = entry; - } - - public Map.Entry<Method, CanalListener> getListenerEntry() { - return listenerEntry; + this.method = method; + this.destination = destination; + this.database = database; + this.table = tables; + this.eventType = eventTypes; } public Object getBean() { return bean; } + + public Method getMethod() { + return method; + } + + public String getDestination() { + return destination; + } + + public String getDatabase() { + return database; + } + + public String[] getTable() { + return table; + } + + public CanalEntry.EventType[] getEventType() { + return eventType; + } } diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java index 4214713..0374716 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java @@ -29,8 +29,7 @@ public void initConnect() { try { // init supportAllTypes - registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll( - Arrays.asList(e.getListenerEntry().getValue().eventType()))); + registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType()))); connector.connect(); connector.subscribe(endpointInstance.getSubscribe()); connector.rollback(); @@ -114,7 +113,7 @@ eventType)) .forEach(element -> { try { - element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData); + element.getMethod().invoke(element.getBean(), eventType, rowData); } catch (IllegalAccessException | InvocationTargetException e) { logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e); -- Gitblit v1.8.0