duxinglangzi
2022-04-24 fa654ac361fcc231bc389ca446f78d501c7060c8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package com.duxinglangzi.canal.starter.container;
 
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.lang.reflect.InvocationTargetException;
import java.util.*;
 
/**
 * DML 数据拉取、解析
 * @author wuqiong 2022/4/11
 */
public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
 
    private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
 
    private CanalConnector connector;
    private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
    private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
    private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>();
 
 
    public void initConnect() {
        // init supportAllTypes
        registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
                Arrays.asList(e.getListenerEntry().getValue().eventType())));
        connector.connect();
        connector.subscribe(endpointInstance.getSubscribe());
        connector.rollback();
 
    }
 
 
    public void doStart() {
        Message message = null;
        try {
            message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
        } catch (Exception clientException) {
            logger.error("[MessageTransponderContainer] error msg : ", clientException);
            endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
            if (endpointInstance.getRetryCount() < 0) {
                logger.error("[MessageTransponderContainer] retry count is zero ,  " +
                                "thread interrupt , current connector host: {} , port: {} ",
                        endpointInstance.getHost(), endpointInstance.getPort());
                Thread.currentThread().interrupt();
                connector.disconnect();
            } else {
                sleep(endpointInstance.getAcquireInterval());
            }
            return;
        }
        List<CanalEntry.Entry> entries = message.getEntries();
        if (message.getId() == -1 || entries.isEmpty()) {
            sleep(endpointInstance.getAcquireInterval());
            return;
        }
        try {
            entries.forEach(e -> consumer(e));
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
            // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
            // return;
        }
        connector.ack(message.getId()); // 提交确认
    }
 
    public DmlMessageTransponderContainer(
            CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
            CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
        this.connector = connector;
        this.registrars.addAll(registrars);
        this.endpointInstance = endpointInstance;
    }
 
    private void consumer(CanalEntry.Entry entry) {
        if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
        CanalEntry.RowChange rowChange = null;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            logger.error("[MessageTransponderContainer_consumer] RowChange parse has an error ", e);
            throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
        }
 
        // 忽略 ddl 语句
        if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (!SUPPORT_ALL_TYPES.contains(eventType)) return;
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            registrars
                    .stream()
                    .filter(CanalListenerEndpointRegistrar.filterArgs(
                            entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(),
                            eventType))
                    .forEach(element -> {
                        try {
                            element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData);
                        } catch (IllegalAccessException | InvocationTargetException e) {
                            logger.error("[MessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                            throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
                        }
                    });
        }
    }
 
}