package com.hz.canal.starter.container; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar; import com.hz.canal.starter.mode.CanalMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.reflect.InvocationTargetException; import java.util.*; /** * DML 数据拉取、解析 * * @author wuqiong 2022/4/11 */ public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class); private CanalConnector connector; private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; private List registrars = new ArrayList<>(); private Set support_all_types = new HashSet<>(); private Integer local_retry_count; public void initConnect() { try { // init supportAllTypes registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); connector.connect(); connector.subscribe(endpointInstance.getSubscribe()); connector.rollback(); // 初始化本地重试次数 local_retry_count = endpointInstance.getRetryCount(); } catch (Exception e) { logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); setRunning(false); } } public void disconnect() { // 关闭连接 connector.disconnect(); } public void doStart() { try { Message message = null; try { message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据 } catch (Exception clientException) { logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); if (local_retry_count > 0) { // 重试次数减一 local_retry_count = local_retry_count - 1; sleep(endpointInstance.getAcquireInterval()); } else { // 重试次数 <= 0 时,直接终止线程 logger.error("[DmlMessageTransponderContainer] retry count is zero , " + "thread interrupt , current connector host: {} , port: {} ", endpointInstance.getHost(), endpointInstance.getPort()); Thread.currentThread().interrupt(); } return; } // 如果重试次数小于设置的,则修改 if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount(); List entries = message.getEntries(); if (message.getId() == -1 || entries.isEmpty()) { sleep(endpointInstance.getAcquireInterval()); return; } for (CanalEntry.Entry entry : entries) { try { consumer(entry); } catch (Exception e) { e.printStackTrace(); logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据 // return; } } connector.ack(message.getId()); // 提交确认 } catch (Exception exc) { logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage()); // 防止删除消息时发生错误,或者拉取消息失败等情况 exc.printStackTrace(); } } public DmlMessageTransponderContainer( CanalConnector connector, List registrars, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { this.connector = connector; this.registrars.addAll(registrars); this.endpointInstance = endpointInstance; } private void consumer(CanalEntry.Entry entry) { if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return; CanalEntry.RowChange rowChange = null; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); } // 忽略 ddl 语句 if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; CanalEntry.EventType eventType = rowChange.getEventType(); if (!support_all_types.contains(eventType)) return; for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { registrars .stream() .filter(CanalListenerEndpointRegistrar.filterArgs( entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)) .forEach(element -> { try { element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); } catch (IllegalAccessException | InvocationTargetException e) { logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e); } }); } } }