renxue
2022-10-24 0b055a3f554da3a934e79e88c4781705cbab5a21
提交 | 用户 | age
de8c2b 1 # Canal Spring Boot Starter 使用实例
D 2
3 ### 在spring boot 项目配置文件 application.yml内增加以下内容
4 ```yaml
5 spring:
6   canal:
7     instances:
8       example:                  # 拉取 example 目标的数据
9         host: 192.168.10.179    # canal 所在机器的ip
10         port: 11111             # canal 默认暴露端口
11         user-name: canal        # canal 用户名
12         password: canal         # canal 密码
13         batch-size: 600         # canal 每次拉取的数据条数
14         retry-count: 5          # 重试次数,如果重试5次后,仍无法连接,则断开
15         cluster-enabled: false  # 是否开启集群
16         zookeeper-address:      # zookeeper 地址(开启集群的情况下生效), 例: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
17         acquire-interval: 1000  # 未拉取到消息情况下,获取消息的时间间隔毫秒值
18         subscribe: .*\\..*      # 默认情况下拉取所有库、所有表
c83c1b 19 prod:
D 20   example: example
21   database: books
de8c2b 22
D 23 ```
24
a8f947 25 ### 在spring boot 项目中的代码使用实例 (注意需要使用 EnableCanalListener 注解开启 canal listener )
D 26
de8c2b 27 ```java
D 28
29
30 import com.alibaba.otter.canal.protocol.CanalEntry;
0b055a 31 import CanalInsertListener;
R 32 import CanalListener;
33 import CanalUpdateListener;
34 import EnableCanalListener;
35 import CanalMessage;
de8c2b 36 import org.springframework.stereotype.Service;
D 37
38 import java.util.stream.Collectors;
39
40 /**
41  * @author wuqiong 2022/4/12
42  * @description
43  */
a8f947 44 @EnableCanalListener
de8c2b 45 @Service
D 46 public class CanalListenerTest {
47
48     /**
c83c1b 49      * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
D 50      *
0b055a 51      * 目前 Listener 方法的参数必须为 CanalMessage
de8c2b 52      * 程序在启动过程中会做检查
D 53      */
a8f947 54
a87aa7 55     /**
D 56      * 监控更新操作
57      * 支持动态参数配置,配置项需在 yml 或 properties 进行配置
58      * 目标是 ${prod.example} 的  ${prod.database} 库  users表
59      */
60     @CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
c83c1b 61     public void listenerExampleBooksUsers(CanalMessage message) {
D 62         printChange("listenerExampleBooksUsers", message);
a87aa7 63     }
de8c2b 64
D 65     /**
66      * 监控更新操作 ,目标是 example的  books库  users表
67      */
c83c1b 68     @CanalInsertListener(destination = "example", database = "books", table = {"users"})
D 69     public void listenerExampleBooksUser(CanalMessage message) {
70         printChange("listenerExampleBooksUsers", message);
de8c2b 71     }
D 72
73     /**
74      * 监控更新操作 ,目标是 example的  books库  books表
75      */
76     @CanalUpdateListener(destination = "example", database = "books", table = {"books"})
c83c1b 77     public void listenerExampleBooksBooks(CanalMessage message) {
D 78         printChange("listenerExampleBooksBooks", message);
de8c2b 79     }
D 80
81     /**
82      * 监控更新操作 ,目标是 example的  books库的所有表
83      */
84     @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
c83c1b 85     public void listenerExampleBooksAll(CanalMessage message) {
D 86         printChange("listenerExampleBooksAll", message);
de8c2b 87     }
D 88
89     /**
90      * 监控更新操作 ,目标是 example的  所有库的所有表
91      */
92     @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
c83c1b 93     public void listenerExampleAll(CanalMessage message) {
D 94         printChange("listenerExampleAll", message);
de8c2b 95     }
D 96
97     /**
98      * 监控更新、删除、新增操作 ,所有配置的目标下的所有库的所有表
99      */
100     @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
c83c1b 101     public void listenerAllDml(CanalMessage message) {
D 102         printChange("listenerAllDml", message);
de8c2b 103     }
D 104
c83c1b 105     public void printChange(String method, CanalMessage message) {
D 106         CanalEntry.EventType eventType = message.getEventType();
107         CanalEntry.RowData rowData = message.getRowData();
108
109
0b055a 110         System.out.println(" >>>>>>>>>>>>>[当前数据库: " + message.getDataBaseName() + " ," +
c83c1b 111                 "数据库表名: " + message.getTableName() + " , " +
0b055a 112                 "方法: " + method);
c83c1b 113
de8c2b 114         if (eventType == CanalEntry.EventType.DELETE) {
af6d64 115             rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
a8f947 116                 System.out.println("[方法: " + method + " ,  delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue());
de8c2b 117             });
D 118         }
119
120         if (eventType == CanalEntry.EventType.INSERT) {
121             rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
a8f947 122                 System.out.println("[方法: " + method + " ,insert 语句 ] --->> 字段名: " + ele.getName() + ", 新增的值为: " + ele.getValue());
de8c2b 123             });
D 124         }
125
126         if (eventType == CanalEntry.EventType.UPDATE) {
127             for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) {
128                 CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i);
129                 CanalEntry.Column beforeColumn = rowData.getBeforeColumnsList().get(i);
a8f947 130                 System.out.println("[方法: " + method + " , update 语句 ] -->> 字段名," + afterColumn.getName() +
de8c2b 131                         " , 是否修改: " + afterColumn.getUpdated() +
D 132                         " , 修改前的值: " + beforeColumn.getValue() +
133                         " , 修改后的值: " + afterColumn.getValue());
134             }
135         }
136     }
137
138
139 }
140
141
142
143 ```
144
145
146
147