From 0b055a3f554da3a934e79e88c4781705cbab5a21 Mon Sep 17 00:00:00 2001 From: renxue <auster_i@163.com> Date: 星期一, 24 十月 2022 11:40:26 +0800 Subject: [PATCH] 修改包下的类名 --- src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java | 278 +++++++++++++++++++++++++++--------------------------- 1 files changed, 139 insertions(+), 139 deletions(-) diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java similarity index 95% rename from src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java rename to src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java index 6865b86..480c158 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java @@ -1,139 +1,139 @@ -package com.duxinglangzi.canal.starter.container; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.alibaba.otter.canal.protocol.Message; -import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; -import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar; -import com.duxinglangzi.canal.starter.mode.CanalMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.InvocationTargetException; -import java.util.*; - -/** - * DML 鏁版嵁鎷夊彇銆佽В鏋� - * - * @author wuqiong 2022/4/11 - */ -public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { - - private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class); - - private CanalConnector connector; - private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; - private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); - private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); - private Integer local_retry_count; - - - public void initConnect() { - try { - // init supportAllTypes - registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); - connector.connect(); - connector.subscribe(endpointInstance.getSubscribe()); - connector.rollback(); - // 鍒濆鍖栨湰鍦伴噸璇曟鏁� - local_retry_count = endpointInstance.getRetryCount(); - } catch (Exception e) { - logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); - setRunning(false); - } - - } - - public void disconnect() { - // 鍏抽棴杩炴帴 - connector.disconnect(); - } - - - public void doStart() { - try { - Message message = null; - try { - message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� - } catch (Exception clientException) { - logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); - if (local_retry_count > 0) { - // 閲嶈瘯娆℃暟鍑忎竴 - local_retry_count = local_retry_count - 1; - sleep(endpointInstance.getAcquireInterval()); - } else { - // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼ - logger.error("[DmlMessageTransponderContainer] retry count is zero , " + - "thread interrupt , current connector host: {} , port: {} ", - endpointInstance.getHost(), endpointInstance.getPort()); - Thread.currentThread().interrupt(); - } - return; - } - // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀� - if (local_retry_count < endpointInstance.getRetryCount()) - local_retry_count = endpointInstance.getRetryCount(); - List<CanalEntry.Entry> entries = message.getEntries(); - if (message.getId() == -1 || entries.isEmpty()) { - sleep(endpointInstance.getAcquireInterval()); - return; - } - for (CanalEntry.Entry entry : entries) { - try { - consumer(entry); - } catch (Exception e) { - e.printStackTrace(); - logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); - // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 - // return; - } - } - connector.ack(message.getId()); // 鎻愪氦纭 - } catch (Exception exc) { - logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage()); - // 闃叉鍒犻櫎娑堟伅鏃跺彂鐢熼敊璇�,鎴栬�呮媺鍙栨秷鎭け璐ョ瓑鎯呭喌 - exc.printStackTrace(); - } - } - - public DmlMessageTransponderContainer( - CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars, - CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { - this.connector = connector; - this.registrars.addAll(registrars); - this.endpointInstance = endpointInstance; - } - - private void consumer(CanalEntry.Entry entry) { - if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return; - CanalEntry.RowChange rowChange = null; - try { - rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); - } catch (Exception e) { - logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); - throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); - } - - // 蹇界暐 ddl 璇彞 - if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; - CanalEntry.EventType eventType = rowChange.getEventType(); - if (!support_all_types.contains(eventType)) return; - for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { - registrars - .stream() - .filter(CanalListenerEndpointRegistrar.filterArgs( - entry.getHeader().getSchemaName(), - entry.getHeader().getTableName(), - eventType)) - .forEach(element -> { - try { - element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); - } catch (IllegalAccessException | InvocationTargetException e) { - logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); - throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e); - } - }); - } - } - -} +package com.hz.canal.starter.container; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.Message; +import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; +import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar; +import com.hz.canal.starter.mode.CanalMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.*; + +/** + * DML 鏁版嵁鎷夊彇銆佽В鏋� + * + * @author wuqiong 2022/4/11 + */ +public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { + + private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class); + + private CanalConnector connector; + private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; + private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); + private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); + private Integer local_retry_count; + + + public void initConnect() { + try { + // init supportAllTypes + registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); + connector.connect(); + connector.subscribe(endpointInstance.getSubscribe()); + connector.rollback(); + // 鍒濆鍖栨湰鍦伴噸璇曟鏁� + local_retry_count = endpointInstance.getRetryCount(); + } catch (Exception e) { + logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); + setRunning(false); + } + + } + + public void disconnect() { + // 鍏抽棴杩炴帴 + connector.disconnect(); + } + + + public void doStart() { + try { + Message message = null; + try { + message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� + } catch (Exception clientException) { + logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); + if (local_retry_count > 0) { + // 閲嶈瘯娆℃暟鍑忎竴 + local_retry_count = local_retry_count - 1; + sleep(endpointInstance.getAcquireInterval()); + } else { + // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼ + logger.error("[DmlMessageTransponderContainer] retry count is zero , " + + "thread interrupt , current connector host: {} , port: {} ", + endpointInstance.getHost(), endpointInstance.getPort()); + Thread.currentThread().interrupt(); + } + return; + } + // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀� + if (local_retry_count < endpointInstance.getRetryCount()) + local_retry_count = endpointInstance.getRetryCount(); + List<CanalEntry.Entry> entries = message.getEntries(); + if (message.getId() == -1 || entries.isEmpty()) { + sleep(endpointInstance.getAcquireInterval()); + return; + } + for (CanalEntry.Entry entry : entries) { + try { + consumer(entry); + } catch (Exception e) { + e.printStackTrace(); + logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); + // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 + // return; + } + } + connector.ack(message.getId()); // 鎻愪氦纭 + } catch (Exception exc) { + logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage()); + // 闃叉鍒犻櫎娑堟伅鏃跺彂鐢熼敊璇�,鎴栬�呮媺鍙栨秷鎭け璐ョ瓑鎯呭喌 + exc.printStackTrace(); + } + } + + public DmlMessageTransponderContainer( + CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars, + CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { + this.connector = connector; + this.registrars.addAll(registrars); + this.endpointInstance = endpointInstance; + } + + private void consumer(CanalEntry.Entry entry) { + if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return; + CanalEntry.RowChange rowChange = null; + try { + rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + } catch (Exception e) { + logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); + throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); + } + + // 蹇界暐 ddl 璇彞 + if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; + CanalEntry.EventType eventType = rowChange.getEventType(); + if (!support_all_types.contains(eventType)) return; + for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { + registrars + .stream() + .filter(CanalListenerEndpointRegistrar.filterArgs( + entry.getHeader().getSchemaName(), + entry.getHeader().getTableName(), + eventType)) + .forEach(element -> { + try { + element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); + } catch (IllegalAccessException | InvocationTargetException e) { + logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); + throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e); + } + }); + } + } + +} -- Gitblit v1.8.0