renxue
2022-10-24 0b055a3f554da3a934e79e88c4781705cbab5a21
提交 | 用户 | age
0b055a 1 package com.hz.canal.starter.factory;
R 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.client.CanalConnectors;
5 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
6 import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import org.springframework.util.Assert;
8 import org.springframework.util.StringUtils;
9
10 import java.net.InetSocketAddress;
11 import java.net.SocketAddress;
12 import java.util.ArrayList;
13 import java.util.List;
14
15
16 public class CanalConnectorFactory {
17
18     /**
19      * 创建 CanalConnector
20      *
21      * @param destination
22      * @param endpointInstance
23      * @return CanalConnector
24      * @author wuqiong 2022-04-23 20:36
25      */
26     public static synchronized CanalConnector createConnector(
27             String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
28         Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check ");
29         Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check ");
30         CanalConnector connector;
31         if (endpointInstance.isClusterEnabled()) {
32             if (!StringUtils.hasText(endpointInstance.getZookeeperAddress()))
33                 throw new CanalClientException("zookeeper address is null");
34             List<SocketAddress> addresses = new ArrayList<>();
35             for (String s : endpointInstance.getZookeeperAddress().split(",")) {
36                 String[] split = s.split(":");
37                 if (split.length != 2)
38                     throw new CanalClientException("error parsing zookeeper address:" + s);
39                 addresses.add(new InetSocketAddress(split[0], Integer.parseInt(split[1])));
40             }
41             connector = CanalConnectors.newClusterConnector(
42                     addresses, destination, endpointInstance.getUserName(), endpointInstance.getPassword());
43         } else {
44             connector = CanalConnectors.newSingleConnector(
45                     new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()),
46                     destination, endpointInstance.getUserName(), endpointInstance.getPassword());
47         }
48         return connector;
49     }
50 }