From c83c1b4ee7ff9c01a7a67855863c281589f39c72 Mon Sep 17 00:00:00 2001 From: duxinglangzi <871364441@qq.com> Date: 星期一, 15 八月 2022 19:58:57 +0800 Subject: [PATCH] 修改了listener方法的参数 --- src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | 3 src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java | 76 +++++++++++++++++++++++++ src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java | 13 ++-- README.md | 50 ++++++++++------ 4 files changed, 114 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 706c80c..2877874 100644 --- a/README.md +++ b/README.md @@ -16,9 +16,9 @@ zookeeper-address: # zookeeper 鍦板潃(寮�鍚泦缇ょ殑鎯呭喌涓嬬敓鏁�), 渚�: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 acquire-interval: 1000 # 鏈媺鍙栧埌娑堟伅鎯呭喌涓�,鑾峰彇娑堟伅鐨勬椂闂撮棿闅旀绉掑�� subscribe: .*\\..* # 榛樿鎯呭喌涓嬫媺鍙栨墍鏈夊簱銆佹墍鏈夎〃 - prod: - example: example1 - database: books +prod: + example: example + database: books ``` @@ -28,9 +28,11 @@ import com.alibaba.otter.canal.protocol.CanalEntry; +import com.duxinglangzi.canal.starter.annotation.CanalInsertListener; import com.duxinglangzi.canal.starter.annotation.CanalListener; import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener; import com.duxinglangzi.canal.starter.annotation.EnableCanalListener; +import com.duxinglangzi.canal.starter.mode.CanalMessage; import org.springframework.stereotype.Service; import java.util.stream.Collectors; @@ -44,9 +46,9 @@ public class CanalListenerTest { /** - * 蹇呴』鍦ㄧ被涓� 浣跨敤 EnableCanalListener 娉ㄨВ鎵嶈兘寮�鍚� canal listener - * - * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 CanalEntry.EventType , CanalEntry.RowData + * 蹇呴』鍦ㄧ被涓� 浣跨敤 EnableCanalListener 娉ㄨВ鎵嶈兘寮�鍚� canal listener + * + * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 com.duxinglangzi.canal.starter.mode.CanalMessage * 绋嬪簭鍦ㄥ惎鍔ㄨ繃绋嬩腑浼氬仛妫�鏌� */ @@ -56,51 +58,59 @@ * 鐩爣鏄� ${prod.example} 鐨� ${prod.database} 搴� users琛� */ @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"}) - public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksUsers", eventType, rowData); + public void listenerExampleBooksUsers(CanalMessage message) { + printChange("listenerExampleBooksUsers", message); } /** * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨� books搴� users琛� */ - @CanalUpdateListener(destination = "example", database = "books", table = {"users"}) - public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksUsers", eventType, rowData); + @CanalInsertListener(destination = "example", database = "books", table = {"users"}) + public void listenerExampleBooksUser(CanalMessage message) { + printChange("listenerExampleBooksUsers", message); } /** * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨� books搴� books琛� */ @CanalUpdateListener(destination = "example", database = "books", table = {"books"}) - public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksBooks", eventType, rowData); + public void listenerExampleBooksBooks(CanalMessage message) { + printChange("listenerExampleBooksBooks", message); } /** * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨� books搴撶殑鎵�鏈夎〃 */ @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE) - public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksAll", eventType, rowData); + public void listenerExampleBooksAll(CanalMessage message) { + printChange("listenerExampleBooksAll", message); } /** * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨� 鎵�鏈夊簱鐨勬墍鏈夎〃 */ @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE) - public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleAll", eventType, rowData); + public void listenerExampleAll(CanalMessage message) { + printChange("listenerExampleAll", message); } /** * 鐩戞帶鏇存柊銆佸垹闄ゃ�佹柊澧炴搷浣� 锛屾墍鏈夐厤缃殑鐩爣涓嬬殑鎵�鏈夊簱鐨勬墍鏈夎〃 */ @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE}) - public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerAllDml", eventType, rowData); + public void listenerAllDml(CanalMessage message) { + printChange("listenerAllDml", message); } - public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + public void printChange(String method, CanalMessage message) { + CanalEntry.EventType eventType = message.getEventType(); + CanalEntry.RowData rowData = message.getRowData(); + + + System.out.println(" >>>>>>>>>>>>>[褰撳墠鏁版嵁搴�: "+message.getDataBaseName()+" ," + + "鏁版嵁搴撹〃鍚�: " + message.getTableName() + " , " + + "鏂规硶: " + method ); + if (eventType == CanalEntry.EventType.DELETE) { rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { System.out.println("[鏂规硶: " + method + " , delete 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鍒犻櫎鐨勫�间负: " + ele.getValue()); diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java index c9e0c1d..ad778bc 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java +++ b/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java @@ -2,6 +2,7 @@ import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.duxinglangzi.canal.starter.mode.CanalMessage; import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Method; @@ -12,7 +13,7 @@ import java.util.stream.Collectors; /** - * 鐧昏鍛� + * 鐩戝惉鐨勭粓绔敞鍐屽櫒 * * @author wuqiong 2022/4/11 */ @@ -42,8 +43,8 @@ private CanalEntry.EventType[] eventType; /** - * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔涓や釜鍙傛暟 <p> - * 2銆佷笖椤哄簭蹇呴』涓�: CanalEntry.EventType 銆� CanalEntry.RowData <p> + * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔1涓弬鏁�, 璇ュ弬鏁板璞″唴鍖呭惈浜�: 搴撳悕銆佽〃鍚嶃�佷簨浠剁被鍨嬨�佸彉鏇寸殑鏁版嵁 <p> + * 2銆佹柟娉曞弬鏁板繀椤讳负: CanalMessage <p> * 3銆佸鏋淐analListener 鎸囧畾鐨� destination 涓嶅湪閰嶇疆鏂囦欢鍐咃紝鍒欑洿鎺ユ姏閿� <p> * * @param sets @@ -52,11 +53,9 @@ */ public void checkParameter(Set<String> sets) { List<Class<?>> classes = parameterTypes(); - if (classes.size() > 2 - || classes.get(1) != CanalEntry.RowData.class - || classes.get(0) != CanalEntry.EventType.class) + if (classes.size() != 1 || classes.get(0) != CanalMessage.class) throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + - "Need Parameter Type [CanalEntry.EventType,CanalEntry.RowData] please check "); + "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check "); if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination())) throw new CanalClientException("@CanalListener Illegal destination , please check "); diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java index e25a8df..7aad7f7 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java @@ -5,6 +5,7 @@ import com.alibaba.otter.canal.protocol.Message; import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar; +import com.duxinglangzi.canal.starter.mode.CanalMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,7 +120,7 @@ eventType)) .forEach(element -> { try { - element.getMethod().invoke(element.getBean(), eventType, rowData); + element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); } catch (IllegalAccessException | InvocationTargetException e) { logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e); diff --git a/src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java b/src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java new file mode 100644 index 0000000..13bba23 --- /dev/null +++ b/src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java @@ -0,0 +1,76 @@ +package com.duxinglangzi.canal.starter.mode; + +import com.alibaba.otter.canal.protocol.CanalEntry; + +import java.io.Serializable; + +/** + * 鐩戝惉浜嬩欢鐨� 杩斿洖淇℃伅, 涓昏灏� CanalEntry 閲屼笉鍚屽眰绾х殑甯哥敤鏁版嵁缁勮鍒板悓涓�灞傜骇浣跨敤.<p> + * headers 鏄负浜嗘柟渚胯幏鍙栧叾浠栨暟鎹紝姣斿 + * + * @author wuqiong 2022/8/15 + */ +public class CanalMessage implements Serializable { + private static final long serialVersionUID = 730485362580815032L; + + /** + * 鏁版嵁搴撳悕 + */ + private String dataBaseName; + + /** + * 琛ㄥ悕 + */ + private String tableName; + + /** + * 鍙戠敓鍙樺寲鐨� 浜嬩欢绫诲瀷 + */ + private CanalEntry.EventType eventType; + + /** + * 鍙戠敓鍙樺寲鐨勬暟鎹� + */ + private CanalEntry.RowData rowData; + + /** + * 澶翠俊鎭紝 鍖呭惈: sql鎵ц鏃堕棿銆佹暟鎹簱鏃ュ織鏂囦欢鍚嶃�佹暟鎹簱鏃ュ織鏂囦欢鍋忕Щ閲� 绛変俊鎭� + */ + private CanalEntry.Header entryHeader; + + /** + * 鏋勯�犺繑鍥炴暟鎹俊鎭� + * + * @param entryHeader 澶翠俊鎭� + * @param eventType 浜嬩欢绫诲瀷 + * @param rowData 鍙樺寲鐨勬暟鎹� + * @author wuqiong 2022/8/15 16:18 + */ + public CanalMessage(CanalEntry.Header entryHeader, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + this.entryHeader = entryHeader; + this.eventType = eventType; + this.rowData = rowData; + this.dataBaseName = getEntryHeader().getSchemaName(); + this.tableName = getEntryHeader().getTableName(); + } + + public String getDataBaseName() { + return dataBaseName; + } + + public String getTableName() { + return tableName; + } + + public CanalEntry.EventType getEventType() { + return eventType; + } + + public CanalEntry.RowData getRowData() { + return rowData; + } + + public CanalEntry.Header getEntryHeader() { + return entryHeader; + } +} -- Gitblit v1.8.0