duxinglangzi
2022-04-23 6274208525b7e80c208f614915ef973d63834101
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package com.duxinglangzi.canal.starter.configuration;
 
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.duxinglangzi.canal.starter.annotation.CanalListener;
import org.apache.commons.lang3.StringUtils;
 
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
 
/**
 * 登记员
 * @author wuqiong 2022/4/11
 */
public class CanalListenerEndpointRegistrar {
 
    private Object bean;
    private Map.Entry<Method, CanalListener> listenerEntry;
 
    /**
     * 1、目前实现的 DML 解析器仅支持两个参数 <p>
     * 2、且顺序必须为: CanalEntry.EventType 、 CanalEntry.RowData  <p>
     * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
     * @param sets
     * @return void
     * @author wuqiong 2022-04-23 20:27
     */
    public void checkParameter(Set<String> sets) {
        List<Class<?>> classes = parameterTypes();
        if (classes.size() > 2
                || classes.get(1) != CanalEntry.RowData.class
                || classes.get(0) != CanalEntry.EventType.class)
            throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
                    "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check ");
        if (StringUtils.isNotBlank(getListenerEntry().getValue().destination())
                && !sets.contains(getListenerEntry().getValue().destination()))
            throw new CanalClientException("@CanalListener Illegal destination , please check ");
 
    }
 
 
    public List<Class<?>> parameterTypes() {
        return Arrays.stream(getListenerEntry().getKey().getParameterTypes()).collect(Collectors.toList());
    }
 
    public boolean isContainDestination(String destination) {
        if (StringUtils.isBlank(getListenerEntry().getValue().destination())) return true;
        return getListenerEntry().getValue().destination().equals(destination);
    }
 
    /**
     * 过滤参数
     * @param database
     * @param tableName
     * @param eventType
     * @return java.util.function.Predicate
     * @author wuqiong 2022-04-23 20:47
     */
    public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
            String database, String tableName, CanalEntry.EventType eventType) {
        Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database())
                || e.getListenerEntry().getValue().database().equals(database);
        Predicate<CanalListenerEndpointRegistrar> table = e -> e.getListenerEntry().getValue().table().length == 0
                || (e.getListenerEntry().getValue().table().length == 1 && "".equals(e.getListenerEntry().getValue().table()[0]))
                || Arrays.stream(e.getListenerEntry().getValue().table()).anyMatch(s -> s.equals(tableName));
        Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getListenerEntry().getValue().eventType().length == 0
                || Arrays.stream(e.getListenerEntry().getValue().eventType()).anyMatch(eve -> eve == eventType);
        return databases.and(table).and(eventTypes);
    }
 
 
    public CanalListenerEndpointRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
        this.bean = bean;
        this.listenerEntry = entry;
    }
 
    public Map.Entry<Method, CanalListener> getListenerEntry() {
        return listenerEntry;
    }
 
    public Object getBean() {
        return bean;
    }
}