duxinglangzi
2022-04-24 fa654ac361fcc231bc389ca446f78d501c7060c8
提交 | 用户 | age
de8c2b 1 # Canal Spring Boot Starter 使用实例
D 2
3 ### 在spring boot 项目配置文件 application.yml内增加以下内容
4 ```yaml
5 spring:
6   canal:
7     instances:
8       example:                  # 拉取 example 目标的数据
9         host: 192.168.10.179    # canal 所在机器的ip
10         port: 11111             # canal 默认暴露端口
11         user-name: canal        # canal 用户名
12         password: canal         # canal 密码
13         batch-size: 600         # canal 每次拉取的数据条数
14         retry-count: 5          # 重试次数,如果重试5次后,仍无法连接,则断开
15         cluster-enabled: false  # 是否开启集群
16         zookeeper-address:      # zookeeper 地址(开启集群的情况下生效), 例: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
17         acquire-interval: 1000  # 未拉取到消息情况下,获取消息的时间间隔毫秒值
18         subscribe: .*\\..*      # 默认情况下拉取所有库、所有表
19
20 ```
21
22 ### 在spring boot 项目中的代码使用实例 
23 ```java
24
25
26 import com.alibaba.otter.canal.protocol.CanalEntry;
27 import com.duxinglangzi.canal.starter.annotation.CanalListener;
28 import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
29 import org.springframework.stereotype.Service;
30
31 import java.util.stream.Collectors;
32
33 /**
34  * @author wuqiong 2022/4/12
35  * @description
36  */
37 @Service
38 public class CanalListenerTest {
39
40     /**
627420 41      * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData 
de8c2b 42      * 程序在启动过程中会做检查
D 43      */
44
45     /**
46      * 监控更新操作 ,目标是 example的  books库  users表
47      */
48     @CanalUpdateListener(destination = "example", database = "books", table = {"users"})
49     public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
50         printChange("listenerExampleBooksUsers",eventType, rowData);
51     }
52
53     /**
54      * 监控更新操作 ,目标是 example的  books库  books表
55      */
56     @CanalUpdateListener(destination = "example", database = "books", table = {"books"})
57     public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
58         printChange("listenerExampleBooksBooks",eventType, rowData);
59     }
60
61     /**
62      * 监控更新操作 ,目标是 example的  books库的所有表
63      */
64     @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
65     public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
66         printChange("listenerExampleBooksAll",eventType, rowData);
67     }
68
69     /**
70      * 监控更新操作 ,目标是 example的  所有库的所有表
71      */
72     @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
73     public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
74         printChange("listenerExampleAll",eventType, rowData);
75     }
76
77     /**
78      * 监控更新、删除、新增操作 ,所有配置的目标下的所有库的所有表
79      */
80     @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
81     public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
82         printChange("listenerAllDml",eventType, rowData);
83     }
84
85     public void printChange(String method,CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
86         if (eventType == CanalEntry.EventType.DELETE) {
87             rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
88                 System.out.println("[方法: "+method+" ,  delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue());
89             });
90         }
91
92         if (eventType == CanalEntry.EventType.INSERT) {
93             rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
94                 System.out.println("[方法: "+method+" ,insert 语句 ] --->> 字段名: " + ele.getName() + ", 新增的值为: " + ele.getValue());
95             });
96         }
97
98         if (eventType == CanalEntry.EventType.UPDATE) {
99             for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) {
100                 CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i);
101                 CanalEntry.Column beforeColumn = rowData.getBeforeColumnsList().get(i);
102                 System.out.println("[方法: "+method+" , update 语句 ] -->> 字段名," + afterColumn.getName() +
103                         " , 是否修改: " + afterColumn.getUpdated() +
104                         " , 修改前的值: " + beforeColumn.getValue() +
105                         " , 修改后的值: " + afterColumn.getValue());
106             }
107         }
108     }
109
110
111 }
112
113
114
115 ```
116
117
118
119