duxinglangzi
2022-05-16 7cf9781edf66c75570af378a96b4011ff63ada92
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.factory;
D 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
5 import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
6 import com.duxinglangzi.canal.starter.container.DmlMessageTransponderContainer;
7 import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
8
9 import java.util.ArrayList;
10 import java.util.List;
11 import java.util.Map;
12 import java.util.Set;
13
14 /**
15  * @author wuqiong 2022/4/18
16  * @description
17  */
18 public class TransponderContainerFactory {
19
20     private static final String CONTAINER_ID_PREFIX = "com.duxinglangzi.canal.starter.container.MessageTransponderContainer#";
21
22
627420 23     /**
D 24      * 将所有待注册的端点,注册到spring中
7cf978 25      *
627420 26      * @param beanFactory
D 27      * @param canalConfig
28      * @param registrars
29      * @return void
30      * @author wuqiong 2022-04-23 20:34
31      */
de8c2b 32     public static void registerListenerContainer(
D 33             ConfigurableListableBeanFactory beanFactory, CanalAutoConfigurationProperties canalConfig,
34             Set<CanalListenerEndpointRegistrar> registrars) {
35         if (registrars == null || registrars.isEmpty()) return;
36         if (canalConfig == null || canalConfig.getInstances().isEmpty()) return;
37
38         for (Map.Entry<String, CanalAutoConfigurationProperties.EndpointInstance> endpointInstance : canalConfig.getInstances().entrySet()) {
39             if (beanFactory.containsBean(getContainerID(endpointInstance.getKey()))) continue; // 如果已经存在则不在创建
40             List<CanalListenerEndpointRegistrar> registrarList = new ArrayList<>();
41             for (CanalListenerEndpointRegistrar registrar : registrars) {
42                 registrar.checkParameter(canalConfig.getInstances().keySet());
627420 43                 if (!registrar.isContainDestination(endpointInstance.getKey())) continue;
de8c2b 44                 registrarList.add(registrar);
D 45             }
46             if (registrarList.isEmpty()) continue;
47             registerTransponderContainer(
48                     endpointInstance.getKey(), endpointInstance.getValue(), beanFactory, registrarList);
49         }
50
51     }
52
53     private static void registerTransponderContainer(
54             String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance,
55             ConfigurableListableBeanFactory beanFactory, List<CanalListenerEndpointRegistrar> registrarList) {
56         CanalConnector connector = CanalConnectorFactory.createConnector(destination, endpointInstance);
57         beanFactory.registerSingleton(getContainerID(destination),
58                 new DmlMessageTransponderContainer(connector, registrarList, endpointInstance));
59     }
60
61
62     private static String getContainerID(String destination) {
63         return CONTAINER_ID_PREFIX + "#" + destination;
64     }
65
66
67 }