duxinglangzi
2022-04-22 de8c2b2a4654893dc2c80f1fe095c165485bee5f
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.configuration;
D 2
3 import com.alibaba.otter.canal.protocol.CanalEntry;
4 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
5 import com.duxinglangzi.canal.starter.annotation.CanalListener;
6 import org.apache.commons.lang3.StringUtils;
7
8 import java.lang.reflect.Method;
9 import java.util.Arrays;
10 import java.util.List;
11 import java.util.Map;
12 import java.util.Set;
13 import java.util.function.Predicate;
14 import java.util.stream.Collectors;
15
16 /**
17  * @author wuqiong 2022/4/11
18  * @description
19  */
20 public class CanalListenerEndpointRegistrar {
21
22     private Object bean;
23     private Map.Entry<Method, CanalListener> listenerEntry;
24
25     public void checkParameter(Set<String> sets) {
26         List<Class<?>> classes = parameterTypes();
27         if (classes.size() > 2
28                 || classes.get(1) != CanalEntry.RowData.class
29                 || classes.get(0) != CanalEntry.EventType.class)
30             throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
31                     "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check ");
32         if (StringUtils.isNotBlank(getListenerEntry().getValue().destination())
33                 && !sets.contains(getListenerEntry().getValue().destination()))
34             throw new CanalClientException("@CanalListener Illegal destination , please check ");
35
36     }
37
38
39     public List<Class<?>> parameterTypes() {
40         return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList());
41     }
42
43     public boolean isContainDestination(String destination) {
44         if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true;
45         return getListenerEntry().getValue().destination().equals(destination);
46     }
47
48     public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
49             String database, String tableName, CanalEntry.EventType eventType) {
50         Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database())
51                 || e.getListenerEntry().getValue().database().equals(database);
52         Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0
53                 || (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0]))
54                 || Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName));
55         Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0
56                 || Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType);
57         return databases.and(table).and(eventTypes);
58     }
59
60
61     public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
62         this.bean = bean;
63         this.listenerEntry = entry;
64     }
65
66     public Map.Entry<Method, CanalListener> getListenerEntry() {
67         return listenerEntry;
68     }
69
70     public Object getBean() {
71         return bean;
72     }
73 }