From a87aa74a3af27960276ed02f4273386d25d2a231 Mon Sep 17 00:00:00 2001
From: duxinglangzi <871364441@qq.com>
Date: 星期五, 24 六月 2022 15:41:11 +0800
Subject: [PATCH] 支持动态参数配置

---
 src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java |   31 ++++++++++++++++++-------------
 1 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
index 9aadc35..0374716 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -13,6 +13,7 @@
 
 /**
  * DML 鏁版嵁鎷夊彇銆佽В鏋�
+ *
  * @author wuqiong 2022/4/11
  */
 public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
@@ -26,16 +27,20 @@
 
 
     public void initConnect() {
-        // init supportAllTypes
-        registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
-                Arrays.asList(e.getListenerEntry().getValue().eventType())));
-        connector.connect();
-        connector.subscribe(endpointInstance.getSubscribe());
-        connector.rollback();
+        try {
+            // init supportAllTypes
+            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType())));
+            connector.connect();
+            connector.subscribe(endpointInstance.getSubscribe());
+            connector.rollback();
+        } catch (Exception e) {
+            logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
+            setRunning(false);
+        }
 
     }
 
-    public void disconnect(){
+    public void disconnect() {
         // 鍏抽棴杩炴帴
         connector.disconnect();
     }
@@ -46,10 +51,10 @@
         try {
             message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹�
         } catch (Exception clientException) {
-            logger.error("[MessageTransponderContainer] error msg : ", clientException);
+            logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
             endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
             if (endpointInstance.getRetryCount() < 0) {
-                logger.error("[MessageTransponderContainer] retry count is zero ,  " +
+                logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
                                 "thread interrupt , current connector host: {} , port: {} ",
                         endpointInstance.getHost(), endpointInstance.getPort());
                 Thread.currentThread().interrupt();
@@ -69,7 +74,7 @@
                 consumer(entry);
             } catch (Exception e) {
                 e.printStackTrace();
-                logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
+                logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
                 // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁
                 // return;
             }
@@ -91,7 +96,7 @@
         try {
             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
         } catch (Exception e) {
-            logger.error("[MessageTransponderContainer_consumer] RowChange parse has an error ", e);
+            logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
             throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
         }
 
@@ -108,9 +113,9 @@
                             eventType))
                     .forEach(element -> {
                         try {
-                            element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData);
+                            element.getMethod().invoke(element.getBean(), eventType, rowData);
                         } catch (IllegalAccessException | InvocationTargetException e) {
-                            logger.error("[MessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
+                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                             throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e);
                         }
                     });

--
Gitblit v1.8.0