renxue
2022-10-24 0b055a3f554da3a934e79e88c4781705cbab5a21
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package com.hz.canal.starter.container;
 
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties;
import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar;
import com.hz.canal.starter.mode.CanalMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.lang.reflect.InvocationTargetException;
import java.util.*;
 
/**
 * DML 数据拉取、解析
 *
 * @author wuqiong 2022/4/11
 */
public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
 
    private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
 
    private CanalConnector connector;
    private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
    private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
    private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
    private Integer local_retry_count;
 
 
    public void initConnect() {
        try {
            // init supportAllTypes
            registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
            connector.connect();
            connector.subscribe(endpointInstance.getSubscribe());
            connector.rollback();
            // 初始化本地重试次数
            local_retry_count = endpointInstance.getRetryCount();
        } catch (Exception e) {
            logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
            setRunning(false);
        }
 
    }
 
    public void disconnect() {
        // 关闭连接
        connector.disconnect();
    }
 
 
    public void doStart() {
        try {
            Message message = null;
            try {
                message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
            } catch (Exception clientException) {
                logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
                if (local_retry_count > 0) {
                    // 重试次数减一
                    local_retry_count = local_retry_count - 1;
                    sleep(endpointInstance.getAcquireInterval());
                } else {
                    // 重试次数 <= 0 时,直接终止线程
                    logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
                                    "thread interrupt , current connector host: {} , port: {} ",
                            endpointInstance.getHost(), endpointInstance.getPort());
                    Thread.currentThread().interrupt();
                }
                return;
            }
            // 如果重试次数小于设置的,则修改
            if (local_retry_count < endpointInstance.getRetryCount())
                local_retry_count = endpointInstance.getRetryCount();
            List<CanalEntry.Entry> entries = message.getEntries();
            if (message.getId() == -1 || entries.isEmpty()) {
                sleep(endpointInstance.getAcquireInterval());
                return;
            }
            for (CanalEntry.Entry entry : entries) {
                try {
                    consumer(entry);
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
                    // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
                    // return;
                }
            }
            connector.ack(message.getId()); // 提交确认
        } catch (Exception exc) {
            logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
            // 防止删除消息时发生错误,或者拉取消息失败等情况
            exc.printStackTrace();
        }
    }
 
    public DmlMessageTransponderContainer(
            CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
            CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
        this.connector = connector;
        this.registrars.addAll(registrars);
        this.endpointInstance = endpointInstance;
    }
 
    private void consumer(CanalEntry.Entry entry) {
        if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
        CanalEntry.RowChange rowChange = null;
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
            throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
        }
 
        // 忽略 ddl 语句
        if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (!support_all_types.contains(eventType)) return;
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            registrars
                    .stream()
                    .filter(CanalListenerEndpointRegistrar.filterArgs(
                            entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(),
                            eventType))
                    .forEach(element -> {
                        try {
                            element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
                        } catch (IllegalAccessException | InvocationTargetException e) {
                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                            throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
                        }
                    });
        }
    }
 
}