From a8f947d2bc621051821e0cc57335aa6ca1776a8e Mon Sep 17 00:00:00 2001
From: duxinglangzi <871364441@qq.com>
Date: 星期四, 04 八月 2022 18:16:29 +0800
Subject: [PATCH] 修改启用方式为EnableCanalListener注解 修改重试次数瑕疵

---
 src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java |   22 ++++++++++++++--------
 1 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
index 0374716..e25a8df 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -23,16 +23,19 @@
     private CanalConnector connector;
     private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
     private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
-    private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>();
+    private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
+    private Integer local_retry_count;
 
 
     public void initConnect() {
         try {
             // init supportAllTypes
-            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType())));
+            registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
             connector.connect();
             connector.subscribe(endpointInstance.getSubscribe());
             connector.rollback();
+            // 鍒濆鍖栨湰鍦伴噸璇曟鏁�
+            local_retry_count = endpointInstance.getRetryCount();
         } catch (Exception e) {
             logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
             setRunning(false);
@@ -52,18 +55,21 @@
             message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹�
         } catch (Exception clientException) {
             logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
-            endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
-            if (endpointInstance.getRetryCount() < 0) {
+            if (local_retry_count > 0) {
+                // 閲嶈瘯娆℃暟鍑忎竴
+                local_retry_count = local_retry_count - 1;
+                sleep(endpointInstance.getAcquireInterval());
+            } else {
+                // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼
                 logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
                                 "thread interrupt , current connector host: {} , port: {} ",
                         endpointInstance.getHost(), endpointInstance.getPort());
                 Thread.currentThread().interrupt();
-                disconnect();
-            } else {
-                sleep(endpointInstance.getAcquireInterval());
             }
             return;
         }
+        // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀�
+        if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount();
         List<CanalEntry.Entry> entries = message.getEntries();
         if (message.getId() == -1 || entries.isEmpty()) {
             sleep(endpointInstance.getAcquireInterval());
@@ -103,7 +109,7 @@
         // 蹇界暐 ddl 璇彞
         if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
         CanalEntry.EventType eventType = rowChange.getEventType();
-        if (!SUPPORT_ALL_TYPES.contains(eventType)) return;
+        if (!support_all_types.contains(eventType)) return;
         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
             registrars
                     .stream()

--
Gitblit v1.8.0