From ed3a1614384279b7d3a97e7411b649476a934ddb Mon Sep 17 00:00:00 2001 From: duxinglangzi <871364441@qq.com> Date: 星期二, 06 九月 2022 15:14:48 +0800 Subject: [PATCH] add LICENSE. --- src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java | 99 +++++++++++++++++++++++++++++++++++-------------- 1 files changed, 70 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java index e1a4d8a..6dd0e43 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java +++ b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java @@ -2,59 +2,78 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.exception.CanalClientException; -import com.duxinglangzi.canal.starter.annotation.CanalListener; +import com.duxinglangzi.canal.starter.mode.CanalMessage; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; /** - * 鐧昏鍛� + * 鐩戝惉鐨勭粓绔敞鍐屽櫒 + * * @author wuqiong 2022/4/11 */ public class CanalListenerEndpointRegistrar { private Object bean; - private Map.Entry<Method, CanalListener> listenerEntry; + private Method method; /** - * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔涓や釜鍙傛暟 <p> - * 2銆佷笖椤哄簭蹇呴』涓�: CanalEntry.EventType 銆� CanalEntry.RowData <p> + * 濡傛灉鏈繘琛岄厤缃紝鍒欎娇鐢ㄩ厤缃枃浠堕噷鍏ㄩ儴 destination + */ + private String destination; + + /** + * 鏁版嵁搴撳悕 + */ + private String database; + + /** + * 鏁版嵁琛ㄥ悕 + */ + private String[] table; + + /** + * 鏁版嵁鍙樺姩绫诲瀷锛屾澶勮娉ㄦ剰锛岄粯璁や笉鍖呭惈 DDL + */ + private CanalEntry.EventType[] eventType; + + /** + * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔1涓弬鏁�, 璇ュ弬鏁板璞″唴鍖呭惈浜�: 搴撳悕銆佽〃鍚嶃�佷簨浠剁被鍨嬨�佸彉鏇寸殑鏁版嵁 <p> + * 2銆佹柟娉曞弬鏁板繀椤讳负: CanalMessage <p> * 3銆佸鏋淐analListener 鎸囧畾鐨� destination 涓嶅湪閰嶇疆鏂囦欢鍐咃紝鍒欑洿鎺ユ姏閿� <p> + * * @param sets * @return void * @author wuqiong 2022-04-23 20:27 */ public void checkParameter(Set<String> sets) { List<Class<?>> classes = parameterTypes(); - if (classes.size() > 2 - || classes.get(1) != CanalEntry.RowData.class - || classes.get(0) != CanalEntry.EventType.class) + if (classes.size() != 1 || classes.get(0) != CanalMessage.class) throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + - "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check "); - if (StringUtils.isNotBlank(getListenerEntry().getValue().destination()) - && !sets.contains(getListenerEntry().getValue().destination())) - throw new CanalClientException("@CanalListener Illegal destination , please check "); + "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check "); + if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination())) + throw new CanalClientException("@CanalListener Illegal destination " + getDestination() + ", please check "); } public List<Class<?>> parameterTypes() { - return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList()); + return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList()); } public boolean isContainDestination(String destination) { - if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true; - return getListenerEntry().getValue().destination().equals(destination); + if (StringUtils.isBlank(getDestination())) return true; + return getDestination().equals(destination); } /** * 杩囨护鍙傛暟 + * * @param database * @param tableName * @param eventType @@ -63,27 +82,49 @@ */ public static Predicate<CanalListenerEndpointRegistrar> filterArgs( String database, String tableName, CanalEntry.EventType eventType) { - Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database()) - || e.getListenerEntry().getValue().database().equals(database); - Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0 - || (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0])) - || Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName)); - Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0 - || Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType); + Predicate<CanalListenerEndpointRegistrar> databases = + e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database); + Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0 + || (e.getTable().length == 1 && "".equals(e.getTable()[0])) + || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName)); + Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0 + || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType); return databases.and(table).and(eventTypes); } - public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) { + public CanalListenerEndpointRegistrar( + Object bean, Method method, String destination, + String database, String[] tables, CanalEntry.EventType[] eventTypes) { this.bean = bean; - this.listenerEntry = entry; - } - - public Map.Entry<Method, CanalListener> getListenerEntry() { - return listenerEntry; + this.method = method; + this.destination = destination; + this.database = database; + this.table = tables; + this.eventType = eventTypes; } public Object getBean() { return bean; } + + public Method getMethod() { + return method; + } + + public String getDestination() { + return destination; + } + + public String getDatabase() { + return database; + } + + public String[] getTable() { + return table; + } + + public CanalEntry.EventType[] getEventType() { + return eventType; + } } -- Gitblit v1.8.0