From a8f947d2bc621051821e0cc57335aa6ca1776a8e Mon Sep 17 00:00:00 2001 From: duxinglangzi <871364441@qq.com> Date: 星期四, 04 八月 2022 18:16:29 +0800 Subject: [PATCH] 修改启用方式为EnableCanalListener注解 修改重试次数瑕疵 --- src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | 22 +++++++---- src/main/resources/META-INF/spring.factories | 10 ++-- src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java | 2 README.md | 29 ++++++++------ src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java | 21 ++++++++++ 5 files changed, 58 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 30e716e..706c80c 100644 --- a/README.md +++ b/README.md @@ -22,13 +22,15 @@ ``` -### 鍦╯pring boot 椤圭洰涓殑浠g爜浣跨敤瀹炰緥 +### 鍦╯pring boot 椤圭洰涓殑浠g爜浣跨敤瀹炰緥 (娉ㄦ剰闇�瑕佷娇鐢� EnableCanalListener 娉ㄨВ寮�鍚� canal listener ) + ```java import com.alibaba.otter.canal.protocol.CanalEntry; import com.duxinglangzi.canal.starter.annotation.CanalListener; import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener; +import com.duxinglangzi.canal.starter.annotation.EnableCanalListener; import org.springframework.stereotype.Service; import java.util.stream.Collectors; @@ -37,14 +39,17 @@ * @author wuqiong 2022/4/12 * @description */ +@EnableCanalListener @Service public class CanalListenerTest { /** + * 蹇呴』鍦ㄧ被涓� 浣跨敤 EnableCanalListener 娉ㄨВ鎵嶈兘寮�鍚� canal listener + * * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 CanalEntry.EventType , CanalEntry.RowData * 绋嬪簭鍦ㄥ惎鍔ㄨ繃绋嬩腑浼氬仛妫�鏌� */ - + /** * 鐩戞帶鏇存柊鎿嶄綔 * 鏀寔鍔ㄦ�佸弬鏁伴厤缃紝閰嶇疆椤归渶鍦� yml 鎴� properties 杩涜閰嶇疆 @@ -52,7 +57,7 @@ */ @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"}) public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksUsers",eventType, rowData); + printChange("listenerExampleBooksUsers", eventType, rowData); } /** @@ -60,7 +65,7 @@ */ @CanalUpdateListener(destination = "example", database = "books", table = {"users"}) public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksUsers",eventType, rowData); + printChange("listenerExampleBooksUsers", eventType, rowData); } /** @@ -68,7 +73,7 @@ */ @CanalUpdateListener(destination = "example", database = "books", table = {"books"}) public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksBooks",eventType, rowData); + printChange("listenerExampleBooksBooks", eventType, rowData); } /** @@ -76,7 +81,7 @@ */ @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE) public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleBooksAll",eventType, rowData); + printChange("listenerExampleBooksAll", eventType, rowData); } /** @@ -84,7 +89,7 @@ */ @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE) public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerExampleAll",eventType, rowData); + printChange("listenerExampleAll", eventType, rowData); } /** @@ -92,19 +97,19 @@ */ @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE}) public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { - printChange("listenerAllDml",eventType, rowData); + printChange("listenerAllDml", eventType, rowData); } - public void printChange(String method,CanalEntry.EventType eventType, CanalEntry.RowData rowData) { + public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { if (eventType == CanalEntry.EventType.DELETE) { rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { - System.out.println("[鏂规硶: "+method+" , delete 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鍒犻櫎鐨勫�间负: " + ele.getValue()); + System.out.println("[鏂规硶: " + method + " , delete 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鍒犻櫎鐨勫�间负: " + ele.getValue()); }); } if (eventType == CanalEntry.EventType.INSERT) { rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { - System.out.println("[鏂规硶: "+method+" ,insert 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鏂板鐨勫�间负: " + ele.getValue()); + System.out.println("[鏂规硶: " + method + " ,insert 璇彞 ] --->> 瀛楁鍚�: " + ele.getName() + ", 鏂板鐨勫�间负: " + ele.getValue()); }); } @@ -112,7 +117,7 @@ for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) { CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i); CanalEntry.Column beforeColumn = rowData.getBeforeColumnsList().get(i); - System.out.println("[鏂规硶: "+method+" , update 璇彞 ] -->> 瀛楁鍚�," + afterColumn.getName() + + System.out.println("[鏂规硶: " + method + " , update 璇彞 ] -->> 瀛楁鍚�," + afterColumn.getName() + " , 鏄惁淇敼: " + afterColumn.getUpdated() + " , 淇敼鍓嶇殑鍊�: " + beforeColumn.getValue() + " , 淇敼鍚庣殑鍊�: " + afterColumn.getValue()); diff --git a/src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java b/src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java new file mode 100644 index 0000000..aa8208e --- /dev/null +++ b/src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java @@ -0,0 +1,21 @@ +package com.duxinglangzi.canal.starter.annotation; + +import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; +import com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector; +import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + +/** + * 寮�鍚� canal listener + * + * @author wuqiong 2022/8/4 + */ +@Documented +@Inherited +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Import({CanalAutoConfigurationProperties.class, CanalConfigurationSelector.class, ApplicationReadyListener.class}) +public @interface EnableCanalListener { +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java index 9cfe177..c0a5094 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java +++ b/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java @@ -36,7 +36,7 @@ sleep(5L * SLEEP_TIME_MILLI_SECONDS); initConnect(); while (isRunning() && !Thread.currentThread().isInterrupted()) doStart(); - disconnect(); // 绾跨▼琚粓姝㈡垨鑰呭鍣ㄥ凡缁忓仠姝� + disconnect(); // 绾跨▼琚粓姝㈡垨鑰呭鍣ㄥ凡缁忓仠姝�,闇�瑕佸叧闂繛鎺� }).start(); setRunning(true); } diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java index 0374716..e25a8df 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java @@ -23,16 +23,19 @@ private CanalConnector connector; private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); - private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>(); + private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); + private Integer local_retry_count; public void initConnect() { try { // init supportAllTypes - registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType()))); + registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); connector.connect(); connector.subscribe(endpointInstance.getSubscribe()); connector.rollback(); + // 鍒濆鍖栨湰鍦伴噸璇曟鏁� + local_retry_count = endpointInstance.getRetryCount(); } catch (Exception e) { logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); setRunning(false); @@ -52,18 +55,21 @@ message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� } catch (Exception clientException) { logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); - endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1); - if (endpointInstance.getRetryCount() < 0) { + if (local_retry_count > 0) { + // 閲嶈瘯娆℃暟鍑忎竴 + local_retry_count = local_retry_count - 1; + sleep(endpointInstance.getAcquireInterval()); + } else { + // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼ logger.error("[DmlMessageTransponderContainer] retry count is zero , " + "thread interrupt , current connector host: {} , port: {} ", endpointInstance.getHost(), endpointInstance.getPort()); Thread.currentThread().interrupt(); - disconnect(); - } else { - sleep(endpointInstance.getAcquireInterval()); } return; } + // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀� + if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount(); List<CanalEntry.Entry> entries = message.getEntries(); if (message.getId() == -1 || entries.isEmpty()) { sleep(endpointInstance.getAcquireInterval()); @@ -103,7 +109,7 @@ // 蹇界暐 ddl 璇彞 if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; CanalEntry.EventType eventType = rowChange.getEventType(); - if (!SUPPORT_ALL_TYPES.contains(eventType)) return; + if (!support_all_types.contains(eventType)) return; for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { registrars .stream() diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories index 6ded0cf..5fb7ad5 100644 --- a/src/main/resources/META-INF/spring.factories +++ b/src/main/resources/META-INF/spring.factories @@ -1,9 +1,9 @@ # Auto Configure -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\ -com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector +# org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +# com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\ +# com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector # Application Listeners -org.springframework.context.ApplicationListener=\ -com.duxinglangzi.canal.starter.listener.ApplicationReadyListener +# org.springframework.context.ApplicationListener=\ +# com.duxinglangzi.canal.starter.listener.ApplicationReadyListener -- Gitblit v1.8.0