From 5ac2a77c78106bd3786e6579b26962bc4587dc23 Mon Sep 17 00:00:00 2001
From: 独行的浪子 <871364441@qq.com>
Date: 星期二, 26 四月 2022 18:43:12 +0800
Subject: [PATCH] 修改连接不上的情况

---
 src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java |   49 +++++++++++++++++++++++++++++++------------------
 1 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
index dd426aa..295ece6 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -12,8 +12,8 @@
 import java.util.*;
 
 /**
+ * DML 鏁版嵁鎷夊彇銆佽В鏋�
  * @author wuqiong 2022/4/11
- * @description
  */
 public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
 
@@ -26,13 +26,23 @@
 
 
     public void initConnect() {
-        // init supportAllTypes
-        registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
-                Arrays.asList(e.getListenerEntry().getValue().eventType())));
-        connector.connect();
-        connector.subscribe(endpointInstance.getSubscribe());
-        connector.rollback();
+        try {
+            // init supportAllTypes
+            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
+                    Arrays.asList(e.getListenerEntry().getValue().eventType())));
+            connector.connect();
+            connector.subscribe(endpointInstance.getSubscribe());
+            connector.rollback();
+        } catch (Exception e) {
+            logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
+            setRunning(false);
+        }
 
+    }
+
+    public void disconnect(){
+        // 鍏抽棴杩炴帴
+        connector.disconnect();
     }
 
 
@@ -41,13 +51,14 @@
         try {
             message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹�
         } catch (Exception clientException) {
-            logger.error("[MessageTransponderContainer] error msg : ", clientException);
+            logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
             endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
             if (endpointInstance.getRetryCount() < 0) {
-                logger.error("[MessageTransponderContainer] retry count is zero ,  " +
+                logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
                                 "thread interrupt , current connector host: {} , port: {} ",
                         endpointInstance.getHost(), endpointInstance.getPort());
                 Thread.currentThread().interrupt();
+                disconnect();
             } else {
                 sleep(endpointInstance.getAcquireInterval());
             }
@@ -58,13 +69,15 @@
             sleep(endpointInstance.getAcquireInterval());
             return;
         }
-        try {
-            entries.forEach(e -> consumer(e));
-        } catch (Exception e) {
-            e.printStackTrace();
-            logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
-            // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁
-            // return;
+        for (CanalEntry.Entry entry : entries) {
+            try {
+                consumer(entry);
+            } catch (Exception e) {
+                e.printStackTrace();
+                logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
+                // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁
+                // return;
+            }
         }
         connector.ack(message.getId()); // 鎻愪氦纭
     }
@@ -83,7 +96,7 @@
         try {
             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
         } catch (Exception e) {
-            logger.error("[MessageTransponderContainer_consumer] RowChange parse has an error ", e);
+            logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
             throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
         }
 
@@ -102,7 +115,7 @@
                         try {
                             element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData);
                         } catch (IllegalAccessException | InvocationTargetException e) {
-                            logger.error("[MessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
+                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                             throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e);
                         }
                     });

--
Gitblit v1.8.0