duxinglangzi
2022-06-24 a87aa74a3af27960276ed02f4273386d25d2a231
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.container;
D 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.protocol.CanalEntry;
5 import com.alibaba.otter.canal.protocol.Message;
6 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10
11 import java.lang.reflect.InvocationTargetException;
12 import java.util.*;
13
14 /**
627420 15  * DML 数据拉取、解析
7cf978 16  *
de8c2b 17  * @author wuqiong 2022/4/11
D 18  */
19 public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
20
21     private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
22
23     private CanalConnector connector;
24     private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
25     private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
26     private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>();
27
28
29     public void initConnect() {
5ac2a7 30         try {
31             // init supportAllTypes
a87aa7 32             registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType())));
5ac2a7 33             connector.connect();
34             connector.subscribe(endpointInstance.getSubscribe());
35             connector.rollback();
36         } catch (Exception e) {
37             logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
38             setRunning(false);
39         }
de8c2b 40
D 41     }
42
7cf978 43     public void disconnect() {
929202 44         // 关闭连接
D 45         connector.disconnect();
46     }
47
de8c2b 48
D 49     public void doStart() {
50         Message message = null;
51         try {
52             message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
53         } catch (Exception clientException) {
5ac2a7 54             logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
de8c2b 55             endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
D 56             if (endpointInstance.getRetryCount() < 0) {
5ac2a7 57                 logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
de8c2b 58                                 "thread interrupt , current connector host: {} , port: {} ",
D 59                         endpointInstance.getHost(), endpointInstance.getPort());
60                 Thread.currentThread().interrupt();
929202 61                 disconnect();
de8c2b 62             } else {
D 63                 sleep(endpointInstance.getAcquireInterval());
64             }
65             return;
66         }
67         List<CanalEntry.Entry> entries = message.getEntries();
68         if (message.getId() == -1 || entries.isEmpty()) {
69             sleep(endpointInstance.getAcquireInterval());
70             return;
71         }
3ad94b 72         for (CanalEntry.Entry entry : entries) {
73             try {
74                 consumer(entry);
75             } catch (Exception e) {
76                 e.printStackTrace();
5ac2a7 77                 logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
3ad94b 78                 // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
79                 // return;
80             }
de8c2b 81         }
D 82         connector.ack(message.getId()); // 提交确认
83     }
84
85     public DmlMessageTransponderContainer(
86             CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
87             CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
88         this.connector = connector;
89         this.registrars.addAll(registrars);
90         this.endpointInstance = endpointInstance;
91     }
92
93     private void consumer(CanalEntry.Entry entry) {
94         if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
95         CanalEntry.RowChange rowChange = null;
96         try {
97             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
98         } catch (Exception e) {
5ac2a7 99             logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
de8c2b 100             throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
D 101         }
102
103         // 忽略 ddl 语句
104         if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
105         CanalEntry.EventType eventType = rowChange.getEventType();
106         if (!SUPPORT_ALL_TYPES.contains(eventType)) return;
107         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
108             registrars
109                     .stream()
110                     .filter(CanalListenerEndpointRegistrar.filterArgs(
111                             entry.getHeader().getSchemaName(),
112                             entry.getHeader().getTableName(),
113                             eventType))
114                     .forEach(element -> {
115                         try {
a87aa7 116                             element.getMethod().invoke(element.getBean(), eventType, rowData);
de8c2b 117                         } catch (IllegalAccessException | InvocationTargetException e) {
5ac2a7 118                             logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
de8c2b 119                             throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
D 120                         }
121                     });
122         }
123     }
124
125 }