duxinglangzi
2022-08-04 a8f947d2bc621051821e0cc57335aa6ca1776a8e
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.container;
D 2
3 import com.alibaba.otter.canal.client.CanalConnector;
4 import com.alibaba.otter.canal.protocol.CanalEntry;
5 import com.alibaba.otter.canal.protocol.Message;
6 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
7 import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10
11 import java.lang.reflect.InvocationTargetException;
12 import java.util.*;
13
14 /**
627420 15  * DML 数据拉取、解析
7cf978 16  *
de8c2b 17  * @author wuqiong 2022/4/11
D 18  */
19 public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
20
21     private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
22
23     private CanalConnector connector;
24     private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
25     private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
a8f947 26     private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
D 27     private Integer local_retry_count;
de8c2b 28
D 29
30     public void initConnect() {
5ac2a7 31         try {
32             // init supportAllTypes
a8f947 33             registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
5ac2a7 34             connector.connect();
35             connector.subscribe(endpointInstance.getSubscribe());
36             connector.rollback();
a8f947 37             // 初始化本地重试次数
D 38             local_retry_count = endpointInstance.getRetryCount();
5ac2a7 39         } catch (Exception e) {
40             logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
41             setRunning(false);
42         }
de8c2b 43
D 44     }
45
7cf978 46     public void disconnect() {
929202 47         // 关闭连接
D 48         connector.disconnect();
49     }
50
de8c2b 51
D 52     public void doStart() {
53         Message message = null;
54         try {
55             message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
56         } catch (Exception clientException) {
5ac2a7 57             logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
a8f947 58             if (local_retry_count > 0) {
D 59                 // 重试次数减一
60                 local_retry_count = local_retry_count - 1;
61                 sleep(endpointInstance.getAcquireInterval());
62             } else {
63                 // 重试次数 <= 0 时,直接终止线程
5ac2a7 64                 logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
de8c2b 65                                 "thread interrupt , current connector host: {} , port: {} ",
D 66                         endpointInstance.getHost(), endpointInstance.getPort());
67                 Thread.currentThread().interrupt();
68             }
69             return;
70         }
a8f947 71         // 如果重试次数小于设置的,则修改
D 72         if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount();
de8c2b 73         List<CanalEntry.Entry> entries = message.getEntries();
D 74         if (message.getId() == -1 || entries.isEmpty()) {
75             sleep(endpointInstance.getAcquireInterval());
76             return;
77         }
3ad94b 78         for (CanalEntry.Entry entry : entries) {
79             try {
80                 consumer(entry);
81             } catch (Exception e) {
82                 e.printStackTrace();
5ac2a7 83                 logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
3ad94b 84                 // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
85                 // return;
86             }
de8c2b 87         }
D 88         connector.ack(message.getId()); // 提交确认
89     }
90
91     public DmlMessageTransponderContainer(
92             CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
93             CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
94         this.connector = connector;
95         this.registrars.addAll(registrars);
96         this.endpointInstance = endpointInstance;
97     }
98
99     private void consumer(CanalEntry.Entry entry) {
100         if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
101         CanalEntry.RowChange rowChange = null;
102         try {
103             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
104         } catch (Exception e) {
5ac2a7 105             logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
de8c2b 106             throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
D 107         }
108
109         // 忽略 ddl 语句
110         if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
111         CanalEntry.EventType eventType = rowChange.getEventType();
a8f947 112         if (!support_all_types.contains(eventType)) return;
de8c2b 113         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
D 114             registrars
115                     .stream()
116                     .filter(CanalListenerEndpointRegistrar.filterArgs(
117                             entry.getHeader().getSchemaName(),
118                             entry.getHeader().getTableName(),
119                             eventType))
120                     .forEach(element -> {
121                         try {
a87aa7 122                             element.getMethod().invoke(element.getBean(), eventType, rowData);
de8c2b 123                         } catch (IllegalAccessException | InvocationTargetException e) {
5ac2a7 124                             logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
de8c2b 125                             throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
D 126                         }
127                     });
128         }
129     }
130
131 }