duxinglangzi
2022-05-16 7cf9781edf66c75570af378a96b4011ff63ada92
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.configuration;
D 2
3 import com.alibaba.otter.canal.protocol.CanalEntry;
4 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
5 import com.duxinglangzi.canal.starter.annotation.CanalListener;
6 import org.apache.commons.lang3.StringUtils;
7
8 import java.lang.reflect.Method;
9 import java.util.Arrays;
10 import java.util.List;
11 import java.util.Map;
12 import java.util.Set;
13 import java.util.function.Predicate;
14 import java.util.stream.Collectors;
15
16 /**
627420 17  * 登记员
7cf978 18  *
de8c2b 19  * @author wuqiong 2022/4/11
D 20  */
21 public class CanalListenerEndpointRegistrar {
22
23     private Object bean;
24     private Map.Entry<Method, CanalListener> listenerEntry;
25
627420 26     /**
D 27      * 1、目前实现的 DML 解析器仅支持两个参数 <p>
28      * 2、且顺序必须为: CanalEntry.EventType 、 CanalEntry.RowData  <p>
29      * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
7cf978 30      *
627420 31      * @param sets
D 32      * @return void
33      * @author wuqiong 2022-04-23 20:27
34      */
de8c2b 35     public void checkParameter(Set<String> sets) {
D 36         List<Class<?>> classes = parameterTypes();
37         if (classes.size() > 2
38                 || classes.get(1) != CanalEntry.RowData.class
39                 || classes.get(0) != CanalEntry.EventType.class)
40             throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
41                     "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check ");
42         if (StringUtils.isNotBlank(getListenerEntry().getValue().destination())
43                 && !sets.contains(getListenerEntry().getValue().destination()))
44             throw new CanalClientException("@CanalListener Illegal destination , please check ");
45
46     }
47
48
49     public List<Class<?>> parameterTypes() {
50         return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList());
51     }
52
53     public boolean isContainDestination(String destination) {
54         if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true;
55         return getListenerEntry().getValue().destination().equals(destination);
56     }
57
627420 58     /**
D 59      * 过滤参数
7cf978 60      *
627420 61      * @param database
D 62      * @param tableName
63      * @param eventType
64      * @return java.util.function.Predicate
65      * @author wuqiong 2022-04-23 20:47
66      */
de8c2b 67     public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
D 68             String database, String tableName, CanalEntry.EventType eventType) {
69         Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database())
70                 || e.getListenerEntry().getValue().database().equals(database);
71         Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0
72                 || (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0]))
73                 || Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName));
74         Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0
75                 || Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType);
76         return databases.and(table).and(eventTypes);
77     }
78
79
80     public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
81         this.bean = bean;
82         this.listenerEntry = entry;
83     }
84
85     public Map.Entry<Method, CanalListener> getListenerEntry() {
86         return listenerEntry;
87     }
88
89     public Object getBean() {
90         return bean;
91     }
92 }