renxue
2022-10-24 0b055a3f554da3a934e79e88c4781705cbab5a21
src/main/java/com/hz/canal/starter/configuration/CanalListenerEndpointRegistrar.java
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
@@ -1,130 +1,130 @@
package com.duxinglangzi.canal.starter.configuration;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.duxinglangzi.canal.starter.mode.CanalMessage;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
 * 监听的终端注册器
 *
 * @author wuqiong 2022/4/11
 */
public class CanalListenerEndpointRegistrar {
    private Object bean;
    private Method method;
    /**
     * 如果未进行配置,则使用配置文件里全部 destination
     */
    private String destination;
    /**
     * 数据库名
     */
    private String database;
    /**
     * 数据表名
     */
    private String[] table;
    /**
     * 数据变动类型,此处请注意,默认不包含 DDL
     */
    private CanalEntry.EventType[] eventType;
    /**
     * 1、目前实现的 DML 解析器仅支持1个参数, 该参数对象内包含了: 库名、表名、事件类型、变更的数据 <p>
     * 2、方法参数必须为: CanalMessage  <p>
     * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
     *
     * @param sets
     * @return void
     * @author wuqiong 2022-04-23 20:27
     */
    public void checkParameter(Set<String> sets) {
        List<Class<?>> classes = parameterTypes();
        if (classes.size() != 1 || classes.get(0) != CanalMessage.class)
            throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
                    "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check ");
        if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
            throw new CanalClientException("@CanalListener Illegal destination  " + getDestination() + ", please check ");
    }
    public List<Class<?>> parameterTypes() {
        return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList());
    }
    public boolean isContainDestination(String destination) {
        if (StringUtils.isBlank(getDestination())) return true;
        return getDestination().equals(destination);
    }
    /**
     * 过滤参数
     *
     * @param database
     * @param tableName
     * @param eventType
     * @return java.util.function.Predicate
     * @author wuqiong 2022-04-23 20:47
     */
    public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
            String database, String tableName, CanalEntry.EventType eventType) {
        Predicate<CanalListenerEndpointRegistrar> databases =
                e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database);
        Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0
                || (e.getTable().length == 1 && "".equals(e.getTable()[0]))
                || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName));
        Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0
                || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType);
        return databases.and(table).and(eventTypes);
    }
    public CanalListenerEndpointRegistrar(
            Object bean, Method method, String destination,
            String database, String[] tables, CanalEntry.EventType[] eventTypes) {
        this.bean = bean;
        this.method = method;
        this.destination = destination;
        this.database = database;
        this.table = tables;
        this.eventType = eventTypes;
    }
    public Object getBean() {
        return bean;
    }
    public Method getMethod() {
        return method;
    }
    public String getDestination() {
        return destination;
    }
    public String getDatabase() {
        return database;
    }
    public String[] getTable() {
        return table;
    }
    public CanalEntry.EventType[] getEventType() {
        return eventType;
    }
}
package com.hz.canal.starter.configuration;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import com.hz.canal.starter.mode.CanalMessage;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
 * 监听的终端注册器
 *
 * @author wuqiong 2022/4/11
 */
public class CanalListenerEndpointRegistrar {
    private Object bean;
    private Method method;
    /**
     * 如果未进行配置,则使用配置文件里全部 destination
     */
    private String destination;
    /**
     * 数据库名
     */
    private String database;
    /**
     * 数据表名
     */
    private String[] table;
    /**
     * 数据变动类型,此处请注意,默认不包含 DDL
     */
    private CanalEntry.EventType[] eventType;
    /**
     * 1、目前实现的 DML 解析器仅支持1个参数, 该参数对象内包含了: 库名、表名、事件类型、变更的数据 <p>
     * 2、方法参数必须为: CanalMessage  <p>
     * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
     *
     * @param sets
     * @return void
     * @author wuqiong 2022-04-23 20:27
     */
    public void checkParameter(Set<String> sets) {
        List<Class<?>> classes = parameterTypes();
        if (classes.size() != 1 || classes.get(0) != CanalMessage.class)
            throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
                    "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check ");
        if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
            throw new CanalClientException("@CanalListener Illegal destination  " + getDestination() + ", please check ");
    }
    public List<Class<?>> parameterTypes() {
        return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList());
    }
    public boolean isContainDestination(String destination) {
        if (StringUtils.isBlank(getDestination())) return true;
        return getDestination().equals(destination);
    }
    /**
     * 过滤参数
     *
     * @param database
     * @param tableName
     * @param eventType
     * @return java.util.function.Predicate
     * @author wuqiong 2022-04-23 20:47
     */
    public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
            String database, String tableName, CanalEntry.EventType eventType) {
        Predicate<CanalListenerEndpointRegistrar> databases =
                e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database);
        Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0
                || (e.getTable().length == 1 && "".equals(e.getTable()[0]))
                || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName));
        Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0
                || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType);
        return databases.and(table).and(eventTypes);
    }
    public CanalListenerEndpointRegistrar(
            Object bean, Method method, String destination,
            String database, String[] tables, CanalEntry.EventType[] eventTypes) {
        this.bean = bean;
        this.method = method;
        this.destination = destination;
        this.database = database;
        this.table = tables;
        this.eventType = eventTypes;
    }
    public Object getBean() {
        return bean;
    }
    public Method getMethod() {
        return method;
    }
    public String getDestination() {
        return destination;
    }
    public String getDatabase() {
        return database;
    }
    public String[] getTable() {
        return table;
    }
    public CanalEntry.EventType[] getEventType() {
        return eventType;
    }
}