duxinglangzi
2022-09-06 ddae546da9487622631d47133c962e7d870982de
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.container;
D 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.protocol.CanalEntry;
5 import com.alibaba.otter.canal.protocol.Message;
6 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
c83c1b 8 import com.duxinglangzi.canal.starter.mode.CanalMessage;
de8c2b 9 import org.slf4j.Logger;
D 10 import org.slf4j.LoggerFactory;
11
12 import java.lang.reflect.InvocationTargetException;
13 import java.util.*;
14
15 /**
627420 16  * DML 数据拉取、解析
7cf978 17  *
de8c2b 18  * @author wuqiong 2022/4/11
D 19  */
20 public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
21
22     private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
23
24     private CanalConnector connector;
25     private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
26     private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
a8f947 27     private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
D 28     private Integer local_retry_count;
de8c2b 29
D 30
31     public void initConnect() {
5ac2a7 32         try {
33             // init supportAllTypes
a8f947 34             registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
5ac2a7 35             connector.connect();
36             connector.subscribe(endpointInstance.getSubscribe());
37             connector.rollback();
a8f947 38             // 初始化本地重试次数
D 39             local_retry_count = endpointInstance.getRetryCount();
5ac2a7 40         } catch (Exception e) {
41             logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
42             setRunning(false);
43         }
de8c2b 44
D 45     }
46
7cf978 47     public void disconnect() {
929202 48         // 关闭连接
D 49         connector.disconnect();
50     }
51
de8c2b 52
D 53     public void doStart() {
54         try {
ddae54 55             Message message = null;
3ad94b 56             try {
ddae54 57                 message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
D 58             } catch (Exception clientException) {
59                 logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
60                 if (local_retry_count > 0) {
61                     // 重试次数减一
62                     local_retry_count = local_retry_count - 1;
63                     sleep(endpointInstance.getAcquireInterval());
64                 } else {
65                     // 重试次数 <= 0 时,直接终止线程
66                     logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
67                                     "thread interrupt , current connector host: {} , port: {} ",
68                             endpointInstance.getHost(), endpointInstance.getPort());
69                     Thread.currentThread().interrupt();
70                 }
71                 return;
3ad94b 72             }
ddae54 73             // 如果重试次数小于设置的,则修改
D 74             if (local_retry_count < endpointInstance.getRetryCount())
75                 local_retry_count = endpointInstance.getRetryCount();
76             List<CanalEntry.Entry> entries = message.getEntries();
77             if (message.getId() == -1 || entries.isEmpty()) {
78                 sleep(endpointInstance.getAcquireInterval());
79                 return;
80             }
81             for (CanalEntry.Entry entry : entries) {
82                 try {
83                     consumer(entry);
84                 } catch (Exception e) {
85                     e.printStackTrace();
86                     logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
87                     // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
88                     // return;
89                 }
90             }
91             connector.ack(message.getId()); // 提交确认
92         } catch (Exception exc) {
93             logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
94             // 防止删除消息时发生错误,或者拉取消息失败等情况
95             exc.printStackTrace();
de8c2b 96         }
D 97     }
98
99     public DmlMessageTransponderContainer(
100             CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
101             CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
102         this.connector = connector;
103         this.registrars.addAll(registrars);
104         this.endpointInstance = endpointInstance;
105     }
106
107     private void consumer(CanalEntry.Entry entry) {
108         if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
109         CanalEntry.RowChange rowChange = null;
110         try {
111             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
112         } catch (Exception e) {
5ac2a7 113             logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
de8c2b 114             throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
D 115         }
116
117         // 忽略 ddl 语句
118         if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
119         CanalEntry.EventType eventType = rowChange.getEventType();
a8f947 120         if (!support_all_types.contains(eventType)) return;
de8c2b 121         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
D 122             registrars
123                     .stream()
124                     .filter(CanalListenerEndpointRegistrar.filterArgs(
125                             entry.getHeader().getSchemaName(),
126                             entry.getHeader().getTableName(),
127                             eventType))
128                     .forEach(element -> {
129                         try {
c83c1b 130                             element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
de8c2b 131                         } catch (IllegalAccessException | InvocationTargetException e) {
5ac2a7 132                             logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
de8c2b 133                             throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
D 134                         }
135                     });
136         }
137     }
138
139 }