提交 | 用户 | age
|
de8c2b
|
1 |
# Canal Spring Boot Starter 使用实例
|
D |
2 |
|
|
3 |
### 在spring boot 项目配置文件 application.yml内增加以下内容
|
|
4 |
```yaml
|
|
5 |
spring:
|
|
6 |
canal:
|
|
7 |
instances:
|
|
8 |
example: # 拉取 example 目标的数据
|
|
9 |
host: 192.168.10.179 # canal 所在机器的ip
|
|
10 |
port: 11111 # canal 默认暴露端口
|
|
11 |
user-name: canal # canal 用户名
|
|
12 |
password: canal # canal 密码
|
|
13 |
batch-size: 600 # canal 每次拉取的数据条数
|
|
14 |
retry-count: 5 # 重试次数,如果重试5次后,仍无法连接,则断开
|
|
15 |
cluster-enabled: false # 是否开启集群
|
|
16 |
zookeeper-address: # zookeeper 地址(开启集群的情况下生效), 例: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
|
|
17 |
acquire-interval: 1000 # 未拉取到消息情况下,获取消息的时间间隔毫秒值
|
|
18 |
subscribe: .*\\..* # 默认情况下拉取所有库、所有表
|
c83c1b
|
19 |
prod:
|
D |
20 |
example: example
|
|
21 |
database: books
|
de8c2b
|
22 |
|
D |
23 |
```
|
|
24 |
|
a8f947
|
25 |
### 在spring boot 项目中的代码使用实例 (注意需要使用 EnableCanalListener 注解开启 canal listener )
|
D |
26 |
|
de8c2b
|
27 |
```java
|
D |
28 |
|
|
29 |
|
|
30 |
import com.alibaba.otter.canal.protocol.CanalEntry;
|
0b055a
|
31 |
import CanalInsertListener;
|
R |
32 |
import CanalListener;
|
|
33 |
import CanalUpdateListener;
|
|
34 |
import EnableCanalListener;
|
|
35 |
import CanalMessage;
|
de8c2b
|
36 |
import org.springframework.stereotype.Service;
|
D |
37 |
|
|
38 |
import java.util.stream.Collectors;
|
|
39 |
|
|
40 |
/**
|
|
41 |
* @author wuqiong 2022/4/12
|
|
42 |
* @description
|
|
43 |
*/
|
a8f947
|
44 |
@EnableCanalListener
|
de8c2b
|
45 |
@Service
|
D |
46 |
public class CanalListenerTest {
|
|
47 |
|
|
48 |
/**
|
c83c1b
|
49 |
* 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
|
D |
50 |
*
|
0b055a
|
51 |
* 目前 Listener 方法的参数必须为 CanalMessage
|
de8c2b
|
52 |
* 程序在启动过程中会做检查
|
D |
53 |
*/
|
a8f947
|
54 |
|
a87aa7
|
55 |
/**
|
D |
56 |
* 监控更新操作
|
|
57 |
* 支持动态参数配置,配置项需在 yml 或 properties 进行配置
|
|
58 |
* 目标是 ${prod.example} 的 ${prod.database} 库 users表
|
|
59 |
*/
|
|
60 |
@CanalUpdateListener(destination = "${prod.example}", database = "${prod.database}", table = {"users"})
|
c83c1b
|
61 |
public void listenerExampleBooksUsers(CanalMessage message) {
|
D |
62 |
printChange("listenerExampleBooksUsers", message);
|
a87aa7
|
63 |
}
|
de8c2b
|
64 |
|
D |
65 |
/**
|
|
66 |
* 监控更新操作 ,目标是 example的 books库 users表
|
|
67 |
*/
|
c83c1b
|
68 |
@CanalInsertListener(destination = "example", database = "books", table = {"users"})
|
D |
69 |
public void listenerExampleBooksUser(CanalMessage message) {
|
|
70 |
printChange("listenerExampleBooksUsers", message);
|
de8c2b
|
71 |
}
|
D |
72 |
|
|
73 |
/**
|
|
74 |
* 监控更新操作 ,目标是 example的 books库 books表
|
|
75 |
*/
|
|
76 |
@CanalUpdateListener(destination = "example", database = "books", table = {"books"})
|
c83c1b
|
77 |
public void listenerExampleBooksBooks(CanalMessage message) {
|
D |
78 |
printChange("listenerExampleBooksBooks", message);
|
de8c2b
|
79 |
}
|
D |
80 |
|
|
81 |
/**
|
|
82 |
* 监控更新操作 ,目标是 example的 books库的所有表
|
|
83 |
*/
|
|
84 |
@CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
|
c83c1b
|
85 |
public void listenerExampleBooksAll(CanalMessage message) {
|
D |
86 |
printChange("listenerExampleBooksAll", message);
|
de8c2b
|
87 |
}
|
D |
88 |
|
|
89 |
/**
|
|
90 |
* 监控更新操作 ,目标是 example的 所有库的所有表
|
|
91 |
*/
|
|
92 |
@CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
|
c83c1b
|
93 |
public void listenerExampleAll(CanalMessage message) {
|
D |
94 |
printChange("listenerExampleAll", message);
|
de8c2b
|
95 |
}
|
D |
96 |
|
|
97 |
/**
|
|
98 |
* 监控更新、删除、新增操作 ,所有配置的目标下的所有库的所有表
|
|
99 |
*/
|
|
100 |
@CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
|
c83c1b
|
101 |
public void listenerAllDml(CanalMessage message) {
|
D |
102 |
printChange("listenerAllDml", message);
|
de8c2b
|
103 |
}
|
D |
104 |
|
c83c1b
|
105 |
public void printChange(String method, CanalMessage message) {
|
D |
106 |
CanalEntry.EventType eventType = message.getEventType();
|
|
107 |
CanalEntry.RowData rowData = message.getRowData();
|
|
108 |
|
|
109 |
|
0b055a
|
110 |
System.out.println(" >>>>>>>>>>>>>[当前数据库: " + message.getDataBaseName() + " ," +
|
c83c1b
|
111 |
"数据库表名: " + message.getTableName() + " , " +
|
0b055a
|
112 |
"方法: " + method);
|
c83c1b
|
113 |
|
de8c2b
|
114 |
if (eventType == CanalEntry.EventType.DELETE) {
|
af6d64
|
115 |
rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
|
a8f947
|
116 |
System.out.println("[方法: " + method + " , delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue());
|
de8c2b
|
117 |
});
|
D |
118 |
}
|
|
119 |
|
|
120 |
if (eventType == CanalEntry.EventType.INSERT) {
|
|
121 |
rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
|
a8f947
|
122 |
System.out.println("[方法: " + method + " ,insert 语句 ] --->> 字段名: " + ele.getName() + ", 新增的值为: " + ele.getValue());
|
de8c2b
|
123 |
});
|
D |
124 |
}
|
|
125 |
|
|
126 |
if (eventType == CanalEntry.EventType.UPDATE) {
|
|
127 |
for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) {
|
|
128 |
CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i);
|
|
129 |
CanalEntry.Column beforeColumn = rowData.getBeforeColumnsList().get(i);
|
a8f947
|
130 |
System.out.println("[方法: " + method + " , update 语句 ] -->> 字段名," + afterColumn.getName() +
|
de8c2b
|
131 |
" , 是否修改: " + afterColumn.getUpdated() +
|
D |
132 |
" , 修改前的值: " + beforeColumn.getValue() +
|
|
133 |
" , 修改后的值: " + afterColumn.getValue());
|
|
134 |
}
|
|
135 |
}
|
|
136 |
}
|
|
137 |
|
|
138 |
|
|
139 |
}
|
|
140 |
|
|
141 |
|
|
142 |
|
|
143 |
```
|
|
144 |
|
|
145 |
|
|
146 |
|
|
147 |
|