renxue
2022-10-24 0b055a3f554da3a934e79e88c4781705cbab5a21
提交 | 用户 | age
0b055a 1 package com.hz.canal.starter.configuration;
R 2
3 import com.alibaba.otter.canal.protocol.CanalEntry;
4 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
5 import com.hz.canal.starter.mode.CanalMessage;
6 import org.apache.commons.lang3.StringUtils;
7
8 import java.lang.reflect.Method;
9 import java.util.Arrays;
10 import java.util.List;
11 import java.util.Set;
12 import java.util.function.Predicate;
13 import java.util.stream.Collectors;
14
15 /**
16  * 监听的终端注册器
17  *
18  * @author wuqiong 2022/4/11
19  */
20 public class CanalListenerEndpointRegistrar {
21
22     private Object bean;
23     private Method method;
24
25     /**
26      * 如果未进行配置,则使用配置文件里全部 destination
27      */
28     private String destination;
29
30     /**
31      * 数据库名
32      */
33     private String database;
34
35     /**
36      * 数据表名
37      */
38     private String[] table;
39
40     /**
41      * 数据变动类型,此处请注意,默认不包含 DDL
42      */
43     private CanalEntry.EventType[] eventType;
44
45     /**
46      * 1、目前实现的 DML 解析器仅支持1个参数, 该参数对象内包含了: 库名、表名、事件类型、变更的数据 <p>
47      * 2、方法参数必须为: CanalMessage  <p>
48      * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
49      *
50      * @param sets
51      * @return void
52      * @author wuqiong 2022-04-23 20:27
53      */
54     public void checkParameter(Set<String> sets) {
55         List<Class<?>> classes = parameterTypes();
56         if (classes.size() != 1 || classes.get(0) != CanalMessage.class)
57             throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
58                     "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check ");
59         if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
60             throw new CanalClientException("@CanalListener Illegal destination  " + getDestination() + ", please check ");
61
62     }
63
64
65     public List<Class<?>> parameterTypes() {
66         return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList());
67     }
68
69     public boolean isContainDestination(String destination) {
70         if (StringUtils.isBlank(getDestination())) return true;
71         return getDestination().equals(destination);
72     }
73
74     /**
75      * 过滤参数
76      *
77      * @param database
78      * @param tableName
79      * @param eventType
80      * @return java.util.function.Predicate
81      * @author wuqiong 2022-04-23 20:47
82      */
83     public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
84             String database, String tableName, CanalEntry.EventType eventType) {
85         Predicate<CanalListenerEndpointRegistrar> databases =
86                 e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database);
87         Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0
88                 || (e.getTable().length == 1 && "".equals(e.getTable()[0]))
89                 || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName));
90         Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0
91                 || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType);
92         return databases.and(table).and(eventTypes);
93     }
94
95
96     public CanalListenerEndpointRegistrar(
97             Object bean, Method method, String destination,
98             String database, String[] tables, CanalEntry.EventType[] eventTypes) {
99         this.bean = bean;
100         this.method = method;
101         this.destination = destination;
102         this.database = database;
103         this.table = tables;
104         this.eventType = eventTypes;
105     }
106
107     public Object getBean() {
108         return bean;
109     }
110
111     public Method getMethod() {
112         return method;
113     }
114
115     public String getDestination() {
116         return destination;
117     }
118
119     public String getDatabase() {
120         return database;
121     }
122
123     public String[] getTable() {
124         return table;
125     }
126
127     public CanalEntry.EventType[] getEventType() {
128         return eventType;
129     }
130 }