From ddae546da9487622631d47133c962e7d870982de Mon Sep 17 00:00:00 2001
From: duxinglangzi <871364441@qq.com>
Date: 星期二, 06 九月 2022 15:13:54 +0800
Subject: [PATCH] 增加错误消息日志打印

---
 src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java |   84 ++++++++++++++++++++++++-----------------
 1 files changed, 49 insertions(+), 35 deletions(-)

diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
index 295ece6..6865b86 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -5,6 +5,7 @@
 import com.alibaba.otter.canal.protocol.Message;
 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
 import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
+import com.duxinglangzi.canal.starter.mode.CanalMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -13,6 +14,7 @@
 
 /**
  * DML 鏁版嵁鎷夊彇銆佽В鏋�
+ *
  * @author wuqiong 2022/4/11
  */
 public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
@@ -22,17 +24,19 @@
     private CanalConnector connector;
     private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
     private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
-    private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>();
+    private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
+    private Integer local_retry_count;
 
 
     public void initConnect() {
         try {
             // init supportAllTypes
-            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
-                    Arrays.asList(e.getListenerEntry().getValue().eventType())));
+            registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
             connector.connect();
             connector.subscribe(endpointInstance.getSubscribe());
             connector.rollback();
+            // 鍒濆鍖栨湰鍦伴噸璇曟鏁�
+            local_retry_count = endpointInstance.getRetryCount();
         } catch (Exception e) {
             logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
             setRunning(false);
@@ -40,46 +44,56 @@
 
     }
 
-    public void disconnect(){
+    public void disconnect() {
         // 鍏抽棴杩炴帴
         connector.disconnect();
     }
 
 
     public void doStart() {
-        Message message = null;
         try {
-            message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹�
-        } catch (Exception clientException) {
-            logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
-            endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
-            if (endpointInstance.getRetryCount() < 0) {
-                logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
-                                "thread interrupt , current connector host: {} , port: {} ",
-                        endpointInstance.getHost(), endpointInstance.getPort());
-                Thread.currentThread().interrupt();
-                disconnect();
-            } else {
-                sleep(endpointInstance.getAcquireInterval());
-            }
-            return;
-        }
-        List<CanalEntry.Entry> entries = message.getEntries();
-        if (message.getId() == -1 || entries.isEmpty()) {
-            sleep(endpointInstance.getAcquireInterval());
-            return;
-        }
-        for (CanalEntry.Entry entry : entries) {
+            Message message = null;
             try {
-                consumer(entry);
-            } catch (Exception e) {
-                e.printStackTrace();
-                logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
-                // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁
-                // return;
+                message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹�
+            } catch (Exception clientException) {
+                logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
+                if (local_retry_count > 0) {
+                    // 閲嶈瘯娆℃暟鍑忎竴
+                    local_retry_count = local_retry_count - 1;
+                    sleep(endpointInstance.getAcquireInterval());
+                } else {
+                    // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼
+                    logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
+                                    "thread interrupt , current connector host: {} , port: {} ",
+                            endpointInstance.getHost(), endpointInstance.getPort());
+                    Thread.currentThread().interrupt();
+                }
+                return;
             }
+            // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀�
+            if (local_retry_count < endpointInstance.getRetryCount())
+                local_retry_count = endpointInstance.getRetryCount();
+            List<CanalEntry.Entry> entries = message.getEntries();
+            if (message.getId() == -1 || entries.isEmpty()) {
+                sleep(endpointInstance.getAcquireInterval());
+                return;
+            }
+            for (CanalEntry.Entry entry : entries) {
+                try {
+                    consumer(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
+                    // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁
+                    // return;
+                }
+            }
+            connector.ack(message.getId()); // 鎻愪氦纭
+        } catch (Exception exc) {
+            logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
+            // 闃叉鍒犻櫎娑堟伅鏃跺彂鐢熼敊璇�,鎴栬�呮媺鍙栨秷鎭け璐ョ瓑鎯呭喌
+            exc.printStackTrace();
         }
-        connector.ack(message.getId()); // 鎻愪氦纭
     }
 
     public DmlMessageTransponderContainer(
@@ -103,7 +117,7 @@
         // 蹇界暐 ddl 璇彞
         if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
         CanalEntry.EventType eventType = rowChange.getEventType();
-        if (!SUPPORT_ALL_TYPES.contains(eventType)) return;
+        if (!support_all_types.contains(eventType)) return;
         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
             registrars
                     .stream()
@@ -113,7 +127,7 @@
                             eventType))
                     .forEach(element -> {
                         try {
-                            element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData);
+                            element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
                         } catch (IllegalAccessException | InvocationTargetException e) {
                             logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                             throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e);

--
Gitblit v1.8.0