renxue
2022-10-24 0b055a3f554da3a934e79e88c4781705cbab5a21
提交 | 用户 | age
0b055a 1 package com.hz.canal.starter.container;
R 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.protocol.CanalEntry;
5 import com.alibaba.otter.canal.protocol.Message;
6 import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar;
8 import com.hz.canal.starter.mode.CanalMessage;
9 import org.slf4j.Logger;
10 import org.slf4j.LoggerFactory;
11
12 import java.lang.reflect.InvocationTargetException;
13 import java.util.*;
14
15 /**
16  * DML 数据拉取、解析
17  *
18  * @author wuqiong 2022/4/11
19  */
20 public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
21
22     private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
23
24     private CanalConnector connector;
25     private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
26     private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
27     private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
28     private Integer local_retry_count;
29
30
31     public void initConnect() {
32         try {
33             // init supportAllTypes
34             registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
35             connector.connect();
36             connector.subscribe(endpointInstance.getSubscribe());
37             connector.rollback();
38             // 初始化本地重试次数
39             local_retry_count = endpointInstance.getRetryCount();
40         } catch (Exception e) {
41             logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
42             setRunning(false);
43         }
44
45     }
46
47     public void disconnect() {
48         // 关闭连接
49         connector.disconnect();
50     }
51
52
53     public void doStart() {
54         try {
55             Message message = null;
56             try {
57                 message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
58             } catch (Exception clientException) {
59                 logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
60                 if (local_retry_count > 0) {
61                     // 重试次数减一
62                     local_retry_count = local_retry_count - 1;
63                     sleep(endpointInstance.getAcquireInterval());
64                 } else {
65                     // 重试次数 <= 0 时,直接终止线程
66                     logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
67                                     "thread interrupt , current connector host: {} , port: {} ",
68                             endpointInstance.getHost(), endpointInstance.getPort());
69                     Thread.currentThread().interrupt();
70                 }
71                 return;
72             }
73             // 如果重试次数小于设置的,则修改
74             if (local_retry_count < endpointInstance.getRetryCount())
75                 local_retry_count = endpointInstance.getRetryCount();
76             List<CanalEntry.Entry> entries = message.getEntries();
77             if (message.getId() == -1 || entries.isEmpty()) {
78                 sleep(endpointInstance.getAcquireInterval());
79                 return;
80             }
81             for (CanalEntry.Entry entry : entries) {
82                 try {
83                     consumer(entry);
84                 } catch (Exception e) {
85                     e.printStackTrace();
86                     logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
87                     // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
88                     // return;
89                 }
90             }
91             connector.ack(message.getId()); // 提交确认
92         } catch (Exception exc) {
93             logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
94             // 防止删除消息时发生错误,或者拉取消息失败等情况
95             exc.printStackTrace();
96         }
97     }
98
99     public DmlMessageTransponderContainer(
100             CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
101             CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
102         this.connector = connector;
103         this.registrars.addAll(registrars);
104         this.endpointInstance = endpointInstance;
105     }
106
107     private void consumer(CanalEntry.Entry entry) {
108         if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
109         CanalEntry.RowChange rowChange = null;
110         try {
111             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
112         } catch (Exception e) {
113             logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
114             throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
115         }
116
117         // 忽略 ddl 语句
118         if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
119         CanalEntry.EventType eventType = rowChange.getEventType();
120         if (!support_all_types.contains(eventType)) return;
121         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
122             registrars
123                     .stream()
124                     .filter(CanalListenerEndpointRegistrar.filterArgs(
125                             entry.getHeader().getSchemaName(),
126                             entry.getHeader().getTableName(),
127                             eventType))
128                     .forEach(element -> {
129                         try {
130                             element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
131                         } catch (IllegalAccessException | InvocationTargetException e) {
132                             logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
133                             throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
134                         }
135                     });
136         }
137     }
138
139 }