duxinglangzi
2022-09-06 ddae546da9487622631d47133c962e7d870982de
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.configuration;
D 2
3 import com.alibaba.otter.canal.protocol.CanalEntry;
4 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
c83c1b 5 import com.duxinglangzi.canal.starter.mode.CanalMessage;
de8c2b 6 import org.apache.commons.lang3.StringUtils;
D 7
8 import java.lang.reflect.Method;
9 import java.util.Arrays;
10 import java.util.List;
11 import java.util.Set;
12 import java.util.function.Predicate;
13 import java.util.stream.Collectors;
14
15 /**
c83c1b 16  * 监听的终端注册器
7cf978 17  *
de8c2b 18  * @author wuqiong 2022/4/11
D 19  */
20 public class CanalListenerEndpointRegistrar {
21
22     private Object bean;
a87aa7 23     private Method method;
D 24
25     /**
26      * 如果未进行配置,则使用配置文件里全部 destination
27      */
28     private String destination;
29
30     /**
31      * 数据库名
32      */
33     private String database;
34
35     /**
36      * 数据表名
37      */
38     private String[] table;
39
40     /**
41      * 数据变动类型,此处请注意,默认不包含 DDL
42      */
43     private CanalEntry.EventType[] eventType;
de8c2b 44
627420 45     /**
c83c1b 46      * 1、目前实现的 DML 解析器仅支持1个参数, 该参数对象内包含了: 库名、表名、事件类型、变更的数据 <p>
D 47      * 2、方法参数必须为: CanalMessage  <p>
627420 48      * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
7cf978 49      *
627420 50      * @param sets
D 51      * @return void
52      * @author wuqiong 2022-04-23 20:27
53      */
de8c2b 54     public void checkParameter(Set<String> sets) {
D 55         List<Class<?>> classes = parameterTypes();
c83c1b 56         if (classes.size() != 1 || classes.get(0) != CanalMessage.class)
de8c2b 57             throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
c83c1b 58                     "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check ");
a87aa7 59         if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
cf3e03 60             throw new CanalClientException("@CanalListener Illegal destination  " + getDestination() + ", please check ");
de8c2b 61
D 62     }
63
64
65     public List<Class<?>> parameterTypes() {
a87aa7 66         return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList());
de8c2b 67     }
D 68
69     public boolean isContainDestination(String destination) {
a87aa7 70         if (StringUtils.isBlank(getDestination())) return true;
D 71         return getDestination().equals(destination);
de8c2b 72     }
D 73
627420 74     /**
D 75      * 过滤参数
7cf978 76      *
627420 77      * @param database
D 78      * @param tableName
79      * @param eventType
80      * @return java.util.function.Predicate
81      * @author wuqiong 2022-04-23 20:47
82      */
de8c2b 83     public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
D 84             String database, String tableName, CanalEntry.EventType eventType) {
a87aa7 85         Predicate<CanalListenerEndpointRegistrar> databases =
D 86                 e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database);
87         Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0
88                 || (e.getTable().length == 1 && "".equals(e.getTable()[0]))
89                 || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName));
90         Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0
91                 || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType);
de8c2b 92         return databases.and(table).and(eventTypes);
D 93     }
94
95
a87aa7 96     public CanalListenerEndpointRegistrar(
D 97             Object bean, Method method, String destination,
98             String database, String[] tables, CanalEntry.EventType[] eventTypes) {
de8c2b 99         this.bean = bean;
a87aa7 100         this.method = method;
D 101         this.destination = destination;
102         this.database = database;
103         this.table = tables;
104         this.eventType = eventTypes;
de8c2b 105     }
D 106
107     public Object getBean() {
108         return bean;
109     }
a87aa7 110
D 111     public Method getMethod() {
112         return method;
113     }
114
115     public String getDestination() {
116         return destination;
117     }
118
119     public String getDatabase() {
120         return database;
121     }
122
123     public String[] getTable() {
124         return table;
125     }
126
127     public CanalEntry.EventType[] getEventType() {
128         return eventType;
129     }
de8c2b 130 }