duxinglangzi
2022-05-16 7cf9781edf66c75570af378a96b4011ff63ada92
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.container;
D 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.protocol.CanalEntry;
5 import com.alibaba.otter.canal.protocol.Message;
6 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10
11 import java.lang.reflect.InvocationTargetException;
12 import java.util.*;
13
14 /**
627420 15  * DML 数据拉取、解析
7cf978 16  *
de8c2b 17  * @author wuqiong 2022/4/11
D 18  */
19 public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
20
21     private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
22
23     private CanalConnector connector;
24     private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
25     private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
26     private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>();
27
28
29     public void initConnect() {
5ac2a7 30         try {
31             // init supportAllTypes
32             registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
33                     Arrays.asList(e.getListenerEntry().getValue().eventType())));
34             connector.connect();
35             connector.subscribe(endpointInstance.getSubscribe());
36             connector.rollback();
37         } catch (Exception e) {
38             logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
39             setRunning(false);
40         }
de8c2b 41
D 42     }
43
7cf978 44     public void disconnect() {
929202 45         // 关闭连接
D 46         connector.disconnect();
47     }
48
de8c2b 49
D 50     public void doStart() {
51         Message message = null;
52         try {
53             message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
54         } catch (Exception clientException) {
5ac2a7 55             logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
de8c2b 56             endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
D 57             if (endpointInstance.getRetryCount() < 0) {
5ac2a7 58                 logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
de8c2b 59                                 "thread interrupt , current connector host: {} , port: {} ",
D 60                         endpointInstance.getHost(), endpointInstance.getPort());
61                 Thread.currentThread().interrupt();
929202 62                 disconnect();
de8c2b 63             } else {
D 64                 sleep(endpointInstance.getAcquireInterval());
65             }
66             return;
67         }
68         List<CanalEntry.Entry> entries = message.getEntries();
69         if (message.getId() == -1 || entries.isEmpty()) {
70             sleep(endpointInstance.getAcquireInterval());
71             return;
72         }
3ad94b 73         for (CanalEntry.Entry entry : entries) {
74             try {
75                 consumer(entry);
76             } catch (Exception e) {
77                 e.printStackTrace();
5ac2a7 78                 logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
3ad94b 79                 // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
80                 // return;
81             }
de8c2b 82         }
D 83         connector.ack(message.getId()); // 提交确认
84     }
85
86     public DmlMessageTransponderContainer(
87             CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
88             CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
89         this.connector = connector;
90         this.registrars.addAll(registrars);
91         this.endpointInstance = endpointInstance;
92     }
93
94     private void consumer(CanalEntry.Entry entry) {
95         if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
96         CanalEntry.RowChange rowChange = null;
97         try {
98             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
99         } catch (Exception e) {
5ac2a7 100             logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
de8c2b 101             throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
D 102         }
103
104         // 忽略 ddl 语句
105         if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
106         CanalEntry.EventType eventType = rowChange.getEventType();
107         if (!SUPPORT_ALL_TYPES.contains(eventType)) return;
108         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
109             registrars
110                     .stream()
111                     .filter(CanalListenerEndpointRegistrar.filterArgs(
112                             entry.getHeader().getSchemaName(),
113                             entry.getHeader().getTableName(),
114                             eventType))
115                     .forEach(element -> {
116                         try {
117                             element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData);
118                         } catch (IllegalAccessException | InvocationTargetException e) {
5ac2a7 119                             logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
de8c2b 120                             throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
D 121                         }
122                     });
123         }
124     }
125
126 }