duxinglangzi
2022-06-24 a87aa74a3af27960276ed02f4273386d25d2a231
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.configuration;
D 2
3 import com.alibaba.otter.canal.protocol.CanalEntry;
4 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
5 import org.apache.commons.lang3.StringUtils;
6
7 import java.lang.reflect.Method;
8 import java.util.Arrays;
9 import java.util.List;
10 import java.util.Set;
11 import java.util.function.Predicate;
12 import java.util.stream.Collectors;
13
14 /**
627420 15  * 登记员
7cf978 16  *
de8c2b 17  * @author wuqiong 2022/4/11
D 18  */
19 public class CanalListenerEndpointRegistrar {
20
21     private Object bean;
a87aa7 22     private Method method;
D 23
24     /**
25      * 如果未进行配置,则使用配置文件里全部 destination
26      */
27     private String destination;
28
29     /**
30      * 数据库名
31      */
32     private String database;
33
34     /**
35      * 数据表名
36      */
37     private String[] table;
38
39     /**
40      * 数据变动类型,此处请注意,默认不包含 DDL
41      */
42     private CanalEntry.EventType[] eventType;
de8c2b 43
627420 44     /**
D 45      * 1、目前实现的 DML 解析器仅支持两个参数 <p>
46      * 2、且顺序必须为: CanalEntry.EventType 、 CanalEntry.RowData  <p>
47      * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
7cf978 48      *
627420 49      * @param sets
D 50      * @return void
51      * @author wuqiong 2022-04-23 20:27
52      */
de8c2b 53     public void checkParameter(Set<String> sets) {
D 54         List<Class<?>> classes = parameterTypes();
55         if (classes.size() > 2
56                 || classes.get(1) != CanalEntry.RowData.class
57                 || classes.get(0) != CanalEntry.EventType.class)
58             throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
59                     "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check ");
a87aa7 60         if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
de8c2b 61             throw new CanalClientException("@CanalListener Illegal destination , please check ");
D 62
63     }
64
65
66     public List<Class<?>> parameterTypes() {
a87aa7 67         return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList());
de8c2b 68     }
D 69
70     public boolean isContainDestination(String destination) {
a87aa7 71         if (StringUtils.isBlank(getDestination())) return true;
D 72         return getDestination().equals(destination);
de8c2b 73     }
D 74
627420 75     /**
D 76      * 过滤参数
7cf978 77      *
627420 78      * @param database
D 79      * @param tableName
80      * @param eventType
81      * @return java.util.function.Predicate
82      * @author wuqiong 2022-04-23 20:47
83      */
de8c2b 84     public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
D 85             String database, String tableName, CanalEntry.EventType eventType) {
a87aa7 86         Predicate<CanalListenerEndpointRegistrar> databases =
D 87                 e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database);
88         Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0
89                 || (e.getTable().length == 1 && "".equals(e.getTable()[0]))
90                 || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName));
91         Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0
92                 || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType);
de8c2b 93         return databases.and(table).and(eventTypes);
D 94     }
95
96
a87aa7 97     public CanalListenerEndpointRegistrar(
D 98             Object bean, Method method, String destination,
99             String database, String[] tables, CanalEntry.EventType[] eventTypes) {
de8c2b 100         this.bean = bean;
a87aa7 101         this.method = method;
D 102         this.destination = destination;
103         this.database = database;
104         this.table = tables;
105         this.eventType = eventTypes;
de8c2b 106     }
D 107
108     public Object getBean() {
109         return bean;
110     }
a87aa7 111
D 112     public Method getMethod() {
113         return method;
114     }
115
116     public String getDestination() {
117         return destination;
118     }
119
120     public String getDatabase() {
121         return database;
122     }
123
124     public String[] getTable() {
125         return table;
126     }
127
128     public CanalEntry.EventType[] getEventType() {
129         return eventType;
130     }
de8c2b 131 }