提交 | 用户 | age
|
de8c2b
|
1 |
# Canal Spring Boot Starter 使用实例
|
D |
2 |
|
|
3 |
### 在spring boot 项目配置文件 application.yml内增加以下内容
|
|
4 |
```yaml
|
|
5 |
spring:
|
|
6 |
canal:
|
|
7 |
instances:
|
|
8 |
example: # 拉取 example 目标的数据
|
|
9 |
host: 192.168.10.179 # canal 所在机器的ip
|
|
10 |
port: 11111 # canal 默认暴露端口
|
|
11 |
user-name: canal # canal 用户名
|
|
12 |
password: canal # canal 密码
|
|
13 |
batch-size: 600 # canal 每次拉取的数据条数
|
|
14 |
retry-count: 5 # 重试次数,如果重试5次后,仍无法连接,则断开
|
|
15 |
cluster-enabled: false # 是否开启集群
|
|
16 |
zookeeper-address: # zookeeper 地址(开启集群的情况下生效), 例: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
|
|
17 |
acquire-interval: 1000 # 未拉取到消息情况下,获取消息的时间间隔毫秒值
|
|
18 |
subscribe: .*\\..* # 默认情况下拉取所有库、所有表
|
|
19 |
|
|
20 |
```
|
|
21 |
|
|
22 |
### 在spring boot 项目中的代码使用实例
|
|
23 |
```java
|
|
24 |
|
|
25 |
|
|
26 |
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
27 |
import com.duxinglangzi.canal.starter.annotation.CanalListener;
|
|
28 |
import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
|
|
29 |
import org.springframework.stereotype.Service;
|
|
30 |
|
|
31 |
import java.util.stream.Collectors;
|
|
32 |
|
|
33 |
/**
|
|
34 |
* @author wuqiong 2022/4/12
|
|
35 |
* @description
|
|
36 |
*/
|
|
37 |
@Service
|
|
38 |
public class CanalListenerTest {
|
|
39 |
|
|
40 |
/**
|
627420
|
41 |
* 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData
|
de8c2b
|
42 |
* 程序在启动过程中会做检查
|
D |
43 |
*/
|
|
44 |
|
|
45 |
/**
|
|
46 |
* 监控更新操作 ,目标是 example的 books库 users表
|
|
47 |
*/
|
|
48 |
@CanalUpdateListener(destination = "example", database = "books", table = {"users"})
|
|
49 |
public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
|
50 |
printChange("listenerExampleBooksUsers",eventType, rowData);
|
|
51 |
}
|
|
52 |
|
|
53 |
/**
|
|
54 |
* 监控更新操作 ,目标是 example的 books库 books表
|
|
55 |
*/
|
|
56 |
@CanalUpdateListener(destination = "example", database = "books", table = {"books"})
|
|
57 |
public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
|
58 |
printChange("listenerExampleBooksBooks",eventType, rowData);
|
|
59 |
}
|
|
60 |
|
|
61 |
/**
|
|
62 |
* 监控更新操作 ,目标是 example的 books库的所有表
|
|
63 |
*/
|
|
64 |
@CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
|
|
65 |
public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
|
66 |
printChange("listenerExampleBooksAll",eventType, rowData);
|
|
67 |
}
|
|
68 |
|
|
69 |
/**
|
|
70 |
* 监控更新操作 ,目标是 example的 所有库的所有表
|
|
71 |
*/
|
|
72 |
@CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
|
|
73 |
public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
|
74 |
printChange("listenerExampleAll",eventType, rowData);
|
|
75 |
}
|
|
76 |
|
|
77 |
/**
|
|
78 |
* 监控更新、删除、新增操作 ,所有配置的目标下的所有库的所有表
|
|
79 |
*/
|
|
80 |
@CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
|
|
81 |
public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
|
82 |
printChange("listenerAllDml",eventType, rowData);
|
|
83 |
}
|
|
84 |
|
|
85 |
public void printChange(String method,CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
|
|
86 |
if (eventType == CanalEntry.EventType.DELETE) {
|
af6d64
|
87 |
rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
|
de8c2b
|
88 |
System.out.println("[方法: "+method+" , delete 语句 ] --->> 字段名: " + ele.getName() + ", 删除的值为: " + ele.getValue());
|
D |
89 |
});
|
|
90 |
}
|
|
91 |
|
|
92 |
if (eventType == CanalEntry.EventType.INSERT) {
|
|
93 |
rowData.getAfterColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
|
|
94 |
System.out.println("[方法: "+method+" ,insert 语句 ] --->> 字段名: " + ele.getName() + ", 新增的值为: " + ele.getValue());
|
|
95 |
});
|
|
96 |
}
|
|
97 |
|
|
98 |
if (eventType == CanalEntry.EventType.UPDATE) {
|
|
99 |
for (int i = 0; i < rowData.getAfterColumnsList().size(); i++) {
|
|
100 |
CanalEntry.Column afterColumn = rowData.getAfterColumnsList().get(i);
|
|
101 |
CanalEntry.Column beforeColumn = rowData.getBeforeColumnsList().get(i);
|
|
102 |
System.out.println("[方法: "+method+" , update 语句 ] -->> 字段名," + afterColumn.getName() +
|
|
103 |
" , 是否修改: " + afterColumn.getUpdated() +
|
|
104 |
" , 修改前的值: " + beforeColumn.getValue() +
|
|
105 |
" , 修改后的值: " + afterColumn.getValue());
|
|
106 |
}
|
|
107 |
}
|
|
108 |
}
|
|
109 |
|
|
110 |
|
|
111 |
}
|
|
112 |
|
|
113 |
|
|
114 |
|
|
115 |
```
|
|
116 |
|
|
117 |
|
|
118 |
|
|
119 |
|