提交 | 用户 | age
|
0b055a
|
1 |
package com.hz.canal.starter.container; |
R |
2 |
|
|
3 |
import com.alibaba.otter.canal.client.CanalConnector; |
|
4 |
import com.alibaba.otter.canal.protocol.CanalEntry; |
|
5 |
import com.alibaba.otter.canal.protocol.Message; |
|
6 |
import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; |
|
7 |
import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar; |
|
8 |
import com.hz.canal.starter.mode.CanalMessage; |
|
9 |
import org.slf4j.Logger; |
|
10 |
import org.slf4j.LoggerFactory; |
|
11 |
|
|
12 |
import java.lang.reflect.InvocationTargetException; |
|
13 |
import java.util.*; |
|
14 |
|
|
15 |
/** |
|
16 |
* DML 数据拉取、解析 |
|
17 |
* |
|
18 |
* @author wuqiong 2022/4/11 |
|
19 |
*/ |
|
20 |
public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { |
|
21 |
|
|
22 |
private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class); |
|
23 |
|
|
24 |
private CanalConnector connector; |
|
25 |
private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; |
|
26 |
private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); |
|
27 |
private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); |
|
28 |
private Integer local_retry_count; |
|
29 |
|
|
30 |
|
|
31 |
public void initConnect() { |
|
32 |
try { |
|
33 |
// init supportAllTypes |
|
34 |
registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); |
|
35 |
connector.connect(); |
|
36 |
connector.subscribe(endpointInstance.getSubscribe()); |
|
37 |
connector.rollback(); |
|
38 |
// 初始化本地重试次数 |
|
39 |
local_retry_count = endpointInstance.getRetryCount(); |
|
40 |
} catch (Exception e) { |
|
41 |
logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); |
|
42 |
setRunning(false); |
|
43 |
} |
|
44 |
|
|
45 |
} |
|
46 |
|
|
47 |
public void disconnect() { |
|
48 |
// 关闭连接 |
|
49 |
connector.disconnect(); |
|
50 |
} |
|
51 |
|
|
52 |
|
|
53 |
public void doStart() { |
|
54 |
try { |
|
55 |
Message message = null; |
|
56 |
try { |
|
57 |
message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据 |
|
58 |
} catch (Exception clientException) { |
|
59 |
logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); |
|
60 |
if (local_retry_count > 0) { |
|
61 |
// 重试次数减一 |
|
62 |
local_retry_count = local_retry_count - 1; |
|
63 |
sleep(endpointInstance.getAcquireInterval()); |
|
64 |
} else { |
|
65 |
// 重试次数 <= 0 时,直接终止线程 |
|
66 |
logger.error("[DmlMessageTransponderContainer] retry count is zero , " + |
|
67 |
"thread interrupt , current connector host: {} , port: {} ", |
|
68 |
endpointInstance.getHost(), endpointInstance.getPort()); |
|
69 |
Thread.currentThread().interrupt(); |
|
70 |
} |
|
71 |
return; |
|
72 |
} |
|
73 |
// 如果重试次数小于设置的,则修改 |
|
74 |
if (local_retry_count < endpointInstance.getRetryCount()) |
|
75 |
local_retry_count = endpointInstance.getRetryCount(); |
|
76 |
List<CanalEntry.Entry> entries = message.getEntries(); |
|
77 |
if (message.getId() == -1 || entries.isEmpty()) { |
|
78 |
sleep(endpointInstance.getAcquireInterval()); |
|
79 |
return; |
|
80 |
} |
|
81 |
for (CanalEntry.Entry entry : entries) { |
|
82 |
try { |
|
83 |
consumer(entry); |
|
84 |
} catch (Exception e) { |
|
85 |
e.printStackTrace(); |
|
86 |
logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); |
|
87 |
// connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据 |
|
88 |
// return; |
|
89 |
} |
|
90 |
} |
|
91 |
connector.ack(message.getId()); // 提交确认 |
|
92 |
} catch (Exception exc) { |
|
93 |
logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage()); |
|
94 |
// 防止删除消息时发生错误,或者拉取消息失败等情况 |
|
95 |
exc.printStackTrace(); |
|
96 |
} |
|
97 |
} |
|
98 |
|
|
99 |
public DmlMessageTransponderContainer( |
|
100 |
CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars, |
|
101 |
CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { |
|
102 |
this.connector = connector; |
|
103 |
this.registrars.addAll(registrars); |
|
104 |
this.endpointInstance = endpointInstance; |
|
105 |
} |
|
106 |
|
|
107 |
private void consumer(CanalEntry.Entry entry) { |
|
108 |
if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return; |
|
109 |
CanalEntry.RowChange rowChange = null; |
|
110 |
try { |
|
111 |
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); |
|
112 |
} catch (Exception e) { |
|
113 |
logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); |
|
114 |
throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); |
|
115 |
} |
|
116 |
|
|
117 |
// 忽略 ddl 语句 |
|
118 |
if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; |
|
119 |
CanalEntry.EventType eventType = rowChange.getEventType(); |
|
120 |
if (!support_all_types.contains(eventType)) return; |
|
121 |
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { |
|
122 |
registrars |
|
123 |
.stream() |
|
124 |
.filter(CanalListenerEndpointRegistrar.filterArgs( |
|
125 |
entry.getHeader().getSchemaName(), |
|
126 |
entry.getHeader().getTableName(), |
|
127 |
eventType)) |
|
128 |
.forEach(element -> { |
|
129 |
try { |
|
130 |
element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); |
|
131 |
} catch (IllegalAccessException | InvocationTargetException e) { |
|
132 |
logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); |
|
133 |
throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e); |
|
134 |
} |
|
135 |
}); |
|
136 |
} |
|
137 |
} |
|
138 |
|
|
139 |
} |