From a87aa74a3af27960276ed02f4273386d25d2a231 Mon Sep 17 00:00:00 2001
From: duxinglangzi <871364441@qq.com>
Date: 星期五, 24 六月 2022 15:41:11 +0800
Subject: [PATCH] 支持动态参数配置

---
 src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java               |    5 +-
 src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java |   17 +++++++-
 src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java           |   81 ++++++++++++++++++++++++++++++----------
 README.md                                                                                                |   10 +++++
 4 files changed, 87 insertions(+), 26 deletions(-)

diff --git a/README.md b/README.md
index e7eeab9..2d099e6 100644
--- a/README.md
+++ b/README.md
@@ -41,6 +41,16 @@
      * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 CanalEntry.EventType , CanalEntry.RowData 
      * 绋嬪簭鍦ㄥ惎鍔ㄨ繃绋嬩腑浼氬仛妫�鏌�
      */
+    
+    /**
+     * 鐩戞帶鏇存柊鎿嶄綔
+     * 鏀寔鍔ㄦ�佸弬鏁伴厤缃紝閰嶇疆椤归渶鍦� yml 鎴� properties 杩涜閰嶇疆
+     * 鐩爣鏄� ${prod.example} 鐨�  ${prod.database} 搴�  users琛�
+     */
+    @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
+    public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
+        printChange("listenerExampleBooksUsers",eventType, rowData);
+    }
 
     /**
      * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨�  books搴�  users琛�
diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java
index 02d401e..64915e4 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java
@@ -11,6 +11,8 @@
 import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
 import org.springframework.beans.factory.config.BeanPostProcessor;
 import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
 import org.springframework.core.MethodIntrospector;
 import org.springframework.core.annotation.AnnotatedElementUtils;
 
@@ -25,7 +27,7 @@
  * @author wuqiong 2022/4/11
  */
 public class CanalListenerAnnotationBeanPostProcessor implements
-        BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor {
+        BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware {
 
     private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class);
 
@@ -33,6 +35,7 @@
     private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private ConfigurableListableBeanFactory configurableListableBeanFactory;
     private CanalAutoConfigurationProperties canalAutoConfigurationProperties;
+    private ApplicationContext applicationContext;
 
     @Override
     public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
@@ -47,7 +50,7 @@
             // 鍏堝姞鍏ュ埌寰呮敞鍐岄噷闈�
             annotatedMethods.entrySet().stream()
                     .filter(e -> e != null)
-                    .forEach(ele -> registrars.add(new CanalListenerEndpointRegistrar(bean, ele)));
+                    .forEach(ele -> registrars.add(createRegistrar(bean, ele)));
             logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName,
                     annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet()));
         }
@@ -71,5 +74,15 @@
         this.configurableListableBeanFactory = configurableListableBeanFactory;
     }
 
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
 
+    private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
+        return new CanalListenerEndpointRegistrar(bean, entry.getKey(),
+                applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().destination()),
+                applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().database()),
+                entry.getValue().table(), entry.getValue().eventType());
+    }
 }
diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
index aa2cc0b..c9e0c1d 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
@@ -2,13 +2,11 @@
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
-import com.duxinglangzi.canal.starter.annotation.CanalListener;
 import org.apache.commons.lang3.StringUtils;
 
 import java.lang.reflect.Method;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -21,7 +19,27 @@
 public class CanalListenerEndpointRegistrar {
 
     private Object bean;
-    private Map.Entry<Method, CanalListener> listenerEntry;
+    private Method method;
+
+    /**
+     * 濡傛灉鏈繘琛岄厤缃紝鍒欎娇鐢ㄩ厤缃枃浠堕噷鍏ㄩ儴 destination
+     */
+    private String destination;
+
+    /**
+     * 鏁版嵁搴撳悕
+     */
+    private String database;
+
+    /**
+     * 鏁版嵁琛ㄥ悕
+     */
+    private String[] table;
+
+    /**
+     * 鏁版嵁鍙樺姩绫诲瀷锛屾澶勮娉ㄦ剰锛岄粯璁や笉鍖呭惈 DDL
+     */
+    private CanalEntry.EventType[] eventType;
 
     /**
      * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔涓や釜鍙傛暟 <p>
@@ -39,20 +57,19 @@
                 || classes.get(0) != CanalEntry.EventType.class)
             throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
                     "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check ");
-        if (StringUtils.isNotBlank(getListenerEntry().getValue().destination())
-                && !sets.contains(getListenerEntry().getValue().destination()))
+        if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
             throw new CanalClientException("@CanalListener Illegal destination , please check ");
 
     }
 
 
     public List<Class<?>> parameterTypes() {
-        return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList());
+        return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList());
     }
 
     public boolean isContainDestination(String destination) {
-        if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true;
-        return getListenerEntry().getValue().destination().equals(destination);
+        if (StringUtils.isBlank(getDestination())) return true;
+        return getDestination().equals(destination);
     }
 
     /**
@@ -66,27 +83,49 @@
      */
     public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
             String database, String tableName, CanalEntry.EventType eventType) {
-        Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database())
-                || e.getListenerEntry().getValue().database().equals(database);
-        Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0
-                || (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0]))
-                || Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName));
-        Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0
-                || Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType);
+        Predicate<CanalListenerEndpointRegistrar> databases =
+                e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database);
+        Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0
+                || (e.getTable().length == 1 && "".equals(e.getTable()[0]))
+                || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName));
+        Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0
+                || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType);
         return databases.and(table).and(eventTypes);
     }
 
 
-    public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
+    public CanalListenerEndpointRegistrar(
+            Object bean, Method method, String destination,
+            String database, String[] tables, CanalEntry.EventType[] eventTypes) {
         this.bean = bean;
-        this.listenerEntry = entry;
-    }
-
-    public Map.Entry<Method, CanalListener> getListenerEntry() {
-        return listenerEntry;
+        this.method = method;
+        this.destination = destination;
+        this.database = database;
+        this.table = tables;
+        this.eventType = eventTypes;
     }
 
     public Object getBean() {
         return bean;
     }
+
+    public Method getMethod() {
+        return method;
+    }
+
+    public String getDestination() {
+        return destination;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public String[] getTable() {
+        return table;
+    }
+
+    public CanalEntry.EventType[] getEventType() {
+        return eventType;
+    }
 }
diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
index 4214713..0374716 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -29,8 +29,7 @@
     public void initConnect() {
         try {
             // init supportAllTypes
-            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
-                    Arrays.asList(e.getListenerEntry().getValue().eventType())));
+            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType())));
             connector.connect();
             connector.subscribe(endpointInstance.getSubscribe());
             connector.rollback();
@@ -114,7 +113,7 @@
                             eventType))
                     .forEach(element -> {
                         try {
-                            element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData);
+                            element.getMethod().invoke(element.getBean(), eventType, rowData);
                         } catch (IllegalAccessException | InvocationTargetException e) {
                             logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                             throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e);

--
Gitblit v1.8.0