| | |
| | | public class CanalListenerTest {
|
| | |
|
| | | /**
|
| | | * 目前 Listener 方法的参数必须为 CanalEntry.EventType eventType, CanalEntry.RowData rowData
|
| | | * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData |
| | | * 程序在启动过程中会做检查
|
| | | */
|
| | |
|
| | |
| | | import java.util.Map;
|
| | |
|
| | | /**
|
| | | * Canal连接的配置类
|
| | | * @author wuqiong 2022/4/11
|
| | | * @description
|
| | | */
|
| | | @Order(Ordered.HIGHEST_PRECEDENCE)
|
| | | @ConfigurationProperties(prefix = "spring.canal")
|
| | |
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/12
|
| | | * @description
|
| | | */
|
| | | public class CanalBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
|
| | |
|
| | | public static final String CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME =
|
| | | "com.duxinglangzi.canal.starter.configuration.CanalListenerAnnotationBeanPostProcessor";
|
| | |
|
| | | /**
|
| | | * 注册 CanalListenerAnnotationBeanPostProcessor 到spring bean 容器内
|
| | | * @param importingClassMetadata
|
| | | * @param registry
|
| | | * @return void
|
| | | * @author wuqiong 2022-04-23 20:21
|
| | | */
|
| | | @Override
|
| | | public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
|
| | | if (!registry.containsBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
|
| | |
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/12
|
| | | * @description
|
| | | */
|
| | | public class CanalConfigurationSelector implements DeferredImportSelector {
|
| | |
|
| | | /**
|
| | | * 扫描器 导入指定类, 这里导入 CanalBootstrapConfiguration.class
|
| | | * @param importingClassMetadata
|
| | | * @return java.lang.String[]
|
| | | * @author wuqiong 2022-04-23 20:20
|
| | | */
|
| | | @Override
|
| | | public String[] selectImports(AnnotationMetadata importingClassMetadata) {
|
| | | return new String[]{CanalBootstrapConfiguration.class.getName()};
|
| | |
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/11
|
| | | * @description
|
| | | */
|
| | | public class CanalListenerAnnotationBeanPostProcessor implements
|
| | | BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor {
|
| | |
| | | public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
|
| | | if (notAnnotatedClasses.contains(bean.getClass())) return bean;
|
| | | Class<?> targetClass = AopUtils.getTargetClass(bean);
|
| | | // 只扫描类的方法,目前 CanalListener 只支持在方法上
|
| | | Map<Method, CanalListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
|
| | | (MethodIntrospector.MetadataLookup<CanalListener>) method -> findListenerAnnotations(method));
|
| | | if (annotatedMethods.isEmpty()) {
|
| | | this.notAnnotatedClasses.add(bean.getClass());
|
| | | } else {
|
| | | // 先加入到待注册里面
|
| | | annotatedMethods.entrySet().stream()
|
| | | .filter(e -> e != null)
|
| | | .forEach(ele -> registrars.add(new CanalListenerEndpointRegistrar(bean, ele)));
|
| | | logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", bean.getClass().getName(),
|
| | | logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName,
|
| | | annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet()));
|
| | | }
|
| | | return bean;
|
| | |
| | | import java.util.stream.Collectors;
|
| | |
|
| | | /**
|
| | | * 登记员
|
| | | * @author wuqiong 2022/4/11
|
| | | * @description
|
| | | */
|
| | | public class CanalListenerEndpointRegistrar {
|
| | |
|
| | | private Object bean;
|
| | | private Map.Entry<Method, CanalListener> listenerEntry;
|
| | |
|
| | | /**
|
| | | * 1、目前实现的 DML 解析器仅支持两个参数 <p>
|
| | | * 2、且顺序必须为: CanalEntry.EventType 、 CanalEntry.RowData <p>
|
| | | * 3、如果CanalListener 指定的 destination 不在配置文件内,则直接抛错 <p>
|
| | | * @param sets
|
| | | * @return void
|
| | | * @author wuqiong 2022-04-23 20:27
|
| | | */
|
| | | public void checkParameter(Set<String> sets) {
|
| | | List<Class<?>> classes = parameterTypes();
|
| | | if (classes.size() > 2
|
| | |
| | | return getListenerEntry().getValue().destination().equals(destination);
|
| | | }
|
| | |
|
| | | /**
|
| | | * 过滤参数
|
| | | * @param database
|
| | | * @param tableName
|
| | | * @param eventType
|
| | | * @return java.util.function.Predicate
|
| | | * @author wuqiong 2022-04-23 20:47
|
| | | */
|
| | | public static Predicate<CanalListenerEndpointRegistrar> filterArgs(
|
| | | String database, String tableName, CanalEntry.EventType eventType) {
|
| | | Predicate<CanalListenerEndpointRegistrar> databases = e -> StringUtils.isBlank(e.getListenerEntry().getValue().database())
|
| | |
| | | import java.util.concurrent.TimeUnit;
|
| | |
|
| | | /**
|
| | | * 抽象的canal transponder ,实现SmartLifecycle接口,生命周期由spring进行管理
|
| | | * @author wuqiong 2022/4/11
|
| | | * @description
|
| | | */
|
| | | public abstract class AbstractCanalTransponderContainer implements SmartLifecycle {
|
| | | protected boolean isRunning = false;
|
| | |
| | |
|
| | | /**
|
| | | * Marks this container as running and launches the worker thread.
|
| | | * <p>
|
| | | * The worker waits until {@code ApplicationReadyListener.START_LISTENER_CONTAINER} reads true,
|
| | | * then calls {@code initConnect()} once and keeps calling {@code doStart()} for as long as
|
| | | * {@code isRunning()} is true and the worker thread has not been interrupted.
|
| | | */
|
| | | @Override
|
| | | public void start() {
|
| | | // Set the flag exactly once, before the worker is started: Thread.start() publishes it to the
|
| | | // worker, so the isRunning() loop check below cannot see a stale value. The former second
|
| | | // call after start() was redundant and could overwrite a state change made by the worker.
|
| | | setRunning(true);
|
| | | new Thread(() -> {
|
| | | // Canal data is pulled only after Spring has finished starting up.
|
| | | while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get()) {
|
| | | // sleep(...) is a helper defined elsewhere; presumably it pauses this worker (confirm).
|
| | | sleep(5L * SLEEP_TIME_MILLI_SECONDS);
|
| | | }
|
| | | initConnect();
|
| | | while (isRunning() && !Thread.currentThread().isInterrupted()) {
|
| | | doStart();
|
| | | }
|
| | | }).start();
|
| | | }
|
| | |
|
| | | @Override
|
| | |
| | | import java.util.*;
|
| | |
|
| | | /**
|
| | | * DML 数据拉取、解析
|
| | | * @author wuqiong 2022/4/11
|
| | | * @description
|
| | | */
|
| | | public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
|
| | |
|
| | |
| | |
|
| | | public class CanalConnectorFactory {
|
| | |
|
| | | /**
|
| | | * 创建 CanalConnector
|
| | | * @param destination
|
| | | * @param endpointInstance
|
| | | * @return CanalConnector
|
| | | * @author wuqiong 2022-04-23 20:36
|
| | | */
|
| | | public static synchronized CanalConnector createConnector(
|
| | | String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
|
| | | Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check ");
|
| | |
| | | private static final String CONTAINER_ID_PREFIX = "com.duxinglangzi.canal.starter.container.MessageTransponderContainer#";
|
| | |
|
| | |
|
| | | /**
|
| | | * 将所有待注册的端点,注册到spring中
|
| | | * @param beanFactory
|
| | | * @param canalConfig
|
| | | * @param registrars
|
| | | * @return void
|
| | | * @author wuqiong 2022-04-23 20:34
|
| | | */
|
| | | public static void registerListenerContainer(
|
| | | ConfigurableListableBeanFactory beanFactory, CanalAutoConfigurationProperties canalConfig,
|
| | | Set<CanalListenerEndpointRegistrar> registrars) {
|
| | |
| | | if (beanFactory.containsBean(getContainerID(endpointInstance.getKey()))) continue; // 如果已经存在则不在创建
|
| | | List<CanalListenerEndpointRegistrar> registrarList = new ArrayList<>();
|
| | | for (CanalListenerEndpointRegistrar registrar : registrars) {
|
| | | if (!registrar.isContainDestination(endpointInstance.getKey())) continue;
|
| | | registrar.checkParameter(canalConfig.getInstances().keySet());
|
| | | if (!registrar.isContainDestination(endpointInstance.getKey())) continue;
|
| | | registrarList.add(registrar);
|
| | | }
|
| | | if (registrarList.isEmpty()) continue;
|
| | |
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/16
|
| | | * @description
|
| | | */
|
| | | public class ApplicationReadyListener implements ApplicationListener<ApplicationReadyEvent>{
|
| | |
|
| | |
| | |
|
| | | /**
|
| | | * Sets {@code START_LISTENER_CONTAINER} to true when the application-ready event is received,
|
| | | * unless it already reads true.
|
| | | *
|
| | | * @param event the application-ready event; its contents are not inspected here
|
| | | */
|
| | | @Override
|
| | | public void onApplicationEvent(ApplicationReadyEvent event) {
|
| | | // Release the canal transponders only once the application has started.
|
| | | if (START_LISTENER_CONTAINER.get()) {
|
| | | return; // flag already set, nothing to do
|
| | | }
|
| | | START_LISTENER_CONTAINER.set(true);
|
| | | }
|
| | | }
|