提交 | 用户 | age
|
de8c2b
|
1 |
# Canal Spring Boot Starter 使用实例
|
D |
2 |
|
|
3 |
### 在spring boot 项目配置文件 application.yml内增加以下内容
|
|
4 |
```yaml
|
|
5 |
spring:
|
|
6 |
canal:
|
|
7 |
instances:
|
|
8 |
example: # 拉取 example 目标的数据
|
|
9 |
host: 192.168.10.179 # canal 所在机器的ip
|
|
10 |
port: 11111 # canal 默认暴露端口
|
|
11 |
user-name: canal # canal 用户名
|
|
12 |
password: canal # canal 密码
|
|
13 |
batch-size: 600 # canal 每次拉取的数据条数
|
|
14 |
retry-count: 5 # 重试次数,如果重试5次后,仍无法连接,则断开
|
|
15 |
cluster-enabled: false # 是否开启集群
|
|
16 |
zookeeper-address: # zookeeper 地址(开启集群的情况下生效), 例: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
|
|
17 |
acquire-interval: 1000 # 未拉取到消息情况下,获取消息的时间间隔毫秒值
|
|
18 |
subscribe: .*\\..* # 默认情况下拉取所有库、所有表
|
4c9cf7
|
19 |
prod:
|
D |
20 |
example: example1
|
|
21 |
database: books
|
de8c2b
|
22 |
|
D |
23 |
```
|
|
24 |
|
a8f947
|
25 |
### 在spring boot 项目中的代码使用实例 (注意需要使用 EnableCanalListener 注解开启 canal listener )
|
D |
26 |
|
de8c2b
|
27 |
```java
|
D |
28 |
|
|
29 |
|
|
30 |
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
31 |
import com.duxinglangzi.canal.starter.annotation.CanalListener;
|
|
32 |
import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
|
a8f947
|
33 |
import com.duxinglangzi.canal.starter.annotation.EnableCanalListener;
|
de8c2b
|
34 |
import org.springframework.stereotype.Service;
|
D |
35 |
|
|
36 |
import java.util.stream.Collectors;
|
|
37 |
|
|
38 |
/**
|
|
39 |
* @author wuqiong 2022/4/12
|
|
40 |
* @description
|
|
41 |
*/
|
a8f947
|
42 |
@EnableCanalListener
|
de8c2b
|
43 |
@Service
|
D |
44 |
public class CanalListenerTest {
|
|
45 |
|
|
46 |
/**
|
a8f947
|
47 |
* 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
|
D |
48 |
*
|
627420
|
49 |
* 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData
|
de8c2b
|
50 |
* 程序在启动过程中会做检查
|
D |
51 |
*/
|
a8f947
|
52 |
|
a87aa7
|
53 |
/**
|
D |
54 |
* 监控更新操作
|
|
55 |
* 支持动态参数配置,配置项需在 yml 或 properties 进行配置
|
|
56 |
* 目标是 ${prod.example} 的 ${prod.database} 库 users表
|
|
57 |
*/
|
|
58 |
@CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
|
|
59 |
public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
a8f947
|
60 |
printChange("listenerExampleBooksUsers", eventType, rowData);
|
a87aa7
|
61 |
}
|
de8c2b
|
62 |
|
D |
63 |
/**
|
|
64 |
* 监控更新操作 ,目标是 example的 books库 users表
|
|
65 |
*/
|
|
66 |
@CanalUpdateListener(destination = "example", database = "books", table = {"users"})
|
|
67 |
public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
a8f947
|
68 |
printChange("listenerExampleBooksUsers", eventType, rowData);
|
de8c2b
|
69 |
}
|
D |
70 |
|
|
71 |
/**
|
|
72 |
* 监控更新操作 ,目标是 example的 books库 books表
|
|
73 |
*/
|
|
74 |
@CanalUpdateListener(destination = "example", database = "books", table = {"books"})
|
|
75 |
public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
a8f947
|
76 |
printChange("listenerExampleBooksBooks", eventType, rowData);
|
de8c2b
|
77 |
}
|
D |
78 |
|
|
79 |
/**
|
|
80 |
* 监控更新操作 ,目标是 example的 books库的所有表
|
|
81 |
*/
|
|
82 |
@CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
|
|
83 |
public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
a8f947
|
84 |
printChange("listenerExampleBooksAll", eventType, rowData);
|
de8c2b
|
85 |
}
|
D |
86 |
|
|
87 |
/**
|
|
88 |
* 监控更新操作 ,目标是 example的 所有库的所有表
|
|
89 |
*/
|
|
90 |
@CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
|
|
91 |
public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
a8f947
|
92 |
printChange("listenerExampleAll", eventType, rowData);
|
de8c2b
|
93 |
}
|
D |
94 |
|
|
95 |
/**
|
|
96 |
* 监控更新、删除、新增操作 ,所有配置的目标下的所有库的所有表
|
|
97 |
*/
|
|
98 |
@CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
|
|
99 |
public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
a8f947
|
100 |
printChange("listenerAllDml", eventType, rowData);
|
de8c2b
|
101 |
}
|
D |
102 |
|
a8f947
|
103 |
public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
de8c2b
|
104 |
if (eventType == CanalEntry.EventType.DELETE) {
|
af6d64
|
105 |
rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
|
a8f947
|
106 |
System.out.println("[方法: " + method + " , delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue());
|
de8c2b
|
107 |
});
|
D |
108 |
}
|
|
109 |
|
|
110 |
if (eventType == CanalEntry.EventType.INSERT) {
|
|
111 |
rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
|
a8f947
|
112 |
System.out.println("[方法: " + method + " ,insert 语句 ] --->> 字段名: " + ele.getName() + ", 新增的值为: " + ele.getValue());
|
de8c2b
|
113 |
});
|
D |
114 |
}
|
|
115 |
|
|
116 |
if (eventType == CanalEntry.EventType.UPDATE) {
|
|
117 |
for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) {
|
|
118 |
CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i);
|
|
119 |
CanalEntry.Column beforeColumn = rowData.getBeforeColumnsList().get(i);
|
a8f947
|
120 |
System.out.println("[方法: " + method + " , update 语句 ] -->> 字段名," + afterColumn.getName() +
|
de8c2b
|
121 |
" , 是否修改: " + afterColumn.getUpdated() +
|
D |
122 |
" , 修改前的值: " + beforeColumn.getValue() +
|
|
123 |
" , 修改后的值: " + afterColumn.getValue());
|
|
124 |
}
|
|
125 |
}
|
|
126 |
}
|
|
127 |
|
|
128 |
|
|
129 |
}
|
|
130 |
|
|
131 |
|
|
132 |
|
|
133 |
```
|
|
134 |
|
|
135 |
|
|
136 |
|
|
137 |
|