README.md | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/main/resources/META-INF/spring.factories | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
README.md
@@ -22,13 +22,15 @@ ``` ### 在spring boot 项目中的代码使用实例 ### 在spring boot 项目中的代码使用实例 (注意需要使用 EnableCanalListener 注解开启 canal listener ) ```java import com.alibaba.otter.canal.protocol.CanalEntry; import com.duxinglangzi.canal.starter.annotation.CanalListener; import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener; import com.duxinglangzi.canal.starter.annotation.EnableCanalListener; import org.springframework.stereotype.Service; import java.util.stream.Collectors; @@ -37,14 +39,17 @@ * @author wuqiong 2022/4/12 * @description */ @EnableCanalListener @Service public class CanalListenerTest { /** * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener * * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData * 程序在启动过程中会做检查 */ /** * 监控更新操作 * 支持动态参数配置,配置项需在 yml 或 properties 进行配置 @@ -52,7 +57,7 @@ */ @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"}) public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleBooksUsers",eventType, rowData); printChange("listenerExampleBooksUsers", eventType, rowData); } /** @@ -60,7 +65,7 @@ */ @CanalUpdateListener(destination = "example", database = "books", table = {"users"}) public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleBooksUsers",eventType, rowData); printChange("listenerExampleBooksUsers", eventType, rowData); } /** @@ -68,7 +73,7 @@ */ @CanalUpdateListener(destination = "example", database = "books", table = {"books"}) public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleBooksBooks",eventType, rowData); printChange("listenerExampleBooksBooks", eventType, rowData); } /** @@ -76,7 +81,7 @@ */ @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE) public void 
listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleBooksAll",eventType, rowData); printChange("listenerExampleBooksAll", eventType, rowData); } /** @@ -84,7 +89,7 @@ */ @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE) public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerExampleAll",eventType, rowData); printChange("listenerExampleAll", eventType, rowData); } /** @@ -92,19 +97,19 @@ */ @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE}) public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { printChange("listenerAllDml",eventType, rowData); printChange("listenerAllDml", eventType, rowData); } public void printChange(String method,CanalEntry.EventType eventType, CanalEntry.RowData rowData) { public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) { if (eventType == CanalEntry.EventType.DELETE) { rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { System.out.println("[方法: "+method+" , delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue()); System.out.println("[方法: " + method + " , delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue()); }); } if (eventType == CanalEntry.EventType.INSERT) { rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { System.out.println("[方法: "+method+" ,insert 语句 ] --->> 字段名: " + ele.getName() + ", 新增的值为: " + ele.getValue()); System.out.println("[方法: " + method + " ,insert 语句 ] --->> 字段名: " + ele.getName() + ", 新增的值为: " + ele.getValue()); }); } @@ -112,7 +117,7 @@ for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) { CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i); CanalEntry.Column beforeColumn = 
rowData.getBeforeColumnsList().get(i); System.out.println("[方法: "+method+" , update 语句 ] -->> 字段名," + afterColumn.getName() + System.out.println("[方法: " + method + " , update 语句 ] -->> 字段名," + afterColumn.getName() + " , 是否修改: " + afterColumn.getUpdated() + " , 修改前的值: " + beforeColumn.getValue() + " , 修改后的值: " + afterColumn.getValue()); src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java
New file @@ -0,0 +1,21 @@ package com.duxinglangzi.canal.starter.annotation; import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; import com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector; import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener; import org.springframework.context.annotation.Import; import java.lang.annotation.*; /** * Enables the canal listener. * <p> * Placing this annotation on a type imports {@link CanalAutoConfigurationProperties}, {@link CanalConfigurationSelector} and {@link ApplicationReadyListener} via {@code @Import}. * * @author wuqiong 2022/8/4 */ @Documented @Inherited @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import({CanalAutoConfigurationProperties.class, CanalConfigurationSelector.class, ApplicationReadyListener.class}) public @interface EnableCanalListener { } src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java
@@ -36,7 +36,7 @@ sleep(5L * SLEEP_TIME_MILLI_SECONDS); initConnect(); while (isRunning() && !Thread.currentThread().isInterrupted()) doStart(); disconnect(); // 线程被终止或者容器已经停止 disconnect(); // 线程被终止或者容器已经停止,需要关闭连接 }).start(); setRunning(true); } src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -23,16 +23,19 @@ private CanalConnector connector; private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>(); private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); private Integer local_retry_count; public void initConnect() { try { // init supportAllTypes registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType()))); registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); connector.connect(); connector.subscribe(endpointInstance.getSubscribe()); connector.rollback(); // 初始化本地重试次数 local_retry_count = endpointInstance.getRetryCount(); } catch (Exception e) { logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); setRunning(false); @@ -52,18 +55,21 @@ message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据 } catch (Exception clientException) { logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1); if (endpointInstance.getRetryCount() < 0) { if (local_retry_count > 0) { // 重试次数减一 local_retry_count = local_retry_count - 1; sleep(endpointInstance.getAcquireInterval()); } else { // 重试次数 <= 0 时,直接终止线程 logger.error("[DmlMessageTransponderContainer] retry count is zero , " + "thread interrupt , current connector host: {} , port: {} ", endpointInstance.getHost(), endpointInstance.getPort()); Thread.currentThread().interrupt(); disconnect(); } else { sleep(endpointInstance.getAcquireInterval()); } return; } // 如果重试次数小于设置的,则修改 if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount(); List<CanalEntry.Entry> entries = message.getEntries(); if (message.getId() == -1 || entries.isEmpty()) { sleep(endpointInstance.getAcquireInterval()); @@ 
-103,7 +109,7 @@ // 忽略 ddl 语句 if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; CanalEntry.EventType eventType = rowChange.getEventType(); if (!SUPPORT_ALL_TYPES.contains(eventType)) return; if (!support_all_types.contains(eventType)) return; for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { registrars .stream() src/main/resources/META-INF/spring.factories
@@ -1,9 +1,9 @@ # Auto Configure org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\ com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector # org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ # com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\ # com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector # Application Listeners org.springframework.context.ApplicationListener=\ com.duxinglangzi.canal.starter.listener.ApplicationReadyListener # org.springframework.context.ApplicationListener=\ # com.duxinglangzi.canal.starter.listener.ApplicationReadyListener