From 3ad94b9abcbde117c6d6ce43a9d023b7755ec0eb Mon Sep 17 00:00:00 2001
From: 独行的浪子 <871364441@qq.com>
Date: 星期二, 26 四月 2022 17:05:11 +0800
Subject: [PATCH] Update DmlMessageTransponderContainer.java

---
 src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java |   23 +++++++++++++++--------
 1 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
index ff89712..9aadc35 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -35,6 +35,11 @@
 
     }
 
+    public void disconnect(){
+        // 鍏抽棴杩炴帴
+        connector.disconnect();
+    }
+
 
     public void doStart() {
         Message message = null;
@@ -48,7 +53,7 @@
                                 "thread interrupt , current connector host: {} , port: {} ",
                         endpointInstance.getHost(), endpointInstance.getPort());
                 Thread.currentThread().interrupt();
-                connector.disconnect();
+                disconnect();
             } else {
                 sleep(endpointInstance.getAcquireInterval());
             }
@@ -59,13 +64,15 @@
             sleep(endpointInstance.getAcquireInterval());
             return;
         }
-        try {
-            entries.forEach(e -> consumer(e));
-        } catch (Exception e) {
-            e.printStackTrace();
-            logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
-            // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁
-            // return;
+        for (CanalEntry.Entry entry : entries) {
+            try {
+                consumer(entry);
+            } catch (Exception e) {
+                e.printStackTrace();
+                logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
+                // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁
+                // return;
+            }
         }
         connector.ack(message.getId()); // 鎻愪氦纭
     }

--
Gitblit v1.8.0