From c83c1b4ee7ff9c01a7a67855863c281589f39c72 Mon Sep 17 00:00:00 2001
From: duxinglangzi <871364441@qq.com>
Date: 星期一, 15 八月 2022 19:58:57 +0800
Subject: [PATCH] 修改了listener方法的参数

---
 src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java     |    3 
 src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java                            |   76 +++++++++++++++++++++++++
 src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java |   13 ++--
 README.md                                                                                      |   50 ++++++++++------
 4 files changed, 114 insertions(+), 28 deletions(-)

diff --git a/README.md b/README.md
index 706c80c..2877874 100644
--- a/README.md
+++ b/README.md
@@ -16,9 +16,9 @@
         zookeeper-address:      # zookeeper 鍦板潃(寮�鍚泦缇ょ殑鎯呭喌涓嬬敓鏁�), 渚�: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
         acquire-interval: 1000  # 鏈媺鍙栧埌娑堟伅鎯呭喌涓�,鑾峰彇娑堟伅鐨勬椂闂撮棿闅旀绉掑��
         subscribe: .*\\..*      # 榛樿鎯呭喌涓嬫媺鍙栨墍鏈夊簱銆佹墍鏈夎〃
-  prod:
-    example: example1
-    database: books
+prod:
+  example: example
+  database: books
 
 ```
 
@@ -28,9 +28,11 @@
 
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.duxinglangzi.canal.starter.annotation.CanalInsertListener;
 import com.duxinglangzi.canal.starter.annotation.CanalListener;
 import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
 import com.duxinglangzi.canal.starter.annotation.EnableCanalListener;
+import com.duxinglangzi.canal.starter.mode.CanalMessage;
 import org.springframework.stereotype.Service;
 
 import java.util.stream.Collectors;
@@ -44,9 +46,9 @@
 public class CanalListenerTest {
 
     /**
-     * 蹇呴』鍦ㄧ被涓� 浣跨敤 EnableCanalListener 娉ㄨВ鎵嶈兘寮�鍚� canal listener 
-     * 
-     * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 CanalEntry.EventType , CanalEntry.RowData 
+     * 蹇呴』鍦ㄧ被涓� 浣跨敤 EnableCanalListener 娉ㄨВ鎵嶈兘寮�鍚� canal listener
+     *
+     * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 com.duxinglangzi.canal.starter.mode.CanalMessage
      * 绋嬪簭鍦ㄥ惎鍔ㄨ繃绋嬩腑浼氬仛妫�鏌�
      */
 
@@ -56,51 +58,59 @@
      * 鐩爣鏄� ${prod.example} 鐨�  ${prod.database} 搴�  users琛�
      */
     @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
-    public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleBooksUsers", eventType, rowData);
+    public void listenerExampleBooksUsers(CanalMessage message) {
+        printChange("listenerExampleBooksUsers", message);
     }
 
     /**
      * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨�  books搴�  users琛�
      */
-    @CanalUpdateListener(destination = "example", database = "books", table = {"users"})
-    public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleBooksUsers", eventType, rowData);
+    @CanalInsertListener(destination = "example", database = "books", table = {"users"})
+    public void listenerExampleBooksUser(CanalMessage message) {
+        printChange("listenerExampleBooksUsers", message);
     }
 
     /**
      * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨�  books搴�  books琛�
      */
     @CanalUpdateListener(destination = "example", database = "books", table = {"books"})
-    public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleBooksBooks", eventType, rowData);
+    public void listenerExampleBooksBooks(CanalMessage message) {
+        printChange("listenerExampleBooksBooks", message);
     }
 
     /**
      * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨�  books搴撶殑鎵�鏈夎〃
      */
     @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
-    public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleBooksAll", eventType, rowData);
+    public void listenerExampleBooksAll(CanalMessage message) {
+        printChange("listenerExampleBooksAll", message);
     }
 
     /**
      * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨�  鎵�鏈夊簱鐨勬墍鏈夎〃
      */
     @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
-    public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerExampleAll", eventType, rowData);
+    public void listenerExampleAll(CanalMessage message) {
+        printChange("listenerExampleAll", message);
     }
 
     /**
      * 鐩戞帶鏇存柊銆佸垹闄ゃ�佹柊澧炴搷浣� 锛屾墍鏈夐厤缃殑鐩爣涓嬬殑鎵�鏈夊簱鐨勬墍鏈夎〃
      */
     @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
-    public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
-        printChange("listenerAllDml", eventType, rowData);
+    public void listenerAllDml(CanalMessage message) {
+        printChange("listenerAllDml", message);
     }
 
-    public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
+    public void printChange(String method, CanalMessage message) {
+        CanalEntry.EventType eventType = message.getEventType();
+        CanalEntry.RowData rowData = message.getRowData();
+
+
+        System.out.println(" >>>>>>>>>>>>>[褰撳墠鏁版嵁搴�: "+message.getDataBaseName()+" ," +
+                "鏁版嵁搴撹〃鍚�: " + message.getTableName() + " , " +
+                "鏂规硶: " + method );
+
         if (eventType == CanalEntry.EventType.DELETE) {
             rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
                 System.out.println("[鏂规硶: " + method + " ,  delete 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鍒犻櫎鐨勫�间负: " + ele.getValue());
diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
index c9e0c1d..ad778bc 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java
@@ -2,6 +2,7 @@
 
 import com.alibaba.otter.canal.protocol.CanalEntry;
 import com.alibaba.otter.canal.protocol.exception.CanalClientException;
+import com.duxinglangzi.canal.starter.mode.CanalMessage;
 import org.apache.commons.lang3.StringUtils;
 
 import java.lang.reflect.Method;
@@ -12,7 +13,7 @@
 import java.util.stream.Collectors;
 
 /**
- * 鐧昏鍛�
+ * 鐩戝惉鐨勭粓绔敞鍐屽櫒
  *
  * @author wuqiong 2022/4/11
  */
@@ -42,8 +43,8 @@
     private CanalEntry.EventType[] eventType;
 
     /**
-     * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔涓や釜鍙傛暟 <p>
-     * 2銆佷笖椤哄簭蹇呴』涓�: CanalEntry.EventType 銆� CanalEntry.RowData  <p>
+     * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔1涓弬鏁�, 璇ュ弬鏁板璞″唴鍖呭惈浜�: 搴撳悕銆佽〃鍚嶃�佷簨浠剁被鍨嬨�佸彉鏇寸殑鏁版嵁 <p>
+     * 2銆佹柟娉曞弬鏁板繀椤讳负: CanalMessage  <p>
      * 3銆佸鏋淐analListener 鎸囧畾鐨� destination 涓嶅湪閰嶇疆鏂囦欢鍐咃紝鍒欑洿鎺ユ姏閿� <p>
      *
      * @param sets
@@ -52,11 +53,9 @@
      */
     public void checkParameter(Set<String> sets) {
         List<Class<?>> classes = parameterTypes();
-        if (classes.size() > 2
-                || classes.get(1) != CanalEntry.RowData.class
-                || classes.get(0) != CanalEntry.EventType.class)
+        if (classes.size() != 1 || classes.get(0) != CanalMessage.class)
             throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " +
-                    "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check ");
+                    "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check ");
         if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination()))
             throw new CanalClientException("@CanalListener Illegal destination , please check ");
 
diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
index e25a8df..7aad7f7 100644
--- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
+++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -5,6 +5,7 @@
 import com.alibaba.otter.canal.protocol.Message;
 import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
 import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
+import com.duxinglangzi.canal.starter.mode.CanalMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -119,7 +120,7 @@
                             eventType))
                     .forEach(element -> {
                         try {
-                            element.getMethod().invoke(element.getBean(), eventType, rowData);
+                            element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
                         } catch (IllegalAccessException | InvocationTargetException e) {
                             logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                             throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e);
diff --git a/src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java b/src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java
new file mode 100644
index 0000000..13bba23
--- /dev/null
+++ b/src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java
@@ -0,0 +1,76 @@
+package com.duxinglangzi.canal.starter.mode;
+
+import com.alibaba.otter.canal.protocol.CanalEntry;
+
+import java.io.Serializable;
+
+/**
+ * 鐩戝惉浜嬩欢鐨� 杩斿洖淇℃伅, 涓昏灏� CanalEntry 閲屼笉鍚屽眰绾х殑甯哥敤鏁版嵁缁勮鍒板悓涓�灞傜骇浣跨敤.<p>
+ * headers 鏄负浜嗘柟渚胯幏鍙栧叾浠栨暟鎹紝姣斿
+ *
+ * @author wuqiong 2022/8/15
+ */
+public class CanalMessage implements Serializable {
+    private static final long serialVersionUID = 730485362580815032L;
+
+    /**
+     * 鏁版嵁搴撳悕
+     */
+    private String dataBaseName;
+
+    /**
+     * 琛ㄥ悕
+     */
+    private String tableName;
+
+    /**
+     * 鍙戠敓鍙樺寲鐨� 浜嬩欢绫诲瀷
+     */
+    private CanalEntry.EventType eventType;
+
+    /**
+     * 鍙戠敓鍙樺寲鐨勬暟鎹�
+     */
+    private CanalEntry.RowData rowData;
+
+    /**
+     * 澶翠俊鎭紝 鍖呭惈: sql鎵ц鏃堕棿銆佹暟鎹簱鏃ュ織鏂囦欢鍚嶃�佹暟鎹簱鏃ュ織鏂囦欢鍋忕Щ閲� 绛変俊鎭�
+     */
+    private CanalEntry.Header entryHeader;
+
+    /**
+     * 鏋勯�犺繑鍥炴暟鎹俊鎭�
+     *
+     * @param entryHeader 澶翠俊鎭�
+     * @param eventType   浜嬩欢绫诲瀷
+     * @param rowData     鍙樺寲鐨勬暟鎹�
+     * @author wuqiong 2022/8/15 16:18
+     */
+    public CanalMessage(CanalEntry.Header entryHeader, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
+        this.entryHeader = entryHeader;
+        this.eventType = eventType;
+        this.rowData = rowData;
+        this.dataBaseName = getEntryHeader().getSchemaName();
+        this.tableName = getEntryHeader().getTableName();
+    }
+
+    public String getDataBaseName() {
+        return dataBaseName;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public CanalEntry.EventType getEventType() {
+        return eventType;
+    }
+
+    public CanalEntry.RowData getRowData() {
+        return rowData;
+    }
+
+    public CanalEntry.Header getEntryHeader() {
+        return entryHeader;
+    }
+}

--
Gitblit v1.8.0