duxinglangzi
2022-04-22 de8c2b2a4654893dc2c80f1fe095c165485bee5f
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.factory;
D 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.client.CanalConnectors;
5 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
6 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import org.springframework.util.Assert;
8 import org.springframework.util.StringUtils;
9
10 import java.net.InetSocketAddress;
11 import java.net.SocketAddress;
12 import java.util.ArrayList;
13 import java.util.List;
14
15
16 public class CanalConnectorFactory {
17
18     public static synchronized CanalConnector createConnector(
19             String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
20         Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check ");
21         Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check ");
22         CanalConnector connector;
23         if (endpointInstance.isClusterEnabled()) {
24             if (!StringUtils.hasText(endpointInstance.getZookeeperAddress()))
25                 throw new CanalClientException("zookeeper address is null");
26             List<SocketAddress> addresses = new ArrayList<>();
27             for (String s : endpointInstance.getZookeeperAddress().split(",")) {
28                 String[] split = s.split(":");
29                 if (split.length != 2)
30                     throw new CanalClientException("error parsing zookeeper address:" + s);
31                 addresses.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
32             }
33             connector = CanalConnectors.newClusterConnector(
34                     addresses, destination, endpointInstance.getUserName(), endpointInstance.getPassword());
35         } else {
36             connector = CanalConnectors.newSingleConnector(
37                     new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()),
38                     destination, endpointInstance.getUserName(), endpointInstance.getPassword());
39         }
40         return connector;
41     }
42 }