From b9f78f6463160cb30f6cc276531c3ecc4c7834ee Mon Sep 17 00:00:00 2001 From: guang <guang@guang.com> Date: 星期六, 15 六月 2024 10:36:48 +0800 Subject: [PATCH] 切加颜分支 --- README.md | 68 ++++++++++++++++++++++++---------- 1 files changed, 48 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 866e73b..94b83cb 100644 --- a/README.md +++ b/README.md @@ -16,16 +16,23 @@ zookeeper-address: # zookeeper 鍦板潃(寮�鍚泦缇ょ殑鎯呭喌涓嬬敓鏁�), 渚�: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 acquire-interval: 1000 # 鏈媺鍙栧埌娑堟伅鎯呭喌涓�,鑾峰彇娑堟伅鐨勬椂闂撮棿闅旀绉掑�� subscribe: .*\\..* # 榛樿鎯呭喌涓嬫媺鍙栨墍鏈夊簱銆佹墍鏈夎〃 +prod: + example: example + database: books ``` -### 鍦╯pring boot 椤圭洰涓殑浠g爜浣跨敤瀹炰緥 +### 鍦╯pring boot 椤圭洰涓殑浠g爜浣跨敤瀹炰緥 (娉ㄦ剰闇�瑕佷娇鐢� EnableCanalListener 娉ㄨВ寮�鍚� canal listener ) + ```java import com.alibaba.otter.canal.protocol.CanalEntry; -import com.duxinglangzi.canal.starter.annotation.CanalListener; -import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener; +import CanalInsertListener; +import CanalListener; +import CanalUpdateListener; +import EnableCanalListener; +import CanalMessage; import org.springframework.stereotype.Service; import java.util.stream.Collectors; @@ -34,64 +41,85 @@ * @author wuqiong 2022/4/12 * @description */ +@EnableCanalListener @Service public class CanalListenerTest { /** - * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 CanalEntry.EventType , CanalEntry.RowData + * 蹇呴』鍦ㄧ被涓� 浣跨敤 EnableCanalListener 娉ㄨВ鎵嶈兘寮�鍚� canal listener + * + * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 CanalMessage * 绋嬪簭鍦ㄥ惎鍔ㄨ繃绋嬩腑浼氬仛妫�鏌� */ /** + * 鐩戞帶鏇存柊鎿嶄綔 + * 鏀寔鍔ㄦ�佸弬鏁伴厤缃紝閰嶇疆椤归渶鍦� yml 鎴� properties 杩涜閰嶇疆 + * 鐩爣鏄� ${prod.example} 鐨� ${prod.database} 搴� users琛� + */ + @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"}) + public void listenerExampleBooksUsers(CanalMessage message) { + printChange("listenerExampleBooksUsers", message); + } + + /** * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨� books搴� users琛� */ - @CanalUpdateListener(destination = "example", database = "books", table = {"users"}) - public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksUsers",eventType, rowData); + @CanalInsertListener(destination = "example", database = "books", table = {"users"}) + public void listenerExampleBooksUser(CanalMessage message) { + printChange("listenerExampleBooksUsers", message); } /** * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨� books搴� books琛� */ @CanalUpdateListener(destination = "example", database = "books", table = {"books"}) - public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksBooks",eventType, rowData); + public void listenerExampleBooksBooks(CanalMessage message) { + printChange("listenerExampleBooksBooks", message); } /** * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨� books搴撶殑鎵�鏈夎〃 */ @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE) - public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksAll",eventType, rowData); + public void listenerExampleBooksAll(CanalMessage message) { + printChange("listenerExampleBooksAll", message); } /** * 鐩戞帶鏇存柊鎿嶄綔 锛岀洰鏍囨槸 example鐨� 鎵�鏈夊簱鐨勬墍鏈夎〃 */ @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE) - public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleAll",eventType, rowData); + public void listenerExampleAll(CanalMessage message) { + printChange("listenerExampleAll", message); } /** * 鐩戞帶鏇存柊銆佸垹闄ゃ�佹柊澧炴搷浣� 锛屾墍鏈夐厤缃殑鐩爣涓嬬殑鎵�鏈夊簱鐨勬墍鏈夎〃 */ @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE}) - public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerAllDml",eventType, rowData); + public void listenerAllDml(CanalMessage message) { + printChange("listenerAllDml", message); } - public void printChange(String method,CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + public void printChange(String method, CanalMessage message) { + CanalEntry.EventType eventType = message.getEventType(); + CanalEntry.RowData rowData = message.getRowData(); + + + System.out.println(" >>>>>>>>>>>>>[褰撳墠鏁版嵁搴�: " + message.getDataBaseName() + " ," + + "鏁版嵁搴撹〃鍚�: " + message.getTableName() + " , " + + "鏂规硶: " + method); + if (eventType == CanalEntry.EventType.DELETE) { - rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { - System.out.println("[鏂规硶: "+method+" , delete 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鍒犻櫎鐨勫�间负: " + ele.getValue()); + rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { + System.out.println("[鏂规硶: " + method + " , delete 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鍒犻櫎鐨勫�间负: " + ele.getValue()); }); } if (eventType == CanalEntry.EventType.INSERT) { rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { - System.out.println("[鏂规硶: "+method+" ,insert 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鏂板鐨勫�间负: " + ele.getValue()); + System.out.println("[鏂规硶: " + method + " ,insert 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鏂板鐨勫�间负: " + ele.getValue()); }); } @@ -99,7 +127,7 @@ for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) { CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i); CanalEntry.Column beforeColumn = rowData.getBeforeColumnsList().get(i); - System.out.println("[鏂规硶: "+method+" , update 璇彞 ] -->> 瀛楁鍚�," + afterColumn.getName() + + System.out.println("[鏂规硶: " + method + " , update 璇彞 ] -->> 瀛楁鍚�," + afterColumn.getName() + " , 鏄惁淇敼: " + afterColumn.getUpdated() + " , 淇敼鍓嶇殑鍊�: " + beforeColumn.getValue() + " , 淇敼鍚庣殑鍊�: " + afterColumn.getValue()); -- Gitblit v1.8.0