From 0b055a3f554da3a934e79e88c4781705cbab5a21 Mon Sep 17 00:00:00 2001
From: renxue <auster_i@163.com>
Date: 星期一, 24 十月 2022 11:40:26 +0800
Subject: [PATCH] 修改包下的类名

---
 src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java |  278 +++++++++++++++++++++++++++---------------------------
 1 files changed, 139 insertions(+), 139 deletions(-)

diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java
similarity index 95%
rename from src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
rename to src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java
index 6865b86..480c158 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
+++ b/src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java
@@ -1,139 +1,139 @@
-package com.duxinglangzi.canal.starter.container;
-
-import com.alibaba.otter.canal.client.CanalConnector;
-import com.alibaba.otter.canal.protocol.CanalEntry;
-import com.alibaba.otter.canal.protocol.Message;
-import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
-import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
-import com.duxinglangzi.canal.starter.mode.CanalMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationTargetException;
-import java.util.*;
-
-/**
- * DML 鏁版嵁鎷夊彇銆佽В鏋�
- *
- * @author wuqiong 2022/4/11
- */
-public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
-
-    private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
-
-    private CanalConnector connector;
-    private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
-    private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
-    private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
-    private Integer local_retry_count;
-
-
-    public void initConnect() {
-        try {
-            // init supportAllTypes
-            registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
-            connector.connect();
-            connector.subscribe(endpointInstance.getSubscribe());
-            connector.rollback();
-            // 鍒濆鍖栨湰鍦伴噸璇曟鏁�
-            local_retry_count = endpointInstance.getRetryCount();
-        } catch (Exception e) {
-            logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
-            setRunning(false);
-        }
-
-    }
-
-    public void disconnect() {
-        // 鍏抽棴杩炴帴
-        connector.disconnect();
-    }
-
-
-    public void doStart() {
-        try {
-            Message message = null;
-            try {
-                message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹�
-            } catch (Exception clientException) {
-                logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
-                if (local_retry_count > 0) {
-                    // 閲嶈瘯娆℃暟鍑忎竴
-                    local_retry_count = local_retry_count - 1;
-                    sleep(endpointInstance.getAcquireInterval());
-                } else {
-                    // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼
-                    logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
-                                    "thread interrupt , current connector host: {} , port: {} ",
-                            endpointInstance.getHost(), endpointInstance.getPort());
-                    Thread.currentThread().interrupt();
-                }
-                return;
-            }
-            // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀�
-            if (local_retry_count < endpointInstance.getRetryCount())
-                local_retry_count = endpointInstance.getRetryCount();
-            List<CanalEntry.Entry> entries = message.getEntries();
-            if (message.getId() == -1 || entries.isEmpty()) {
-                sleep(endpointInstance.getAcquireInterval());
-                return;
-            }
-            for (CanalEntry.Entry entry : entries) {
-                try {
-                    consumer(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                    logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
-                    // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁
-                    // return;
-                }
-            }
-            connector.ack(message.getId()); // 鎻愪氦纭
-        } catch (Exception exc) {
-            logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
-            // 闃叉鍒犻櫎娑堟伅鏃跺彂鐢熼敊璇�,鎴栬�呮媺鍙栨秷鎭け璐ョ瓑鎯呭喌
-            exc.printStackTrace();
-        }
-    }
-
-    public DmlMessageTransponderContainer(
-            CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
-            CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
-        this.connector = connector;
-        this.registrars.addAll(registrars);
-        this.endpointInstance = endpointInstance;
-    }
-
-    private void consumer(CanalEntry.Entry entry) {
-        if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
-        CanalEntry.RowChange rowChange = null;
-        try {
-            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
-        } catch (Exception e) {
-            logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
-            throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
-        }
-
-        // 蹇界暐 ddl 璇彞
-        if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
-        CanalEntry.EventType eventType = rowChange.getEventType();
-        if (!support_all_types.contains(eventType)) return;
-        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
-            registrars
-                    .stream()
-                    .filter(CanalListenerEndpointRegistrar.filterArgs(
-                            entry.getHeader().getSchemaName(),
-                            entry.getHeader().getTableName(),
-                            eventType))
-                    .forEach(element -> {
-                        try {
-                            element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
-                        } catch (IllegalAccessException | InvocationTargetException e) {
-                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
-                            throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e);
-                        }
-                    });
-        }
-    }
-
-}
+package com.hz.canal.starter.container;
+
+import com.alibaba.otter.canal.client.CanalConnector;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.alibaba.otter.canal.protocol.Message;
+import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties;
+import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar;
+import com.hz.canal.starter.mode.CanalMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+
+/**
+ * DML 鏁版嵁鎷夊彇銆佽В鏋�
+ *
+ * @author wuqiong 2022/4/11
+ */
+public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
+
+    private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
+
+    private CanalConnector connector;
+    private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
+    private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
+    private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
+    private Integer local_retry_count;
+
+
+    public void initConnect() {
+        try {
+            // init supportAllTypes
+            registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
+            connector.connect();
+            connector.subscribe(endpointInstance.getSubscribe());
+            connector.rollback();
+            // 鍒濆鍖栨湰鍦伴噸璇曟鏁�
+            local_retry_count = endpointInstance.getRetryCount();
+        } catch (Exception e) {
+            logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
+            setRunning(false);
+        }
+
+    }
+
+    public void disconnect() {
+        // 鍏抽棴杩炴帴
+        connector.disconnect();
+    }
+
+
+    public void doStart() {
+        try {
+            Message message = null;
+            try {
+                message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹�
+            } catch (Exception clientException) {
+                logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
+                if (local_retry_count > 0) {
+                    // 閲嶈瘯娆℃暟鍑忎竴
+                    local_retry_count = local_retry_count - 1;
+                    sleep(endpointInstance.getAcquireInterval());
+                } else {
+                    // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼
+                    logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
+                                    "thread interrupt , current connector host: {} , port: {} ",
+                            endpointInstance.getHost(), endpointInstance.getPort());
+                    Thread.currentThread().interrupt();
+                }
+                return;
+            }
+            // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀�
+            if (local_retry_count < endpointInstance.getRetryCount())
+                local_retry_count = endpointInstance.getRetryCount();
+            List<CanalEntry.Entry> entries = message.getEntries();
+            if (message.getId() == -1 || entries.isEmpty()) {
+                sleep(endpointInstance.getAcquireInterval());
+                return;
+            }
+            for (CanalEntry.Entry entry : entries) {
+                try {
+                    consumer(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
+                    // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁
+                    // return;
+                }
+            }
+            connector.ack(message.getId()); // 鎻愪氦纭
+        } catch (Exception exc) {
+            logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
+            // 闃叉鍒犻櫎娑堟伅鏃跺彂鐢熼敊璇�,鎴栬�呮媺鍙栨秷鎭け璐ョ瓑鎯呭喌
+            exc.printStackTrace();
+        }
+    }
+
+    public DmlMessageTransponderContainer(
+            CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
+            CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
+        this.connector = connector;
+        this.registrars.addAll(registrars);
+        this.endpointInstance = endpointInstance;
+    }
+
+    private void consumer(CanalEntry.Entry entry) {
+        if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
+        CanalEntry.RowChange rowChange = null;
+        try {
+            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+        } catch (Exception e) {
+            logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
+            throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
+        }
+
+        // 蹇界暐 ddl 璇彞
+        if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
+        CanalEntry.EventType eventType = rowChange.getEventType();
+        if (!support_all_types.contains(eventType)) return;
+        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+            registrars
+                    .stream()
+                    .filter(CanalListenerEndpointRegistrar.filterArgs(
+                            entry.getHeader().getSchemaName(),
+                            entry.getHeader().getTableName(),
+                            eventType))
+                    .forEach(element -> {
+                        try {
+                            element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
+                        } catch (IllegalAccessException | InvocationTargetException e) {
+                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
+                            throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e);
+                        }
+                    });
+        }
+    }
+
+}

--
Gitblit v1.8.0