duxinglangzi
2022-08-15 c83c1b4ee7ff9c01a7a67855863c281589f39c72
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.container;
D 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.protocol.CanalEntry;
5 import com.alibaba.otter.canal.protocol.Message;
6 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
c83c1b 8 import com.duxinglangzi.canal.starter.mode.CanalMessage;
de8c2b 9 import org.slf4j.Logger;
D 10 import org.slf4j.LoggerFactory;
11
12 import java.lang.reflect.InvocationTargetException;
13 import java.util.*;
14
15 /**
627420 16  * DML 数据拉取、解析
7cf978 17  *
de8c2b 18  * @author wuqiong 2022/4/11
D 19  */
20 public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
21
22     private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
23
24     private CanalConnector connector;
25     private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
26     private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
a8f947 27     private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
D 28     private Integer local_retry_count;
de8c2b 29
D 30
31     public void initConnect() {
5ac2a7 32         try {
33             // init supportAllTypes
a8f947 34             registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
5ac2a7 35             connector.connect();
36             connector.subscribe(endpointInstance.getSubscribe());
37             connector.rollback();
a8f947 38             // 初始化本地重试次数
D 39             local_retry_count = endpointInstance.getRetryCount();
5ac2a7 40         } catch (Exception e) {
41             logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
42             setRunning(false);
43         }
de8c2b 44
D 45     }
46
7cf978 47     public void disconnect() {
929202 48         // 关闭连接
D 49         connector.disconnect();
50     }
51
de8c2b 52
D 53     public void doStart() {
54         Message message = null;
55         try {
56             message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
57         } catch (Exception clientException) {
5ac2a7 58             logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
a8f947 59             if (local_retry_count > 0) {
D 60                 // 重试次数减一
61                 local_retry_count = local_retry_count - 1;
62                 sleep(endpointInstance.getAcquireInterval());
63             } else {
64                 // 重试次数 <= 0 时,直接终止线程
5ac2a7 65                 logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
de8c2b 66                                 "thread interrupt , current connector host: {} , port: {} ",
D 67                         endpointInstance.getHost(), endpointInstance.getPort());
68                 Thread.currentThread().interrupt();
69             }
70             return;
71         }
a8f947 72         // 如果重试次数小于设置的,则修改
D 73         if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount();
de8c2b 74         List<CanalEntry.Entry> entries = message.getEntries();
D 75         if (message.getId() == -1 || entries.isEmpty()) {
76             sleep(endpointInstance.getAcquireInterval());
77             return;
78         }
3ad94b 79         for (CanalEntry.Entry entry : entries) {
80             try {
81                 consumer(entry);
82             } catch (Exception e) {
83                 e.printStackTrace();
5ac2a7 84                 logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
3ad94b 85                 // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
86                 // return;
87             }
de8c2b 88         }
D 89         connector.ack(message.getId()); // 提交确认
90     }
91
92     public DmlMessageTransponderContainer(
93             CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
94             CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
95         this.connector = connector;
96         this.registrars.addAll(registrars);
97         this.endpointInstance = endpointInstance;
98     }
99
100     private void consumer(CanalEntry.Entry entry) {
101         if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
102         CanalEntry.RowChange rowChange = null;
103         try {
104             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
105         } catch (Exception e) {
5ac2a7 106             logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
de8c2b 107             throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
D 108         }
109
110         // 忽略 ddl 语句
111         if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
112         CanalEntry.EventType eventType = rowChange.getEventType();
a8f947 113         if (!support_all_types.contains(eventType)) return;
de8c2b 114         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
D 115             registrars
116                     .stream()
117                     .filter(CanalListenerEndpointRegistrar.filterArgs(
118                             entry.getHeader().getSchemaName(),
119                             entry.getHeader().getTableName(),
120                             eventType))
121                     .forEach(element -> {
122                         try {
c83c1b 123                             element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
de8c2b 124                         } catch (IllegalAccessException | InvocationTargetException e) {
5ac2a7 125                             logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
de8c2b 126                             throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
D 127                         }
128                     });
129         }
130     }
131
132 }