duxinglangzi
2022-04-23 6274208525b7e80c208f614915ef973d63834101
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.factory;
D 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.client.CanalConnectors;
5 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
6 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import org.springframework.util.Assert;
8 import org.springframework.util.StringUtils;
9
10 import java.net.InetSocketAddress;
11 import java.net.SocketAddress;
12 import java.util.ArrayList;
13 import java.util.List;
14
15
16 public class CanalConnectorFactory {
17
627420 18     /**
D 19      * 创建 CanalConnector
20      * @param destination
21      * @param endpointInstance
22      * @return CanalConnector
23      * @author wuqiong 2022-04-23 20:36
24      */
de8c2b 25     public static synchronized CanalConnector createConnector(
D 26             String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
27         Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check ");
28         Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check ");
29         CanalConnector connector;
30         if (endpointInstance.isClusterEnabled()) {
31             if (!StringUtils.hasText(endpointInstance.getZookeeperAddress()))
32                 throw new CanalClientException("zookeeper address is null");
33             List<SocketAddress> addresses = new ArrayList<>();
34             for (String s : endpointInstance.getZookeeperAddress().split(",")) {
35                 String[] split = s.split(":");
36                 if (split.length != 2)
37                     throw new CanalClientException("error parsing zookeeper address:" + s);
38                 addresses.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
39             }
40             connector = CanalConnectors.newClusterConnector(
41                     addresses, destination, endpointInstance.getUserName(), endpointInstance.getPassword());
42         } else {
43             connector = CanalConnectors.newSingleConnector(
44                     new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()),
45                     destination, endpointInstance.getUserName(), endpointInstance.getPassword());
46         }
47         return connector;
48     }
49 }