提交 | 用户 | age
|
de8c2b
|
1 |
package com.duxinglangzi.canal.starter.configuration;
|
D |
2 |
|
|
3 |
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
4 |
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
|
5 |
import com.duxinglangzi.canal.starter.annotation.CanalListener;
|
|
6 |
import org.apache.commons.lang3.StringUtils;
|
|
7 |
|
|
8 |
import java.lang.reflect.Method;
|
|
9 |
import java.util.Arrays;
|
|
10 |
import java.util.List;
|
|
11 |
import java.util.Map;
|
|
12 |
import java.util.Set;
|
|
13 |
import java.util.function.Predicate;
|
|
14 |
import java.util.stream.Collectors;
|
|
15 |
|
|
16 |
/**
|
627420
|
17 |
* 登记员
|
7cf978
|
18 |
*
|
de8c2b
|
19 |
* @author wuqiong 2022/4/11
|
D |
20 |
*/
|
|
21 |
public class CanalListenerEndpointRegistrar {
|
|
22 |
|
|
23 |
private Object bean;
|
|
24 |
private Map.Entry<Method, CanalListener> listenerEntry;
|
|
25 |
|
627420
|
26 |
/**
|
D |
27 |
* 1、目前实现的 DML 解析器仅支持两个参数 <p>
|
|
28 |
* 2、且顺序必须为: CanalEntry.EventType 、 CanalEntry.RowData <p>
|
|
29 |
* 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
|
7cf978
|
30 |
*
|
627420
|
31 |
* @param sets
|
D |
32 |
* @return void
|
|
33 |
* @author wuqiong 2022-04-23 20:27
|
|
34 |
*/
|
de8c2b
|
35 |
public void checkParameter(Set<String> sets) {
|
D |
36 |
List<Class<?>> classes = parameterTypes();
|
|
37 |
if (classes.size() > 2
|
|
38 |
|| classes.get(1) != CanalEntry.RowData.class
|
|
39 |
|| classes.get(0) != CanalEntry.EventType.class)
|
|
40 |
throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
|
|
41 |
"Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check ");
|
|
42 |
if (StringUtils.isNotBlank(getListenerEntry().getValue().destination())
|
|
43 |
&& !sets.contains(getListenerEntry().getValue().destination()))
|
|
44 |
throw new CanalClientException("@CanalListener Illegal destination , please check ");
|
|
45 |
|
|
46 |
}
|
|
47 |
|
|
48 |
|
|
49 |
public List<Class<?>> parameterTypes() {
|
|
50 |
return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList());
|
|
51 |
}
|
|
52 |
|
|
53 |
public boolean isContainDestination(String destination) {
|
|
54 |
if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true;
|
|
55 |
return getListenerEntry().getValue().destination().equals(destination);
|
|
56 |
}
|
|
57 |
|
627420
|
58 |
/**
|
D |
59 |
* 过滤参数
|
7cf978
|
60 |
*
|
627420
|
61 |
* @param database
|
D |
62 |
* @param tableName
|
|
63 |
* @param eventType
|
|
64 |
* @return java.util.function.Predicate
|
|
65 |
* @author wuqiong 2022-04-23 20:47
|
|
66 |
*/
|
de8c2b
|
67 |
public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
|
D |
68 |
String database, String tableName, CanalEntry.EventType eventType) {
|
|
69 |
Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database())
|
|
70 |
|| e.getListenerEntry().getValue().database().equals(database);
|
|
71 |
Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0
|
|
72 |
|| (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0]))
|
|
73 |
|| Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName));
|
|
74 |
Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0
|
|
75 |
|| Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType);
|
|
76 |
return databases.and(table).and(eventTypes);
|
|
77 |
}
|
|
78 |
|
|
79 |
|
|
80 |
public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
|
|
81 |
this.bean = bean;
|
|
82 |
this.listenerEntry = entry;
|
|
83 |
}
|
|
84 |
|
|
85 |
public Map.Entry<Method, CanalListener> getListenerEntry() {
|
|
86 |
return listenerEntry;
|
|
87 |
}
|
|
88 |
|
|
89 |
public Object getBean() {
|
|
90 |
return bean;
|
|
91 |
}
|
|
92 |
}
|