duxinglangzi
2022-09-06 ed3a1614384279b7d3a97e7411b649476a934ddb
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.factory;
D 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.client.CanalConnectors;
5 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
6 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import org.springframework.util.Assert;
8 import org.springframework.util.StringUtils;
9
10 import java.net.InetSocketAddress;
11 import java.net.SocketAddress;
12 import java.util.ArrayList;
13 import java.util.List;
14
15
16 public class CanalConnectorFactory {
17
627420 18     /**
D 19      * 创建 CanalConnector
7cf978 20      *
627420 21      * @param destination
D 22      * @param endpointInstance
23      * @return CanalConnector
24      * @author wuqiong 2022-04-23 20:36
25      */
de8c2b 26     public static synchronized CanalConnector createConnector(
D 27             String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
28         Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check ");
29         Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check ");
30         CanalConnector connector;
31         if (endpointInstance.isClusterEnabled()) {
32             if (!StringUtils.hasText(endpointInstance.getZookeeperAddress()))
33                 throw new CanalClientException("zookeeper address is null");
34             List<SocketAddress> addresses = new ArrayList<>();
35             for (String s : endpointInstance.getZookeeperAddress().split(",")) {
36                 String[] split = s.split(":");
37                 if (split.length != 2)
38                     throw new CanalClientException("error parsing zookeeper address:" + s);
39                 addresses.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
40             }
41             connector = CanalConnectors.newClusterConnector(
42                     addresses, destination, endpointInstance.getUserName(), endpointInstance.getPassword());
43         } else {
44             connector = CanalConnectors.newSingleConnector(
45                     new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()),
46                     destination, endpointInstance.getUserName(), endpointInstance.getPassword());
47         }
48         return connector;
49     }
50 }