From a8f947d2bc621051821e0cc57335aa6ca1776a8e Mon Sep 17 00:00:00 2001
From: duxinglangzi <871364441@qq.com>
Date: 星期四, 04 八月 2022 18:16:29 +0800
Subject: [PATCH] 修改启用方式为EnableCanalListener注解 修改重试次数瑕疵

---
 src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java    |   22 +++++++----
 src/main/resources/META-INF/spring.factories                                                  |   10 ++--
 src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java |    2 
 README.md                                                                                     |   29 ++++++++------
 src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java              |   21 ++++++++++
 5 files changed, 58 insertions(+), 26 deletions(-)

diff --git a/README.md b/README.md
index 30e716e..706c80c 100644
--- a/README.md
+++ b/README.md
@@ -22,13 +22,15 @@
 
 ```
 
-### 鍦╯pring boot 椤圭洰涓殑浠g爜浣跨敤瀹炰緥 
+### 鍦╯pring boot 椤圭洰涓殑浠g爜浣跨敤瀹炰緥 (娉ㄦ剰闇�瑕佷娇鐢� EnableCanalListener 娉ㄨВ寮�鍚� canal listener )
+
 ```java
 
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.duxinglangzi.canal.starter.annotation.CanalListener;
 import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
+import com.duxinglangzi.canal.starter.annotation.EnableCanalListener;
 import org.springframework.stereotype.Service;
 
 import java.util.stream.Collectors;
@@ -37,14 +39,17 @@
  * @author wuqiong 2022/4/12
  * @description
  */
+@EnableCanalListener
 @Service
 public class CanalListenerTest {
 
     /**
+     * 蹇呴』鍦ㄧ被涓� 浣跨敤 EnableCanalListener 娉ㄨВ鎵嶈兘寮�鍚� canal listener 
+     * 
      * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 CanalEntry.EventType , CanalEntry.RowData 
      * 绋嬪簭鍦ㄥ惎鍔ㄨ繃绋嬩腑浼氬仛妫�鏌�
      */
-    
+
     /**
      * 鐩戞帶鏇存柊鎿嶄綔
      * 鏀寔鍔ㄦ�佸弬鏁伴厤缃紝閰嶇疆椤归渶鍦� yml 鎴� properties 杩涜閰嶇疆
@@ -52,7 +57,7 @@
      */
     @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
     public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleBooksUsers",eventType, rowData);
+        printChange("listenerExampleBooksUsers", eventType, rowData);
     }
 
     /**
@@ -60,7 +65,7 @@
      */
     @CanalUpdateListener(destination = "example", database = "books", table = {"users"})
     public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleBooksUsers",eventType, rowData);
+        printChange("listenerExampleBooksUsers", eventType, rowData);
     }
 
     /**
@@ -68,7 +73,7 @@
      */
     @CanalUpdateListener(destination = "example", database = "books", table = {"books"})
     public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleBooksBooks",eventType, rowData);
+        printChange("listenerExampleBooksBooks", eventType, rowData);
     }
 
     /**
@@ -76,7 +81,7 @@
      */
     @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
     public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleBooksAll",eventType, rowData);
+        printChange("listenerExampleBooksAll", eventType, rowData);
     }
 
     /**
@@ -84,7 +89,7 @@
      */
     @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
     public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleAll",eventType, rowData);
+        printChange("listenerExampleAll", eventType, rowData);
     }
 
     /**
@@ -92,19 +97,19 @@
      */
     @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
     public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerAllDml",eventType, rowData);
+        printChange("listenerAllDml", eventType, rowData);
     }
 
-    public void printChange(String method,CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
+    public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
         if (eventType == CanalEntry.EventType.DELETE) {
             rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
-                System.out.println("[鏂规硶: "+method+" ,  delete 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鍒犻櫎鐨勫�间负: " + ele.getValue());
+                System.out.println("[鏂规硶: " + method + " ,  delete 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鍒犻櫎鐨勫�间负: " + ele.getValue());
             });
         }
 
         if (eventType == CanalEntry.EventType.INSERT) {
             rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
-                System.out.println("[鏂规硶: "+method+" ,insert 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鏂板鐨勫�间负: " + ele.getValue());
+                System.out.println("[鏂规硶: " + method + " ,insert 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鏂板鐨勫�间负: " + ele.getValue());
             });
         }
 
@@ -112,7 +117,7 @@
             for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) {
                 CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i);
                 CanalEntry.Column beforeColumn = rowData.getBeforeColumnsList().get(i);
-                System.out.println("[鏂规硶: "+method+" , update 璇彞 ] -->> 瀛楁鍚�," + afterColumn.getName() +
+                System.out.println("[鏂规硶: " + method + " , update 璇彞 ] -->> 瀛楁鍚�," + afterColumn.getName() +
                         " , 鏄惁淇敼: " + afterColumn.getUpdated() +
                         " , 淇敼鍓嶇殑鍊�: " + beforeColumn.getValue() +
                         " , 淇敼鍚庣殑鍊�: " + afterColumn.getValue());
diff --git a/src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java b/src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java
new file mode 100644
index 0000000..aa8208e
--- /dev/null
+++ b/src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java
@@ -0,0 +1,21 @@
+package com.duxinglangzi.canal.starter.annotation;
+
+import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
+import com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector;
+import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener;
+import org.springframework.context.annotation.Import;
+
+import java.lang.annotation.*;
+
+/**
+ * 寮�鍚� canal listener
+ *
+ * @author wuqiong 2022/8/4
+ */
+@Documented
+@Inherited
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Import({CanalAutoConfigurationProperties.class, CanalConfigurationSelector.class, ApplicationReadyListener.class})
+public @interface EnableCanalListener {
+}
diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java
index 9cfe177..c0a5094 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java
@@ -36,7 +36,7 @@
                 sleep(5L * SLEEP_TIME_MILLI_SECONDS);
             initConnect();
             while (isRunning() && !Thread.currentThread().isInterrupted()) doStart();
-            disconnect(); // 绾跨▼琚粓姝㈡垨鑰呭鍣ㄥ凡缁忓仠姝�
+            disconnect(); // 绾跨▼琚粓姝㈡垨鑰呭鍣ㄥ凡缁忓仠姝�,闇�瑕佸叧闂繛鎺�
         }).start();
         setRunning(true);
     }
diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
index 0374716..e25a8df 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -23,16 +23,19 @@
     private CanalConnector connector;
     private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
     private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
-    private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>();
+    private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
+    private Integer local_retry_count;
 
 
     public void initConnect() {
         try {
             // init supportAllTypes
-            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType())));
+            registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
             connector.connect();
             connector.subscribe(endpointInstance.getSubscribe());
             connector.rollback();
+            // 鍒濆鍖栨湰鍦伴噸璇曟鏁�
+            local_retry_count = endpointInstance.getRetryCount();
         } catch (Exception e) {
             logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
             setRunning(false);
@@ -52,18 +55,21 @@
             message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹�
         } catch (Exception clientException) {
             logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
-            endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
-            if (endpointInstance.getRetryCount() < 0) {
+            if (local_retry_count > 0) {
+                // 閲嶈瘯娆℃暟鍑忎竴
+                local_retry_count = local_retry_count - 1;
+                sleep(endpointInstance.getAcquireInterval());
+            } else {
+                // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼
                 logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
                                 "thread interrupt , current connector host: {} , port: {} ",
                         endpointInstance.getHost(), endpointInstance.getPort());
                 Thread.currentThread().interrupt();
-                disconnect();
-            } else {
-                sleep(endpointInstance.getAcquireInterval());
             }
             return;
         }
+        // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀�
+        if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount();
         List<CanalEntry.Entry> entries = message.getEntries();
         if (message.getId() == -1 || entries.isEmpty()) {
             sleep(endpointInstance.getAcquireInterval());
@@ -103,7 +109,7 @@
         // 蹇界暐 ddl 璇彞
         if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
         CanalEntry.EventType eventType = rowChange.getEventType();
-        if (!SUPPORT_ALL_TYPES.contains(eventType)) return;
+        if (!support_all_types.contains(eventType)) return;
         for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
             registrars
                     .stream()
diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories
index 6ded0cf..5fb7ad5 100644
--- a/src/main/resources/META-INF/spring.factories
+++ b/src/main/resources/META-INF/spring.factories
@@ -1,9 +1,9 @@
 
 # Auto Configure
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
-com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\
-com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector
+# org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+# com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\
+# com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector
 
 # Application Listeners
-org.springframework.context.ApplicationListener=\
-com.duxinglangzi.canal.starter.listener.ApplicationReadyListener
+# org.springframework.context.ApplicationListener=\
+# com.duxinglangzi.canal.starter.listener.ApplicationReadyListener

--
Gitblit v1.8.0