| | |
| | |
|
| | | ```
|
| | |
|
| | | ### 在spring boot 项目中的代码使用实例 |
| | | ### 在spring boot 项目中的代码使用实例 (注意需要使用 EnableCanalListener 注解开启 canal listener )
|
| | |
|
| | | ```java
|
| | |
|
| | |
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | | import com.duxinglangzi.canal.starter.annotation.CanalListener;
|
| | | import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
|
| | | import com.duxinglangzi.canal.starter.annotation.EnableCanalListener;
|
| | | import org.springframework.stereotype.Service;
|
| | |
|
| | | import java.util.stream.Collectors;
|
| | |
| | | * @author wuqiong 2022/4/12
|
| | | * @description
|
| | | */
|
| | | @EnableCanalListener
|
| | | @Service
|
| | | public class CanalListenerTest {
|
| | |
|
| | | /**
|
| | | * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener |
| | | * |
| | | * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData
|
| | | * 程序在启动过程中会做检查
|
| | | */
|
| | | |
| | |
|
| | | /**
|
| | | * 监控更新操作
|
| | | * 支持动态参数配置,配置项需在 yml 或 properties 进行配置
|
| | |
| | | */
|
| | | @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
|
| | | public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
| | | printChange("listenerExampleBooksUsers",eventType, rowData);
|
| | | printChange("listenerExampleBooksUsers", eventType, rowData);
|
| | | }
|
| | |
|
| | | /**
|
| | |
| | | */
|
| | | @CanalUpdateListener(destination = "example", database = "books", table = {"users"})
|
| | | public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
| | | printChange("listenerExampleBooksUsers",eventType, rowData);
|
| | | printChange("listenerExampleBooksUsers", eventType, rowData);
|
| | | }
|
| | |
|
| | | /**
|
| | |
| | | */
|
| | | @CanalUpdateListener(destination = "example", database = "books", table = {"books"})
|
| | | public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
| | | printChange("listenerExampleBooksBooks",eventType, rowData);
|
| | | printChange("listenerExampleBooksBooks", eventType, rowData);
|
| | | }
|
| | |
|
| | | /**
|
| | |
| | | */
|
| | | @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
|
| | | public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
| | | printChange("listenerExampleBooksAll",eventType, rowData);
|
| | | printChange("listenerExampleBooksAll", eventType, rowData);
|
| | | }
|
| | |
|
| | | /**
|
| | |
| | | */
|
| | | @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
|
| | | public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
| | | printChange("listenerExampleAll",eventType, rowData);
|
| | | printChange("listenerExampleAll", eventType, rowData);
|
| | | }
|
| | |
|
| | | /**
|
| | |
| | | */
|
| | | @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
|
| | | public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
| | | printChange("listenerAllDml",eventType, rowData);
|
| | | printChange("listenerAllDml", eventType, rowData);
|
| | | }
|
| | |
|
| | | public void printChange(String method,CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
| | | public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
| | | if (eventType == CanalEntry.EventType.DELETE) {
|
| | | rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
|
| | | System.out.println("[方法: "+method+" , delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue());
|
| | | System.out.println("[方法: " + method + " , delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue());
|
| | | });
|
| | | }
|
| | |
|
| | | if (eventType == CanalEntry.EventType.INSERT) {
|
| | | rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
|
| | | System.out.println("[方法: "+method+" ,insert 语句 ] --->> 字段名: " + ele.getName() + ", 新增的值为: " + ele.getValue());
|
| | | System.out.println("[方法: " + method + " ,insert 语句 ] --->> 字段名: " + ele.getName() + ", 新增的值为: " + ele.getValue());
|
| | | });
|
| | | }
|
| | |
|
| | |
| | | for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) {
|
| | | CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i);
|
| | | CanalEntry.Column beforeColumn = rowData.getBeforeColumnsList().get(i);
|
| | | System.out.println("[方法: "+method+" , update 语句 ] -->> 字段名," + afterColumn.getName() +
|
| | | System.out.println("[方法: " + method + " , update 语句 ] -->> 字段名," + afterColumn.getName() +
|
| | | " , 是否修改: " + afterColumn.getUpdated() +
|
| | | " , 修改前的值: " + beforeColumn.getValue() +
|
| | | " , 修改后的值: " + afterColumn.getValue());
|