From a87aa74a3af27960276ed02f4273386d25d2a231 Mon Sep 17 00:00:00 2001 From: duxinglangzi <871364441@qq.com> Date: 星期五, 24 六月 2022 15:41:11 +0800 Subject: [PATCH] 支持动态参数配置 --- src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java | 81 ++++++++++++++++++++++++++++++---------- 1 files changed, 60 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java index aa2cc0b..c9e0c1d 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java +++ b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java @@ -2,13 +2,11 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.exception.CanalClientException; -import com.duxinglangzi.canal.starter.annotation.CanalListener; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -21,7 +19,27 @@ public class CanalListenerEndpointRegistrar { private Object bean; - private Map.Entry<Method, CanalListener> listenerEntry; + private Method method; + + /** + * 濡傛灉鏈繘琛岄厤缃紝鍒欎娇鐢ㄩ厤缃枃浠堕噷鍏ㄩ儴 destination + */ + private String destination; + + /** + * 鏁版嵁搴撳悕 + */ + private String database; + + /** + * 鏁版嵁琛ㄥ悕 + */ + private String[] table; + + /** + * 鏁版嵁鍙樺姩绫诲瀷锛屾澶勮娉ㄦ剰锛岄粯璁や笉鍖呭惈 DDL + */ + private CanalEntry.EventType[] eventType; /** * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔涓や釜鍙傛暟 <p> @@ -39,20 +57,19 @@ || classes.get(0) != CanalEntry.EventType.class) throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check "); - if (StringUtils.isNotBlank(getListenerEntry().getValue().destination()) - && !sets.contains(getListenerEntry().getValue().destination())) + if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination())) throw new CanalClientException("@CanalListener Illegal destination , please check "); } public List<Class<?>> parameterTypes() { - return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList()); + return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList()); } public boolean isContainDestination(String destination) { - if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true; - return getListenerEntry().getValue().destination().equals(destination); + if (StringUtils.isBlank(getDestination())) return true; + return getDestination().equals(destination); } /** @@ -66,27 +83,49 @@ */ public static Predicate<CanalListenerEndpointRegistrar> filterArgs( String database, String tableName, CanalEntry.EventType eventType) { - Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database()) - || e.getListenerEntry().getValue().database().equals(database); - Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0 - || (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0])) - || Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName)); - Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0 - || Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType); + Predicate<CanalListenerEndpointRegistrar> databases = + e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database); + Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0 + || (e.getTable().length == 1 && "".equals(e.getTable()[0])) + || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName)); + Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0 + || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType); return databases.and(table).and(eventTypes); } - public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) { + public CanalListenerEndpointRegistrar( + Object bean, Method method, String destination, + String database, String[] tables, CanalEntry.EventType[] eventTypes) { this.bean = bean; - this.listenerEntry = entry; - } - - public Map.Entry<Method, CanalListener> getListenerEntry() { - return listenerEntry; + this.method = method; + this.destination = destination; + this.database = database; + this.table = tables; + this.eventType = eventTypes; } public Object getBean() { return bean; } + + public Method getMethod() { + return method; + } + + public String getDestination() { + return destination; + } + + public String getDatabase() { + return database; + } + + public String[] getTable() { + return table; + } + + public CanalEntry.EventType[] getEventType() { + return eventType; + } } -- Gitblit v1.8.0