15 文件已重命名
1个文件已删除
1个文件已添加
3个文件已修改
| | |
| | |
|
| | |
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | | import com.duxinglangzi.canal.starter.annotation.CanalInsertListener;
|
| | | import com.duxinglangzi.canal.starter.annotation.CanalListener;
|
| | | import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
|
| | | import com.duxinglangzi.canal.starter.annotation.EnableCanalListener;
|
| | | import com.duxinglangzi.canal.starter.mode.CanalMessage;
|
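| | | import com.hz.canal.starter.annotation.CanalInsertListener;
|
| | | import com.hz.canal.starter.annotation.CanalListener;
|
| | | import com.hz.canal.starter.annotation.CanalUpdateListener;
|
| | | import com.hz.canal.starter.annotation.EnableCanalListener;
|
| | | import com.hz.canal.starter.mode.CanalMessage;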
|
| | | import org.springframework.stereotype.Service;
|
| | |
|
| | | import java.util.stream.Collectors;
|
| | |
| | | /**
|
| | | * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
|
| | | *
|
| | | * 目前 Listener 方法的参数必须为 com.duxinglangzi.canal.starter.mode.CanalMessage
|
| | | * 目前 Listener 方法的参数必须为 CanalMessage
|
| | | * 程序在启动过程中会做检查
|
| | | */
|
| | |
|
| | |
| | | CanalEntry.RowData rowData = message.getRowData();
|
| | |
|
| | |
|
| | | System.out.println(" >>>>>>>>>>>>>[当前数据库: "+message.getDataBaseName()+" ," +
|
| | | System.out.println(" >>>>>>>>>>>>>[当前数据库: " + message.getDataBaseName() + " ," +
|
| | | "数据库表名: " + message.getTableName() + " , " +
|
| | | "方法: " + method );
|
| | | "方法: " + method);
|
| | |
|
| | | if (eventType == CanalEntry.EventType.DELETE) {
|
| | | rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> {
|
File was renamed from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalDeleteListener.java |
| | |
| | | package com.duxinglangzi.canal.starter.annotation;
|
| | |
|
| | |
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | | import org.springframework.core.annotation.AliasFor;
|
| | |
|
| | | import java.lang.annotation.ElementType;
|
| | | import java.lang.annotation.Retention;
|
| | | import java.lang.annotation.RetentionPolicy;
|
| | | import java.lang.annotation.Target;
|
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | @Target(ElementType.METHOD)
|
| | | @Retention(RetentionPolicy.RUNTIME)
|
| | | @CanalListener(eventType = {CanalEntry.EventType.DELETE})
|
| | | public @interface CanalDeleteListener {
|
| | |
|
| | | @AliasFor(annotation = CanalListener.class)
|
| | | String destination();
|
| | |
|
| | | @AliasFor(annotation = CanalListener.class)
|
| | | String database();
|
| | |
|
| | | @AliasFor(annotation = CanalListener.class)
|
| | | String[] table();
|
| | |
|
| | | }
|
| | | package com.hz.canal.starter.annotation; |
| | | |
| | | |
| | | import com.alibaba.otter.canal.protocol.CanalEntry; |
| | | import org.springframework.core.annotation.AliasFor; |
| | | |
| | | import java.lang.annotation.ElementType; |
| | | import java.lang.annotation.Retention; |
| | | import java.lang.annotation.RetentionPolicy; |
| | | import java.lang.annotation.Target; |
| | | |
| | | /** |
| | |  * Marks a method as a canal listener for {@code DELETE} row events. |
| | |  * <p> |
| | |  * This is a composed annotation: it is meta-annotated with {@link CanalListener} (event type fixed to |
| | |  * {@code DELETE}) and forwards its attributes to the same-named {@link CanalListener} attributes |
| | |  * through {@link AliasFor}. Every attribute defaults to the empty string, mirroring the defaults |
| | |  * declared on {@link CanalListener}, so an attribute only has to be written when it narrows the listener. |
| | |  * |
| | |  * @author wuqiong 2022/4/11 |
| | |  */ |
| | | @Target(ElementType.METHOD) |
| | | @Retention(RetentionPolicy.RUNTIME) |
| | | @CanalListener(eventType = {CanalEntry.EventType.DELETE}) |
| | | public @interface CanalDeleteListener { |
| | | |
| | |     /** |
| | |      * Canal destination to listen on; alias for {@link CanalListener#destination()}. |
| | |      */ |
| | |     @AliasFor(annotation = CanalListener.class) |
| | |     String destination() default ""; |
| | | |
| | |     /** |
| | |      * Database name; alias for {@link CanalListener#database()}. |
| | |      */ |
| | |     @AliasFor(annotation = CanalListener.class) |
| | |     String database() default ""; |
| | | |
| | |     /** |
| | |      * Table names; alias for {@link CanalListener#table()}. |
| | |      */ |
| | |     @AliasFor(annotation = CanalListener.class) |
| | |     String[] table() default ""; |
| | | |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalInsertListener.java |
| | |
| | | package com.duxinglangzi.canal.starter.annotation;
|
| | |
|
| | |
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | | import org.springframework.core.annotation.AliasFor;
|
| | |
|
| | | import java.lang.annotation.ElementType;
|
| | | import java.lang.annotation.Retention;
|
| | | import java.lang.annotation.RetentionPolicy;
|
| | | import java.lang.annotation.Target;
|
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | @Target(ElementType.METHOD)
|
| | | @Retention(RetentionPolicy.RUNTIME)
|
| | | @CanalListener(eventType = CanalEntry.EventType.INSERT)
|
| | | public @interface CanalInsertListener {
|
| | |
|
| | | @AliasFor(annotation = CanalListener.class)
|
| | | String destination();
|
| | |
|
| | | @AliasFor(annotation = CanalListener.class)
|
| | | String database();
|
| | |
|
| | | @AliasFor(annotation = CanalListener.class)
|
| | | String[] table();
|
| | |
|
| | | }
|
| | | package com.hz.canal.starter.annotation; |
| | | |
| | | |
| | | import com.alibaba.otter.canal.protocol.CanalEntry; |
| | | import org.springframework.core.annotation.AliasFor; |
| | | |
| | | import java.lang.annotation.ElementType; |
| | | import java.lang.annotation.Retention; |
| | | import java.lang.annotation.RetentionPolicy; |
| | | import java.lang.annotation.Target; |
| | | |
| | | /** |
| | |  * Marks a method as a canal listener for {@code INSERT} row events. |
| | |  * <p> |
| | |  * This is a composed annotation: it is meta-annotated with {@link CanalListener} (event type fixed to |
| | |  * {@code INSERT}) and forwards its attributes to the same-named {@link CanalListener} attributes |
| | |  * through {@link AliasFor}. Every attribute defaults to the empty string, mirroring the defaults |
| | |  * declared on {@link CanalListener}, so an attribute only has to be written when it narrows the listener. |
| | |  * |
| | |  * @author wuqiong 2022/4/11 |
| | |  */ |
| | | @Target(ElementType.METHOD) |
| | | @Retention(RetentionPolicy.RUNTIME) |
| | | @CanalListener(eventType = CanalEntry.EventType.INSERT) |
| | | public @interface CanalInsertListener { |
| | | |
| | |     /** |
| | |      * Canal destination to listen on; alias for {@link CanalListener#destination()}. |
| | |      */ |
| | |     @AliasFor(annotation = CanalListener.class) |
| | |     String destination() default ""; |
| | | |
| | |     /** |
| | |      * Database name; alias for {@link CanalListener#database()}. |
| | |      */ |
| | |     @AliasFor(annotation = CanalListener.class) |
| | |     String database() default ""; |
| | | |
| | |     /** |
| | |      * Table names; alias for {@link CanalListener#table()}. |
| | |      */ |
| | |     @AliasFor(annotation = CanalListener.class) |
| | |     String[] table() default ""; |
| | | |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalListener.java |
| | |
| | | package com.duxinglangzi.canal.starter.annotation;
|
| | |
|
| | |
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | |
|
| | | import java.lang.annotation.ElementType;
|
| | | import java.lang.annotation.Retention;
|
| | | import java.lang.annotation.RetentionPolicy;
|
| | | import java.lang.annotation.Target;
|
| | |
|
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
|
| | | @Retention(RetentionPolicy.RUNTIME)
|
| | | public @interface CanalListener {
|
| | |
|
| | |
|
| | | /**
|
| | | * 如果未进行配置,则使用配置文件里全部 destination
|
| | | */
|
| | | String destination() default "";
|
| | |
|
| | | /**
|
| | | * 数据库名
|
| | | */
|
| | | String database() default "";
|
| | |
|
| | | /**
|
| | | * 数据表名
|
| | | */
|
| | | String[] table() default "";
|
| | |
|
| | | /**
|
| | | * 数据变动类型,此处请注意,默认不包含 DDL
|
| | | */
|
| | | CanalEntry.EventType[] eventType();
|
| | |
|
| | |
|
| | | }
|
| | | package com.hz.canal.starter.annotation; |
| | | |
| | | |
| | | import com.alibaba.otter.canal.protocol.CanalEntry; |
| | | |
| | | import java.lang.annotation.ElementType; |
| | | import java.lang.annotation.Retention; |
| | | import java.lang.annotation.RetentionPolicy; |
| | | import java.lang.annotation.Target; |
| | | |
| | | |
| | | /** |
| | | * Declares a canal listener. May be placed on a method, or on another annotation type |
| | | * so that composed listener annotations can be built on top of it. |
| | | * |
| | | * @author wuqiong 2022/4/11 |
| | | */ |
| | | @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) |
| | | @Retention(RetentionPolicy.RUNTIME) |
| | | public @interface CanalListener { |
| | | |
| | | |
| | | /** |
| | | * Canal destination. If left unconfigured (empty string), every destination |
| | | * from the configuration file is used. |
| | | */ |
| | | String destination() default ""; |
| | | |
| | | /** |
| | | * Database name. |
| | | */ |
| | | String database() default ""; |
| | | |
| | | /** |
| | | * Table names. |
| | | */ |
| | | String[] table() default ""; |
| | | |
| | | /** |
| | | * Data change (event) types to listen for. |
| | | * NOTE(review): the original comment states that DDL is not included by default, but this |
| | | * attribute declares no default value, so the event types must always be given explicitly. |
| | | */ |
| | | CanalEntry.EventType[] eventType(); |
| | | |
| | | |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalUpdateListener.java |
| | |
| | | package com.duxinglangzi.canal.starter.annotation;
|
| | |
|
| | |
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | | import org.springframework.core.annotation.AliasFor;
|
| | |
|
| | | import java.lang.annotation.ElementType;
|
| | | import java.lang.annotation.Retention;
|
| | | import java.lang.annotation.RetentionPolicy;
|
| | | import java.lang.annotation.Target;
|
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | @Target(ElementType.METHOD)
|
| | | @Retention(RetentionPolicy.RUNTIME)
|
| | | @CanalListener(eventType = CanalEntry.EventType.UPDATE)
|
| | | public @interface CanalUpdateListener {
|
| | |
|
| | | @AliasFor(annotation = CanalListener.class)
|
| | | String destination();
|
| | |
|
| | | @AliasFor(annotation = CanalListener.class)
|
| | | String database();
|
| | |
|
| | | @AliasFor(annotation = CanalListener.class)
|
| | | String[] table();
|
| | |
|
| | | }
|
| | | package com.hz.canal.starter.annotation; |
| | | |
| | | |
| | | import com.alibaba.otter.canal.protocol.CanalEntry; |
| | | import org.springframework.core.annotation.AliasFor; |
| | | |
| | | import java.lang.annotation.ElementType; |
| | | import java.lang.annotation.Retention; |
| | | import java.lang.annotation.RetentionPolicy; |
| | | import java.lang.annotation.Target; |
| | | |
| | | /** |
| | |  * Marks a method as a canal listener for {@code UPDATE} row events. |
| | |  * <p> |
| | |  * This is a composed annotation: it is meta-annotated with {@link CanalListener} (event type fixed to |
| | |  * {@code UPDATE}) and forwards its attributes to the same-named {@link CanalListener} attributes |
| | |  * through {@link AliasFor}. Every attribute defaults to the empty string, mirroring the defaults |
| | |  * declared on {@link CanalListener}, so an attribute only has to be written when it narrows the listener. |
| | |  * |
| | |  * @author wuqiong 2022/4/11 |
| | |  */ |
| | | @Target(ElementType.METHOD) |
| | | @Retention(RetentionPolicy.RUNTIME) |
| | | @CanalListener(eventType = CanalEntry.EventType.UPDATE) |
| | | public @interface CanalUpdateListener { |
| | | |
| | |     /** |
| | |      * Canal destination to listen on; alias for {@link CanalListener#destination()}. |
| | |      */ |
| | |     @AliasFor(annotation = CanalListener.class) |
| | |     String destination() default ""; |
| | | |
| | |     /** |
| | |      * Database name; alias for {@link CanalListener#database()}. |
| | |      */ |
| | |     @AliasFor(annotation = CanalListener.class) |
| | |     String database() default ""; |
| | | |
| | |     /** |
| | |      * Table names; alias for {@link CanalListener#table()}. |
| | |      */ |
| | |     @AliasFor(annotation = CanalListener.class) |
| | |     String[] table() default ""; |
| | | |
| | | } |
New file |
| | |
| | | package com.hz.canal.starter.annotation; |
| | | |
| | | import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; |
| | | import com.hz.canal.starter.configuration.CanalConfigurationSelector; |
| | | import com.hz.canal.starter.listener.ApplicationReadyListener; |
| | | import org.springframework.context.annotation.Import; |
| | | |
| | | import java.lang.annotation.*; |
| | | |
| | | /** |
| | | * Enables canal listener support. |
| | | * <p> |
| | | * Placing this annotation on a type imports {@link CanalAutoConfigurationProperties}, |
| | | * {@link CanalConfigurationSelector} and {@link ApplicationReadyListener} |
| | | * (see the {@code @Import} declaration below). |
| | | * |
| | | * @author wuqiong 2022/8/4 |
| | | */ |
| | | @Documented |
| | | @Inherited |
| | | @Target(ElementType.TYPE) |
| | | @Retention(RetentionPolicy.RUNTIME) |
| | | @Import({CanalAutoConfigurationProperties.class, CanalConfigurationSelector.class, ApplicationReadyListener.class}) |
| | | public @interface EnableCanalListener { |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalAutoConfigurationProperties.java |
| | |
| | | package com.duxinglangzi.canal.starter.configuration;
|
| | |
|
| | | import org.springframework.boot.context.properties.ConfigurationProperties;
|
| | | import org.springframework.core.Ordered;
|
| | | import org.springframework.core.annotation.Order;
|
| | |
|
| | | import java.util.LinkedHashMap;
|
| | | import java.util.Map;
|
| | |
|
| | | /**
|
| | | * Canal连接的配置类
|
| | | *
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | @Order(Ordered.HIGHEST_PRECEDENCE)
|
| | | @ConfigurationProperties(prefix = "spring.canal")
|
| | | public class CanalAutoConfigurationProperties {
|
| | |
|
| | | private Map<String, EndpointInstance> instances = new LinkedHashMap<>();
|
| | |
|
| | | public static class EndpointInstance {
|
| | |
|
| | | /**
|
| | | * 是否开启 cluster
|
| | | */
|
| | | private boolean clusterEnabled;
|
| | |
|
| | | /**
|
| | | * zookeeper 地址, 例: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
|
| | | */
|
| | | private String zookeeperAddress;
|
| | |
|
| | | /**
|
| | | * 默认 127.0.0.1
|
| | | */
|
| | | private String host = "127.0.0.1";
|
| | |
|
| | | /**
|
| | | * 端口 , 默认: 11111
|
| | | */
|
| | | private int port = 11111;
|
| | |
|
| | | /**
|
| | | * 用户名
|
| | | */
|
| | | private String userName = "";
|
| | |
|
| | | /**
|
| | | * 密码
|
| | | */
|
| | | private String password = "";
|
| | |
|
| | | /**
|
| | | * 每次获取数据条数 , 默认: 200
|
| | | */
|
| | | private int batchSize = 200;
|
| | |
|
| | | /**
|
| | | * 发生错误时重试次数 , 默认: 5
|
| | | */
|
| | | private int retryCount = 5;
|
| | |
|
| | | /**
|
| | | * mysql 数据解析关注的表,Perl正则表达式.
|
| | | * <p>
|
| | | * <p>
|
| | | * 多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
|
| | | * <p>
|
| | | * <p>
|
| | | * 常见例子: <p>
|
| | | * 1. 所有库表:.* or .*\\..* <p>
|
| | | * 2. canal_db 下所有表: canal_db\\..* <p>
|
| | | * 3. canal_db 下的以canal打头的表: canal_db\\.canal.* <p>
|
| | | * 4. canal_db 下的一张表: canal_db\\.test1 <p>
|
| | | * 5. 多个规则组合使用:canal_db\\..*,mysql_db.test1,mysql.test2 (逗号分隔) <p>
|
| | | * <p>
|
| | | * 默认: 全库全表(.*\\..*)
|
| | | */
|
| | | private String subscribe = ".*\\..*";
|
| | |
|
| | | /**
|
| | | * 未拉取到消息情况下,获取消息的时间间隔毫秒值 , 默认: 1000
|
| | | */
|
| | | private long acquireInterval = 1000;
|
| | |
|
| | | public EndpointInstance() {
|
| | | }
|
| | |
|
| | | public boolean isClusterEnabled() {
|
| | | return clusterEnabled;
|
| | | }
|
| | |
|
| | | public void setClusterEnabled(boolean clusterEnabled) {
|
| | | this.clusterEnabled = clusterEnabled;
|
| | | }
|
| | |
|
| | | public String getZookeeperAddress() {
|
| | | return zookeeperAddress;
|
| | | }
|
| | |
|
| | | public void setZookeeperAddress(String zookeeperAddress) {
|
| | | this.zookeeperAddress = zookeeperAddress;
|
| | | }
|
| | |
|
| | | public String getHost() {
|
| | | return host;
|
| | | }
|
| | |
|
| | | public void setHost(String host) {
|
| | | this.host = host;
|
| | | }
|
| | |
|
| | | public int getPort() {
|
| | | return port;
|
| | | }
|
| | |
|
| | | public void setPort(int port) {
|
| | | this.port = port;
|
| | | }
|
| | |
|
| | | public String getUserName() {
|
| | | return userName;
|
| | | }
|
| | |
|
| | | public void setUserName(String userName) {
|
| | | this.userName = userName;
|
| | | }
|
| | |
|
| | | public String getPassword() {
|
| | | return password;
|
| | | }
|
| | |
|
| | | public void setPassword(String password) {
|
| | | this.password = password;
|
| | | }
|
| | |
|
| | | public int getBatchSize() {
|
| | | return batchSize;
|
| | | }
|
| | |
|
| | | public void setBatchSize(int batchSize) {
|
| | | this.batchSize = batchSize;
|
| | | }
|
| | |
|
| | | public int getRetryCount() {
|
| | | return retryCount;
|
| | | }
|
| | |
|
| | | public void setRetryCount(int retryCount) {
|
| | | this.retryCount = retryCount;
|
| | | }
|
| | |
|
| | | public long getAcquireInterval() {
|
| | | return acquireInterval;
|
| | | }
|
| | |
|
| | | public void setAcquireInterval(long acquireInterval) {
|
| | | this.acquireInterval = acquireInterval;
|
| | | }
|
| | |
|
| | | public String getSubscribe() {
|
| | | return subscribe;
|
| | | }
|
| | |
|
| | | public void setSubscribe(String subscribe) {
|
| | | this.subscribe = subscribe;
|
| | | }
|
| | | }
|
| | |
|
| | | public Map<String, EndpointInstance> getInstances() {
|
| | | return instances;
|
| | | }
|
| | |
|
| | | public void setInstances(Map<String, EndpointInstance> instances) {
|
| | | this.instances = instances;
|
| | | }
|
| | | }
|
| | | package com.hz.canal.starter.configuration; |
| | | |
| | | import org.springframework.boot.context.properties.ConfigurationProperties; |
| | | import org.springframework.core.Ordered; |
| | | import org.springframework.core.annotation.Order; |
| | | |
| | | import java.util.LinkedHashMap; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | |  * Connection settings for canal, bound from properties under the {@code spring.canal} prefix. |
| | |  * |
| | |  * @author wuqiong 2022/4/11 |
| | |  */ |
| | | @ConfigurationProperties(prefix = "spring.canal") |
| | | @Order(Ordered.HIGHEST_PRECEDENCE) |
| | | public class CanalAutoConfigurationProperties { |
| | | |
| | |     /** Configured endpoint instances, keyed by string; iteration follows insertion order. */ |
| | |     private Map<String, EndpointInstance> instances = new LinkedHashMap<>(); |
| | | |
| | |     public Map<String, EndpointInstance> getInstances() { |
| | |         return this.instances; |
| | |     } |
| | | |
| | |     public void setInstances(Map<String, EndpointInstance> instances) { |
| | |         this.instances = instances; |
| | |     } |
| | | |
| | |     /** |
| | |      * Settings of a single canal endpoint. |
| | |      */ |
| | |     public static class EndpointInstance { |
| | | |
| | |         /** Whether cluster mode is enabled. */ |
| | |         private boolean clusterEnabled; |
| | | |
| | |         /** ZooKeeper address, e.g. 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 */ |
| | |         private String zookeeperAddress; |
| | | |
| | |         /** Host; default: 127.0.0.1 */ |
| | |         private String host = "127.0.0.1"; |
| | | |
| | |         /** Port; default: 11111 */ |
| | |         private int port = 11111; |
| | | |
| | |         /** User name. */ |
| | |         private String userName = ""; |
| | | |
| | |         /** Password. */ |
| | |         private String password = ""; |
| | | |
| | |         /** Number of entries fetched per request; default: 200 */ |
| | |         private int batchSize = 200; |
| | | |
| | |         /** Number of retries when an error occurs; default: 5 */ |
| | |         private int retryCount = 5; |
| | | |
| | |         /** |
| | |          * Tables the MySQL data parser should watch, written as Perl regular expressions. |
| | |          * <p> |
| | |          * Several expressions are separated by commas (,); escapes need a double backslash (\\). |
| | |          * <p> |
| | |          * Common examples: <p> |
| | |          * 1. every table of every database: .* or .*\\..* <p> |
| | |          * 2. every table in canal_db: canal_db\\..* <p> |
| | |          * 3. tables in canal_db whose name starts with canal: canal_db\\.canal.* <p> |
| | |          * 4. one table in canal_db: canal_db\\.test1 <p> |
| | |          * 5. several rules combined: canal_db\\..*,mysql_db.test1,mysql.test2 (comma-separated) <p> |
| | |          * <p> |
| | |          * Default: all databases and all tables (.*\\..*) |
| | |          */ |
| | |         private String subscribe = ".*\\..*"; |
| | | |
| | |         /** Interval in milliseconds between fetches when no message was pulled; default: 1000 */ |
| | |         private long acquireInterval = 1000; |
| | | |
| | |         public EndpointInstance() { |
| | |         } |
| | | |
| | |         // --- cluster / ZooKeeper --- |
| | | |
| | |         public boolean isClusterEnabled() { |
| | |             return this.clusterEnabled; |
| | |         } |
| | | |
| | |         public void setClusterEnabled(boolean clusterEnabled) { |
| | |             this.clusterEnabled = clusterEnabled; |
| | |         } |
| | | |
| | |         public String getZookeeperAddress() { |
| | |             return this.zookeeperAddress; |
| | |         } |
| | | |
| | |         public void setZookeeperAddress(String zookeeperAddress) { |
| | |             this.zookeeperAddress = zookeeperAddress; |
| | |         } |
| | | |
| | |         // --- connection --- |
| | | |
| | |         public String getHost() { |
| | |             return this.host; |
| | |         } |
| | | |
| | |         public void setHost(String host) { |
| | |             this.host = host; |
| | |         } |
| | | |
| | |         public int getPort() { |
| | |             return this.port; |
| | |         } |
| | | |
| | |         public void setPort(int port) { |
| | |             this.port = port; |
| | |         } |
| | | |
| | |         public String getUserName() { |
| | |             return this.userName; |
| | |         } |
| | | |
| | |         public void setUserName(String userName) { |
| | |             this.userName = userName; |
| | |         } |
| | | |
| | |         public String getPassword() { |
| | |             return this.password; |
| | |         } |
| | | |
| | |         public void setPassword(String password) { |
| | |             this.password = password; |
| | |         } |
| | | |
| | |         // --- fetching --- |
| | | |
| | |         public String getSubscribe() { |
| | |             return this.subscribe; |
| | |         } |
| | | |
| | |         public void setSubscribe(String subscribe) { |
| | |             this.subscribe = subscribe; |
| | |         } |
| | | |
| | |         public int getBatchSize() { |
| | |             return this.batchSize; |
| | |         } |
| | | |
| | |         public void setBatchSize(int batchSize) { |
| | |             this.batchSize = batchSize; |
| | |         } |
| | | |
| | |         public int getRetryCount() { |
| | |             return this.retryCount; |
| | |         } |
| | | |
| | |         public void setRetryCount(int retryCount) { |
| | |             this.retryCount = retryCount; |
| | |         } |
| | | |
| | |         public long getAcquireInterval() { |
| | |             return this.acquireInterval; |
| | |         } |
| | | |
| | |         public void setAcquireInterval(long acquireInterval) { |
| | |             this.acquireInterval = acquireInterval; |
| | |         } |
| | |     } |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalBootstrapConfiguration.java |
| | |
| | | package com.duxinglangzi.canal.starter.configuration;
|
| | |
|
| | | import org.springframework.beans.factory.support.BeanDefinitionRegistry;
|
| | | import org.springframework.beans.factory.support.RootBeanDefinition;
|
| | | import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
|
| | | import org.springframework.core.type.AnnotationMetadata;
|
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/12
|
| | | */
|
| | | public class CanalBootstrapConfiguration implements ImportBeanDefinitionRegistrar {
|
| | |
|
| | | public static final String CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME =
|
| | | "com.duxinglangzi.canal.starter.configuration.CanalListenerAnnotationBeanPostProcessor";
|
| | |
|
| | | /**
|
| | | * 注册 CanalListenerAnnotationBeanPostProcessor 到spring bean 容器内
|
| | | *
|
| | | * @param importingClassMetadata
|
| | | * @param registry
|
| | | * @return void
|
| | | * @author wuqiong 2022-04-23 20:21
|
| | | */
|
| | | @Override
|
| | | public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
|
| | | if (!registry.containsBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {
|
| | |
|
| | | registry.registerBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
|
| | | new RootBeanDefinition(CanalListenerAnnotationBeanPostProcessor.class));
|
| | | }
|
| | |
|
| | |
|
| | | }
|
| | |
|
| | | }
|
| | | package com.hz.canal.starter.configuration; |
| | | |
| | | import org.springframework.beans.factory.support.BeanDefinitionRegistry; |
| | | import org.springframework.beans.factory.support.RootBeanDefinition; |
| | | import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; |
| | | import org.springframework.core.type.AnnotationMetadata; |
| | | |
| | | /** |
| | |  * Import registrar that adds the {@link CanalListenerAnnotationBeanPostProcessor} bean definition |
| | |  * to the registry. |
| | |  * |
| | |  * @author wuqiong 2022/4/12 |
| | |  */ |
| | | public class CanalBootstrapConfiguration implements ImportBeanDefinitionRegistrar { |
| | | |
| | |     /** |
| | |      * Bean name under which the post processor is registered. It spells out the fully qualified |
| | |      * class name of {@link CanalListenerAnnotationBeanPostProcessor} in its current package; |
| | |      * it is kept as a literal so that it stays a compile-time constant. |
| | |      */ |
| | |     public static final String CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME = |
| | |             "com.hz.canal.starter.configuration.CanalListenerAnnotationBeanPostProcessor"; |
| | | |
| | |     /** |
| | |      * Registers {@link CanalListenerAnnotationBeanPostProcessor} in the Spring bean registry, |
| | |      * unless a definition with the same bean name is already present. |
| | |      * |
| | |      * @param importingClassMetadata annotation metadata of the importing class (not used here) |
| | |      * @param registry               registry that receives the bean definition |
| | |      * @author wuqiong 2022-04-23 20:21 |
| | |      */ |
| | |     @Override |
| | |     public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { |
| | |         if (!registry.containsBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) { |
| | |             registry.registerBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME, |
| | |                     new RootBeanDefinition(CanalListenerAnnotationBeanPostProcessor.class)); |
| | |         } |
| | |     } |
| | | |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalConfigurationSelector.java |
| | |
| | | package com.duxinglangzi.canal.starter.configuration;
|
| | |
|
| | | import org.springframework.context.annotation.DeferredImportSelector;
|
| | | import org.springframework.core.type.AnnotationMetadata;
|
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/12
|
| | | */
|
| | | public class CanalConfigurationSelector implements DeferredImportSelector {
|
| | |
|
| | | /**
|
| | | * 扫描器 导入指定类, 这里导入 CanalBootstrapConfiguration.class
|
| | | *
|
| | | * @param importingClassMetadata
|
| | | * @return java.lang.String[]
|
| | | * @author wuqiong 2022-04-23 20:20
|
| | | */
|
| | | @Override
|
| | | public String[] selectImports(AnnotationMetadata importingClassMetadata) {
|
| | | return new String[]{CanalBootstrapConfiguration.class.getName()};
|
| | | }
|
| | | }
|
| | | package com.hz.canal.starter.configuration; |
| | | |
| | | import org.springframework.context.annotation.DeferredImportSelector; |
| | | import org.springframework.core.type.AnnotationMetadata; |
| | | |
| | | /** |
| | |  * Deferred import selector that contributes {@link CanalBootstrapConfiguration}. |
| | |  * |
| | |  * @author wuqiong 2022/4/12 |
| | |  */ |
| | | public class CanalConfigurationSelector implements DeferredImportSelector { |
| | | |
| | |     /** |
| | |      * Names the classes to import; here that is only {@code CanalBootstrapConfiguration}. |
| | |      * |
| | |      * @param importingClassMetadata annotation metadata of the importing class (not used here) |
| | |      * @return a one-element array holding the class name of {@link CanalBootstrapConfiguration} |
| | |      * @author wuqiong 2022-04-23 20:20 |
| | |      */ |
| | |     @Override |
| | |     public String[] selectImports(AnnotationMetadata importingClassMetadata) { |
| | |         final String bootstrapConfigurationName = CanalBootstrapConfiguration.class.getName(); |
| | |         return new String[]{bootstrapConfigurationName}; |
| | |     } |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java |
| | |
| | | package com.duxinglangzi.canal.starter.configuration;
|
| | |
|
| | |
|
| | | import com.duxinglangzi.canal.starter.annotation.CanalListener;
|
| | | import com.duxinglangzi.canal.starter.factory.TransponderContainerFactory;
|
| | | import org.slf4j.Logger;
|
| | | import org.slf4j.LoggerFactory;
|
| | | import org.springframework.aop.support.AopUtils;
|
| | | import org.springframework.beans.BeansException;
|
| | | import org.springframework.beans.factory.SmartInitializingSingleton;
|
| | | import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
|
| | | import org.springframework.beans.factory.config.BeanPostProcessor;
|
| | | import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
| | | import org.springframework.context.ApplicationContext;
|
| | | import org.springframework.context.ApplicationContextAware;
|
| | | import org.springframework.core.MethodIntrospector;
|
| | | import org.springframework.core.annotation.AnnotatedElementUtils;
|
| | |
|
| | | import java.lang.reflect.Method;
|
| | | import java.util.Collections;
|
| | | import java.util.Map;
|
| | | import java.util.Set;
|
| | | import java.util.concurrent.ConcurrentHashMap;
|
| | | import java.util.stream.Collectors;
|
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | public class CanalListenerAnnotationBeanPostProcessor implements
|
| | | BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware {
|
| | |
|
| | | private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class);
|
| | |
|
| | | private final Set<Class<?>> notAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(256));
|
| | | private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>());
|
| | | private ConfigurableListableBeanFactory configurableListableBeanFactory;
|
| | | private CanalAutoConfigurationProperties canalAutoConfigurationProperties;
|
| | | private ApplicationContext applicationContext;
|
| | |
|
| | | @Override
|
| | | public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
|
| | | if (notAnnotatedClasses.contains(bean.getClass())) return bean;
|
| | | Class<?> targetClass = AopUtils.getTargetClass(bean);
|
| | | // 只扫描类的方法,目前 CanalListener 只支持在方法上
|
| | | Map<Method, CanalListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
|
| | | (MethodIntrospector.MetadataLookup<CanalListener>) method -> findListenerAnnotations(method));
|
| | | if (annotatedMethods.isEmpty()) {
|
| | | this.notAnnotatedClasses.add(bean.getClass());
|
| | | } else {
|
| | | // 先加入到待注册里面
|
| | | annotatedMethods.entrySet().stream()
|
| | | .filter(e -> e != null)
|
| | | .forEach(ele -> registrars.add(createRegistrar(bean, ele)));
|
| | | logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName,
|
| | | annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet()));
|
| | | }
|
| | | return bean;
|
| | | }
|
| | |
|
| | |
|
| | | private CanalListener findListenerAnnotations(Method method) {
|
| | | return AnnotatedElementUtils.findMergedAnnotation(method, CanalListener.class);
|
| | | }
|
| | |
|
| | |
|
| | | @Override
|
| | | public void afterSingletonsInstantiated() {
|
| | | canalAutoConfigurationProperties = configurableListableBeanFactory.getBean(CanalAutoConfigurationProperties.class);
|
| | | TransponderContainerFactory.registerListenerContainer(configurableListableBeanFactory, canalAutoConfigurationProperties, registrars);
|
| | | }
|
| | |
|
| | | @Override
|
| | | public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
|
| | | this.configurableListableBeanFactory = configurableListableBeanFactory;
|
| | | }
|
| | |
|
| | | @Override
|
| | | public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
| | | this.applicationContext = applicationContext;
|
| | | }
|
| | |
|
| | | private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) {
|
| | | return new CanalListenerEndpointRegistrar(bean, entry.getKey(),
|
| | | applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().destination()),
|
| | | applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().database()),
|
| | | entry.getValue().table(), entry.getValue().eventType());
|
| | | }
|
| | | }
|
| | | package com.hz.canal.starter.configuration; |
| | | |
| | | |
| | | import com.hz.canal.starter.annotation.CanalListener; |
| | | import com.hz.canal.starter.factory.TransponderContainerFactory; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.aop.support.AopUtils; |
| | | import org.springframework.beans.BeansException; |
| | | import org.springframework.beans.factory.SmartInitializingSingleton; |
| | | import org.springframework.beans.factory.config.BeanFactoryPostProcessor; |
| | | import org.springframework.beans.factory.config.BeanPostProcessor; |
| | | import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; |
| | | import org.springframework.context.ApplicationContext; |
| | | import org.springframework.context.ApplicationContextAware; |
| | | import org.springframework.core.MethodIntrospector; |
| | | import org.springframework.core.annotation.AnnotatedElementUtils; |
| | | |
| | | import java.lang.reflect.Method; |
| | | import java.util.Collections; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.stream.Collectors; |
| | | |
| | | /** |
| | |  * Bean post processor that collects methods annotated with {@link CanalListener} (directly or |
| | |  * through a composed annotation) and, once all singletons are instantiated, hands the collected |
| | |  * registrars to {@link TransponderContainerFactory}. |
| | |  * |
| | |  * @author wuqiong 2022/4/11 |
| | |  */ |
| | | public class CanalListenerAnnotationBeanPostProcessor implements |
| | |         BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware { |
| | | |
| | |     private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class); |
| | | |
| | |     /** Bean classes already known to carry no listener methods; they are skipped on later visits. */ |
| | |     private final Set<Class<?>> notAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(256)); |
| | |     /** Registrars collected so far, waiting to be registered. */ |
| | |     private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
| | |     private ConfigurableListableBeanFactory configurableListableBeanFactory; |
| | |     private CanalAutoConfigurationProperties canalAutoConfigurationProperties; |
| | |     private ApplicationContext applicationContext; |
| | | |
| | |     @Override |
| | |     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { |
| | |         this.applicationContext = applicationContext; |
| | |     } |
| | | |
| | |     @Override |
| | |     public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { |
| | |         this.configurableListableBeanFactory = configurableListableBeanFactory; |
| | |     } |
| | | |
| | |     /** |
| | |      * Looks for {@link CanalListener} methods on the bean's target class and queues one registrar |
| | |      * per method found. The bean itself is always returned unchanged. |
| | |      */ |
| | |     @Override |
| | |     public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { |
| | |         final Class<?> beanClass = bean.getClass(); |
| | |         if (notAnnotatedClasses.contains(beanClass)) { |
| | |             return bean; |
| | |         } |
| | | |
| | |         // Only methods are scanned; @CanalListener is currently supported on methods only. |
| | |         final MethodIntrospector.MetadataLookup<CanalListener> lookup = this::findListenerAnnotations; |
| | |         final Map<Method, CanalListener> listenerMethods = |
| | |                 MethodIntrospector.selectMethods(AopUtils.getTargetClass(bean), lookup); |
| | | |
| | |         if (listenerMethods.isEmpty()) { |
| | |             notAnnotatedClasses.add(beanClass); |
| | |             return bean; |
| | |         } |
| | | |
| | |         // Queue the listeners first; they are registered in afterSingletonsInstantiated(). |
| | |         for (Map.Entry<Method, CanalListener> listenerMethod : listenerMethods.entrySet()) { |
| | |             registrars.add(createRegistrar(bean, listenerMethod)); |
| | |         } |
| | |         final Set<String> methodNames = listenerMethods.keySet().stream() |
| | |                 .map(Method::getName) |
| | |                 .collect(Collectors.toSet()); |
| | |         logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName, methodNames); |
| | |         return bean; |
| | |     } |
| | | |
| | |     /** |
| | |      * Fetches the properties bean and passes it, together with the queued registrars, to |
| | |      * {@link TransponderContainerFactory#registerListenerContainer}. |
| | |      */ |
| | |     @Override |
| | |     public void afterSingletonsInstantiated() { |
| | |         this.canalAutoConfigurationProperties = |
| | |                 configurableListableBeanFactory.getBean(CanalAutoConfigurationProperties.class); |
| | |         TransponderContainerFactory.registerListenerContainer( |
| | |                 configurableListableBeanFactory, canalAutoConfigurationProperties, registrars); |
| | |     } |
| | | |
| | |     /** Returns the merged {@link CanalListener} annotation of the method, or {@code null} if there is none. */ |
| | |     private CanalListener findListenerAnnotations(Method method) { |
| | |         return AnnotatedElementUtils.findMergedAnnotation(method, CanalListener.class); |
| | |     } |
| | | |
| | |     /** |
| | |      * Builds the registrar for one listener method. Placeholders in the destination and the |
| | |      * database name are resolved against the environment; table names and event types are |
| | |      * passed through as declared. |
| | |      */ |
| | |     private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) { |
| | |         final Method listenerMethod = entry.getKey(); |
| | |         final CanalListener listener = entry.getValue(); |
| | |         final String destination = applicationContext.getEnvironment().resolvePlaceholders(listener.destination()); |
| | |         final String database = applicationContext.getEnvironment().resolvePlaceholders(listener.database()); |
| | |         return new CanalListenerEndpointRegistrar( |
| | |                 bean, listenerMethod, destination, database, listener.table(), listener.eventType()); |
| | |     } |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java |
| | |
| | | package com.duxinglangzi.canal.starter.configuration;
|
| | |
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | | import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
| | | import com.duxinglangzi.canal.starter.mode.CanalMessage;
|
| | | import org.apache.commons.lang3.StringUtils;
|
| | |
|
| | | import java.lang.reflect.Method;
|
| | | import java.util.Arrays;
|
| | | import java.util.List;
|
| | | import java.util.Set;
|
| | | import java.util.function.Predicate;
|
| | | import java.util.stream.Collectors;
|
| | |
|
| | | /** |
| | |  * Listener endpoint registrar: binds one {@code @CanalListener} method to the destination, |
| | |  * database, tables and event types it wants to receive. Instances are immutable. |
| | |  * |
| | |  * @author wuqiong 2022/4/11 |
| | |  */ |
| | | public class CanalListenerEndpointRegistrar { |
| | | |
| | |     private final Object bean; |
| | |     private final Method method; |
| | | |
| | |     /** Destination name; when not configured, every destination in the configuration file is used. */ |
| | |     private final String destination; |
| | | |
| | |     /** Database name; blank matches any database. */ |
| | |     private final String database; |
| | | |
| | |     /** Table names; an empty array, or a single empty string, matches any table. */ |
| | |     private final String[] table; |
| | | |
| | |     /** Data change types; an empty array matches any type. Note: DDL is not included by default. */ |
| | |     private final CanalEntry.EventType[] eventType; |
| | | |
| | |     /** |
| | |      * Validates the listener method and its destination. |
| | |      * <p>1. The DML parser supports exactly one parameter, which carries the database name, |
| | |      * table name, event type and changed data. |
| | |      * <p>2. That parameter must be of type {@code CanalMessage}. |
| | |      * <p>3. A destination named by the listener must exist in the configuration. |
| | |      * |
| | |      * @param sets the destination names present in the configuration |
| | |      * @throws IllegalArgumentException when the method does not take exactly one {@code CanalMessage} |
| | |      * @throws CanalClientException     when a non-blank destination is not contained in {@code sets} |
| | |      * @author wuqiong 2022-04-23 20:27 |
| | |      */ |
| | |     public void checkParameter(Set<String> sets) { |
| | |         List<Class<?>> classes = parameterTypes(); |
| | |         if (classes.size() != 1 || classes.get(0) != CanalMessage.class) { |
| | |             // Take the name from the class itself so the message stays correct if the package moves. |
| | |             throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + |
| | |                     "Need Parameter Type [ " + CanalMessage.class.getName() + " ] please check "); |
| | |         } |
| | |         // A missing destination set is treated like "destination not configured" instead of an NPE. |
| | |         if (StringUtils.isNotBlank(getDestination()) && (sets == null || !sets.contains(getDestination()))) { |
| | |             throw new CanalClientException("@CanalListener Illegal destination " + getDestination() + ", please check "); |
| | |         } |
| | |     } |
| | | |
| | |     /** |
| | |      * @return the parameter types of the listener method, in declaration order |
| | |      */ |
| | |     public List<Class<?>> parameterTypes() { |
| | |         return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList()); |
| | |     } |
| | | |
| | |     /** |
| | |      * @param destination the destination to test |
| | |      * @return {@code true} when this listener has no destination of its own or names exactly this one |
| | |      */ |
| | |     public boolean isContainDestination(String destination) { |
| | |         if (StringUtils.isBlank(getDestination())) { |
| | |             return true; |
| | |         } |
| | |         return getDestination().equals(destination); |
| | |     } |
| | | |
| | |     /** |
| | |      * Builds the filter that selects the registrars interested in a given row change. |
| | |      * A registrar matches when each of its database, table and event-type constraints is |
| | |      * either unset (blank / empty) or equal to the supplied value. |
| | |      * |
| | |      * @param database  database (schema) name of the change |
| | |      * @param tableName table name of the change |
| | |      * @param eventType event type of the change |
| | |      * @return predicate combining the three constraints with logical AND |
| | |      * @author wuqiong 2022-04-23 20:47 |
| | |      */ |
| | |     public static Predicate<CanalListenerEndpointRegistrar> filterArgs( |
| | |             String database, String tableName, CanalEntry.EventType eventType) { |
| | |         Predicate<CanalListenerEndpointRegistrar> databases = |
| | |                 e -> StringUtils.isBlank(e.database) || e.database.equals(database); |
| | |         // The private arrays are read directly here so matching does not pay for the getters' copies. |
| | |         Predicate<CanalListenerEndpointRegistrar> table = e -> e.table.length == 0 |
| | |                 || (e.table.length == 1 && "".equals(e.table[0])) |
| | |                 || Arrays.stream(e.table).anyMatch(s -> s.equals(tableName)); |
| | |         Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.eventType.length == 0 |
| | |                 || Arrays.stream(e.eventType).anyMatch(eve -> eve == eventType); |
| | |         return databases.and(table).and(eventTypes); |
| | |     } |
| | | |
| | |     /** |
| | |      * @param bean        the bean that owns the listener method |
| | |      * @param method      the listener method |
| | |      * @param destination destination name, may be blank |
| | |      * @param database    database name, may be blank |
| | |      * @param tables      table names; {@code null} is treated as an empty array |
| | |      * @param eventTypes  event types; {@code null} is treated as an empty array |
| | |      */ |
| | |     public CanalListenerEndpointRegistrar( |
| | |             Object bean, Method method, String destination, |
| | |             String database, String[] tables, CanalEntry.EventType[] eventTypes) { |
| | |         this.bean = bean; |
| | |         this.method = method; |
| | |         this.destination = destination; |
| | |         this.database = database; |
| | |         // Copy the arrays so later changes by the caller cannot alter this registrar. |
| | |         this.table = tables == null ? new String[0] : tables.clone(); |
| | |         this.eventType = eventTypes == null ? new CanalEntry.EventType[0] : eventTypes.clone(); |
| | |     } |
| | | |
| | |     public Object getBean() { |
| | |         return bean; |
| | |     } |
| | | |
| | |     public Method getMethod() { |
| | |         return method; |
| | |     } |
| | | |
| | |     public String getDestination() { |
| | |         return destination; |
| | |     } |
| | | |
| | |     public String getDatabase() { |
| | |         return database; |
| | |     } |
| | | |
| | |     /** @return a copy of the table names, never {@code null} */ |
| | |     public String[] getTable() { |
| | |         return table.clone(); |
| | |     } |
| | | |
| | |     /** @return a copy of the event types, never {@code null} */ |
| | |     public CanalEntry.EventType[] getEventType() { |
| | |         return eventType.clone(); |
| | |     } |
| | | } |
| | | package com.hz.canal.starter.configuration; |
| | | |
| | | import com.alibaba.otter.canal.protocol.CanalEntry; |
| | | import com.alibaba.otter.canal.protocol.exception.CanalClientException; |
| | | import com.hz.canal.starter.mode.CanalMessage; |
| | | import org.apache.commons.lang3.StringUtils; |
| | | |
| | | import java.lang.reflect.Method; |
| | | import java.util.Arrays; |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.function.Predicate; |
| | | import java.util.stream.Collectors; |
| | | |
| | | /** |
| | |  * Listener endpoint registrar: binds one {@code @CanalListener} method to the destination, |
| | |  * database, tables and event types it wants to receive. Instances are immutable. |
| | |  * |
| | |  * @author wuqiong 2022/4/11 |
| | |  */ |
| | | public class CanalListenerEndpointRegistrar { |
| | | |
| | |     private final Object bean; |
| | |     private final Method method; |
| | | |
| | |     /** Destination name; when not configured, every destination in the configuration file is used. */ |
| | |     private final String destination; |
| | | |
| | |     /** Database name; blank matches any database. */ |
| | |     private final String database; |
| | | |
| | |     /** Table names; an empty array, or a single empty string, matches any table. */ |
| | |     private final String[] table; |
| | | |
| | |     /** Data change types; an empty array matches any type. Note: DDL is not included by default. */ |
| | |     private final CanalEntry.EventType[] eventType; |
| | | |
| | |     /** |
| | |      * Validates the listener method and its destination. |
| | |      * <p>1. The DML parser supports exactly one parameter, which carries the database name, |
| | |      * table name, event type and changed data. |
| | |      * <p>2. That parameter must be of type {@code CanalMessage}. |
| | |      * <p>3. A destination named by the listener must exist in the configuration. |
| | |      * |
| | |      * @param sets the destination names present in the configuration |
| | |      * @throws IllegalArgumentException when the method does not take exactly one {@code CanalMessage} |
| | |      * @throws CanalClientException     when a non-blank destination is not contained in {@code sets} |
| | |      * @author wuqiong 2022-04-23 20:27 |
| | |      */ |
| | |     public void checkParameter(Set<String> sets) { |
| | |         List<Class<?>> classes = parameterTypes(); |
| | |         if (classes.size() != 1 || classes.get(0) != CanalMessage.class) { |
| | |             // Take the name from the class itself: the hard-coded name went stale when the package moved. |
| | |             throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + |
| | |                     "Need Parameter Type [ " + CanalMessage.class.getName() + " ] please check "); |
| | |         } |
| | |         // A missing destination set is treated like "destination not configured" instead of an NPE. |
| | |         if (StringUtils.isNotBlank(getDestination()) && (sets == null || !sets.contains(getDestination()))) { |
| | |             throw new CanalClientException("@CanalListener Illegal destination " + getDestination() + ", please check "); |
| | |         } |
| | |     } |
| | | |
| | |     /** |
| | |      * @return the parameter types of the listener method, in declaration order |
| | |      */ |
| | |     public List<Class<?>> parameterTypes() { |
| | |         return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList()); |
| | |     } |
| | | |
| | |     /** |
| | |      * @param destination the destination to test |
| | |      * @return {@code true} when this listener has no destination of its own or names exactly this one |
| | |      */ |
| | |     public boolean isContainDestination(String destination) { |
| | |         if (StringUtils.isBlank(getDestination())) { |
| | |             return true; |
| | |         } |
| | |         return getDestination().equals(destination); |
| | |     } |
| | | |
| | |     /** |
| | |      * Builds the filter that selects the registrars interested in a given row change. |
| | |      * A registrar matches when each of its database, table and event-type constraints is |
| | |      * either unset (blank / empty) or equal to the supplied value. |
| | |      * |
| | |      * @param database  database (schema) name of the change |
| | |      * @param tableName table name of the change |
| | |      * @param eventType event type of the change |
| | |      * @return predicate combining the three constraints with logical AND |
| | |      * @author wuqiong 2022-04-23 20:47 |
| | |      */ |
| | |     public static Predicate<CanalListenerEndpointRegistrar> filterArgs( |
| | |             String database, String tableName, CanalEntry.EventType eventType) { |
| | |         Predicate<CanalListenerEndpointRegistrar> databases = |
| | |                 e -> StringUtils.isBlank(e.database) || e.database.equals(database); |
| | |         // The private arrays are read directly here so matching does not pay for the getters' copies. |
| | |         Predicate<CanalListenerEndpointRegistrar> table = e -> e.table.length == 0 |
| | |                 || (e.table.length == 1 && "".equals(e.table[0])) |
| | |                 || Arrays.stream(e.table).anyMatch(s -> s.equals(tableName)); |
| | |         Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.eventType.length == 0 |
| | |                 || Arrays.stream(e.eventType).anyMatch(eve -> eve == eventType); |
| | |         return databases.and(table).and(eventTypes); |
| | |     } |
| | | |
| | |     /** |
| | |      * @param bean        the bean that owns the listener method |
| | |      * @param method      the listener method |
| | |      * @param destination destination name, may be blank |
| | |      * @param database    database name, may be blank |
| | |      * @param tables      table names; {@code null} is treated as an empty array |
| | |      * @param eventTypes  event types; {@code null} is treated as an empty array |
| | |      */ |
| | |     public CanalListenerEndpointRegistrar( |
| | |             Object bean, Method method, String destination, |
| | |             String database, String[] tables, CanalEntry.EventType[] eventTypes) { |
| | |         this.bean = bean; |
| | |         this.method = method; |
| | |         this.destination = destination; |
| | |         this.database = database; |
| | |         // Copy the arrays so later changes by the caller cannot alter this registrar. |
| | |         this.table = tables == null ? new String[0] : tables.clone(); |
| | |         this.eventType = eventTypes == null ? new CanalEntry.EventType[0] : eventTypes.clone(); |
| | |     } |
| | | |
| | |     public Object getBean() { |
| | |         return bean; |
| | |     } |
| | | |
| | |     public Method getMethod() { |
| | |         return method; |
| | |     } |
| | | |
| | |     public String getDestination() { |
| | |         return destination; |
| | |     } |
| | | |
| | |     public String getDatabase() { |
| | |         return database; |
| | |     } |
| | | |
| | |     /** @return a copy of the table names, never {@code null} */ |
| | |     public String[] getTable() { |
| | |         return table.clone(); |
| | |     } |
| | | |
| | |     /** @return a copy of the event types, never {@code null} */ |
| | |     public CanalEntry.EventType[] getEventType() { |
| | |         return eventType.clone(); |
| | |     } |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java |
| | |
| | | package com.duxinglangzi.canal.starter.container;
|
| | |
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | | import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener;
|
| | | import org.springframework.context.SmartLifecycle;
|
| | |
|
| | | import java.util.Arrays;
|
| | | import java.util.List;
|
| | | import java.util.concurrent.TimeUnit;
|
| | |
|
| | | /**
|
| | | * 抽象的canal transponder ,实现SmartLifecycle接口,声明周期由spring进行管理
|
| | | *
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | public abstract class AbstractCanalTransponderContainer implements SmartLifecycle {
|
| | | protected boolean isRunning = false;
|
| | | protected final Long SLEEP_TIME_MILLI_SECONDS = 1000L;
|
| | | protected List<CanalEntry.EntryType> IGNORE_ENTRY_TYPES =
|
| | | Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN,
|
| | | CanalEntry.EntryType.TRANSACTIONEND,
|
| | | CanalEntry.EntryType.HEARTBEAT);
|
| | |
|
| | | protected abstract void doStart();
|
| | |
|
| | | protected abstract void initConnect();
|
| | |
|
| | | protected abstract void disconnect();
|
| | |
|
| | |
|
| | | @Override
|
| | | public void start() {
|
| | | new Thread(() -> {
|
| | | // spring 启动后 才会进行canal数据拉取
|
| | | while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get())
|
| | | sleep(5L * SLEEP_TIME_MILLI_SECONDS);
|
| | | initConnect();
|
| | | while (isRunning() && !Thread.currentThread().isInterrupted()) doStart();
|
| | | disconnect(); // 线程被终止或者容器已经停止,需要关闭连接
|
| | | }).start();
|
| | | setRunning(true);
|
| | | }
|
| | |
|
| | | @Override
|
| | | public void stop() {
|
| | | setRunning(false);
|
| | | }
|
| | |
|
| | | @Override
|
| | | public void stop(Runnable callback) {
|
| | | callback.run();
|
| | | setRunning(false);
|
| | | sleep(SLEEP_TIME_MILLI_SECONDS);
|
| | | }
|
| | |
|
| | | @Override
|
| | | public boolean isRunning() {
|
| | | return isRunning;
|
| | | }
|
| | |
|
| | | protected void setRunning(boolean bool) {
|
| | | isRunning = bool;
|
| | | }
|
| | |
|
| | | protected void sleep(long sleepTimeMilliSeconds) {
|
| | | try {
|
| | | TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSeconds);
|
| | | if (!isRunning()) Thread.currentThread().interrupt();
|
| | | } catch (InterruptedException e) {
|
| | | }
|
| | | }
|
| | |
|
| | |
|
| | | }
|
| | | package com.hz.canal.starter.container; |
| | | |
| | | import com.alibaba.otter.canal.protocol.CanalEntry; |
| | | import com.hz.canal.starter.listener.ApplicationReadyListener; |
| | | import org.springframework.context.SmartLifecycle; |
| | | |
| | | import java.util.Arrays; |
| | | import java.util.List; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | | * 抽象的canal transponder ,实现SmartLifecycle接口,声明周期由spring进行管理 |
| | | * |
| | | * @author wuqiong 2022/4/11 |
| | | */ |
| | | public abstract class AbstractCanalTransponderContainer implements SmartLifecycle { |
| | | protected boolean isRunning = false; |
| | | protected final Long SLEEP_TIME_MILLI_SECONDS = 1000L; |
| | | protected List<CanalEntry.EntryType> IGNORE_ENTRY_TYPES = |
| | | Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, |
| | | CanalEntry.EntryType.TRANSACTIONEND, |
| | | CanalEntry.EntryType.HEARTBEAT); |
| | | |
| | | protected abstract void doStart(); |
| | | |
| | | protected abstract void initConnect(); |
| | | |
| | | protected abstract void disconnect(); |
| | | |
| | | |
| | | @Override |
| | | public void start() { |
| | | new Thread(() -> { |
| | | // spring 启动后 才会进行canal数据拉取 |
| | | while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get()) |
| | | sleep(5L * SLEEP_TIME_MILLI_SECONDS); |
| | | initConnect(); |
| | | while (isRunning() && !Thread.currentThread().isInterrupted()) doStart(); |
| | | disconnect(); // 线程被终止或者容器已经停止,需要关闭连接 |
| | | }).start(); |
| | | setRunning(true); |
| | | } |
| | | |
| | | @Override |
| | | public void stop() { |
| | | setRunning(false); |
| | | } |
| | | |
| | | @Override |
| | | public void stop(Runnable callback) { |
| | | callback.run(); |
| | | setRunning(false); |
| | | sleep(SLEEP_TIME_MILLI_SECONDS); |
| | | } |
| | | |
| | | @Override |
| | | public boolean isRunning() { |
| | | return isRunning; |
| | | } |
| | | |
| | | protected void setRunning(boolean bool) { |
| | | isRunning = bool; |
| | | } |
| | | |
| | | protected void sleep(long sleepTimeMilliSeconds) { |
| | | try { |
| | | TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSeconds); |
| | | if (!isRunning()) Thread.currentThread().interrupt(); |
| | | } catch (InterruptedException e) { |
| | | } |
| | | } |
| | | |
| | | |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java |
| | |
| | | package com.duxinglangzi.canal.starter.container;
|
| | |
|
| | | import com.alibaba.otter.canal.client.CanalConnector;
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | | import com.alibaba.otter.canal.protocol.Message;
|
| | | import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
|
| | | import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
|
| | | import com.duxinglangzi.canal.starter.mode.CanalMessage;
|
| | | import org.slf4j.Logger;
|
| | | import org.slf4j.LoggerFactory;
|
| | |
|
| | | import java.lang.reflect.InvocationTargetException;
|
| | | import java.util.*;
|
| | |
|
| | | /**
|
| | | * DML 数据拉取、解析
|
| | | *
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
|
| | |
|
| | | private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
|
| | |
|
| | | private CanalConnector connector;
|
| | | private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
|
| | | private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
|
| | | private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
|
| | | private Integer local_retry_count;
|
| | |
|
| | |
|
| | | public void initConnect() {
|
| | | try {
|
| | | // init supportAllTypes
|
| | | registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
|
| | | connector.connect();
|
| | | connector.subscribe(endpointInstance.getSubscribe());
|
| | | connector.rollback();
|
| | | // 初始化本地重试次数
|
| | | local_retry_count = endpointInstance.getRetryCount();
|
| | | } catch (Exception e) {
|
| | | logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
|
| | | setRunning(false);
|
| | | }
|
| | |
|
| | | }
|
| | |
|
| | | public void disconnect() {
|
| | | // 关闭连接
|
| | | connector.disconnect();
|
| | | }
|
| | |
|
| | |
|
| | | public void doStart() {
|
| | | try {
|
| | | Message message = null;
|
| | | try {
|
| | | message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
|
| | | } catch (Exception clientException) {
|
| | | logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
|
| | | if (local_retry_count > 0) {
|
| | | // 重试次数减一
|
| | | local_retry_count = local_retry_count - 1;
|
| | | sleep(endpointInstance.getAcquireInterval());
|
| | | } else {
|
| | | // 重试次数 <= 0 时,直接终止线程
|
| | | logger.error("[DmlMessageTransponderContainer] retry count is zero , " +
|
| | | "thread interrupt , current connector host: {} , port: {} ",
|
| | | endpointInstance.getHost(), endpointInstance.getPort());
|
| | | Thread.currentThread().interrupt();
|
| | | }
|
| | | return;
|
| | | }
|
| | | // 如果重试次数小于设置的,则修改
|
| | | if (local_retry_count < endpointInstance.getRetryCount())
|
| | | local_retry_count = endpointInstance.getRetryCount();
|
| | | List<CanalEntry.Entry> entries = message.getEntries();
|
| | | if (message.getId() == -1 || entries.isEmpty()) {
|
| | | sleep(endpointInstance.getAcquireInterval());
|
| | | return;
|
| | | }
|
| | | for (CanalEntry.Entry entry : entries) {
|
| | | try {
|
| | | consumer(entry);
|
| | | } catch (Exception e) {
|
| | | e.printStackTrace();
|
| | | logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
|
| | | // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
|
| | | // return;
|
| | | }
|
| | | }
|
| | | connector.ack(message.getId()); // 提交确认
|
| | | } catch (Exception exc) {
|
| | | logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
|
| | | // 防止删除消息时发生错误,或者拉取消息失败等情况
|
| | | exc.printStackTrace();
|
| | | }
|
| | | }
|
| | |
|
| | | public DmlMessageTransponderContainer(
|
| | | CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
|
| | | CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
|
| | | this.connector = connector;
|
| | | this.registrars.addAll(registrars);
|
| | | this.endpointInstance = endpointInstance;
|
| | | }
|
| | |
|
| | | private void consumer(CanalEntry.Entry entry) {
|
| | | if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
|
| | | CanalEntry.RowChange rowChange = null;
|
| | | try {
|
| | | rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
| | | } catch (Exception e) {
|
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
|
| | | throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
|
| | | }
|
| | |
|
| | | // 忽略 ddl 语句
|
| | | if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
|
| | | CanalEntry.EventType eventType = rowChange.getEventType();
|
| | | if (!support_all_types.contains(eventType)) return;
|
| | | for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
|
| | | registrars
|
| | | .stream()
|
| | | .filter(CanalListenerEndpointRegistrar.filterArgs(
|
| | | entry.getHeader().getSchemaName(),
|
| | | entry.getHeader().getTableName(),
|
| | | eventType))
|
| | | .forEach(element -> {
|
| | | try {
|
| | | element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
|
| | | } catch (IllegalAccessException | InvocationTargetException e) {
|
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
|
| | | throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
|
| | | }
|
| | | });
|
| | | }
|
| | | }
|
| | |
|
| | | }
|
| | | package com.hz.canal.starter.container; |
| | | |
| | | import com.alibaba.otter.canal.client.CanalConnector; |
| | | import com.alibaba.otter.canal.protocol.CanalEntry; |
| | | import com.alibaba.otter.canal.protocol.Message; |
| | | import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; |
| | | import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar; |
| | | import com.hz.canal.starter.mode.CanalMessage; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | |
| | | import java.lang.reflect.InvocationTargetException; |
| | | import java.util.*; |
| | | |
| | | /** |
| | | * DML 数据拉取、解析 |
| | | * |
| | | * @author wuqiong 2022/4/11 |
| | | */ |
| | | public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class); |
| | | |
| | | private CanalConnector connector; |
| | | private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; |
| | | private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); |
| | | private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); |
| | | private Integer local_retry_count; |
| | | |
| | | |
| | | public void initConnect() { |
| | | try { |
| | | // init supportAllTypes |
| | | registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); |
| | | connector.connect(); |
| | | connector.subscribe(endpointInstance.getSubscribe()); |
| | | connector.rollback(); |
| | | // 初始化本地重试次数 |
| | | local_retry_count = endpointInstance.getRetryCount(); |
| | | } catch (Exception e) { |
| | | logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); |
| | | setRunning(false); |
| | | } |
| | | |
| | | } |
| | | |
| | | public void disconnect() { |
| | | // 关闭连接 |
| | | connector.disconnect(); |
| | | } |
| | | |
| | | |
| | | public void doStart() { |
| | | try { |
| | | Message message = null; |
| | | try { |
| | | message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据 |
| | | } catch (Exception clientException) { |
| | | logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); |
| | | if (local_retry_count > 0) { |
| | | // 重试次数减一 |
| | | local_retry_count = local_retry_count - 1; |
| | | sleep(endpointInstance.getAcquireInterval()); |
| | | } else { |
| | | // 重试次数 <= 0 时,直接终止线程 |
| | | logger.error("[DmlMessageTransponderContainer] retry count is zero , " + |
| | | "thread interrupt , current connector host: {} , port: {} ", |
| | | endpointInstance.getHost(), endpointInstance.getPort()); |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | return; |
| | | } |
| | | // 如果重试次数小于设置的,则修改 |
| | | if (local_retry_count < endpointInstance.getRetryCount()) |
| | | local_retry_count = endpointInstance.getRetryCount(); |
| | | List<CanalEntry.Entry> entries = message.getEntries(); |
| | | if (message.getId() == -1 || entries.isEmpty()) { |
| | | sleep(endpointInstance.getAcquireInterval()); |
| | | return; |
| | | } |
| | | for (CanalEntry.Entry entry : entries) { |
| | | try { |
| | | consumer(entry); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); |
| | | // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据 |
| | | // return; |
| | | } |
| | | } |
| | | connector.ack(message.getId()); // 提交确认 |
| | | } catch (Exception exc) { |
| | | logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage()); |
| | | // 防止删除消息时发生错误,或者拉取消息失败等情况 |
| | | exc.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | public DmlMessageTransponderContainer( |
| | | CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars, |
| | | CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { |
| | | this.connector = connector; |
| | | this.registrars.addAll(registrars); |
| | | this.endpointInstance = endpointInstance; |
| | | } |
| | | |
| | | private void consumer(CanalEntry.Entry entry) { |
| | | if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return; |
| | | CanalEntry.RowChange rowChange = null; |
| | | try { |
| | | rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); |
| | | } catch (Exception e) { |
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); |
| | | throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); |
| | | } |
| | | |
| | | // 忽略 ddl 语句 |
| | | if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; |
| | | CanalEntry.EventType eventType = rowChange.getEventType(); |
| | | if (!support_all_types.contains(eventType)) return; |
| | | for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { |
| | | registrars |
| | | .stream() |
| | | .filter(CanalListenerEndpointRegistrar.filterArgs( |
| | | entry.getHeader().getSchemaName(), |
| | | entry.getHeader().getTableName(), |
| | | eventType)) |
| | | .forEach(element -> { |
| | | try { |
| | | element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); |
| | | } catch (IllegalAccessException | InvocationTargetException e) { |
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); |
| | | throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e); |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/factory/CanalConnectorFactory.java |
| | |
| | | package com.duxinglangzi.canal.starter.factory;
|
| | |
|
| | | import com.alibaba.otter.canal.client.CanalConnector;
|
| | | import com.alibaba.otter.canal.client.CanalConnectors;
|
| | | import com.alibaba.otter.canal.protocol.exception.CanalClientException;
|
| | | import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
|
| | | import org.springframework.util.Assert;
|
| | | import org.springframework.util.StringUtils;
|
| | |
|
| | | import java.net.InetSocketAddress;
|
| | | import java.net.SocketAddress;
|
| | | import java.util.ArrayList;
|
| | | import java.util.List;
|
| | |
|
| | |
|
| | | /** |
| | |  * Factory that creates the {@code CanalConnector} for one destination. |
| | |  */ |
| | | public class CanalConnectorFactory { |
| | | |
| | |     /** |
| | |      * Creates a CanalConnector. With cluster mode enabled, a cluster connector is built from the |
| | |      * configured comma-separated {@code host:port} list; otherwise a single connector is built |
| | |      * from the configured host and port. |
| | |      * |
| | |      * @param destination      canal destination name, must contain text |
| | |      * @param endpointInstance endpoint configuration, must not be {@code null} |
| | |      * @return CanalConnector |
| | |      * @throws IllegalArgumentException when an argument fails the assertions above |
| | |      * @throws CanalClientException     when the cluster address list is missing or malformed |
| | |      * @author wuqiong 2022-04-23 20:36 |
| | |      */ |
| | |     public static synchronized CanalConnector createConnector( |
| | |             String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { |
| | |         Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check "); |
| | |         Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check "); |
| | |         if (!endpointInstance.isClusterEnabled()) { |
| | |             return CanalConnectors.newSingleConnector( |
| | |                     new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()), |
| | |                     destination, endpointInstance.getUserName(), endpointInstance.getPassword()); |
| | |         } |
| | |         if (!StringUtils.hasText(endpointInstance.getZookeeperAddress())) { |
| | |             throw new CanalClientException("zookeeper address is null"); |
| | |         } |
| | |         // NOTE(review): the socket-address overload is used with the value configured as |
| | |         // "zookeeper address"; confirm these are the addresses that overload expects. |
| | |         return CanalConnectors.newClusterConnector( |
| | |                 parseAddresses(endpointInstance.getZookeeperAddress()), |
| | |                 destination, endpointInstance.getUserName(), endpointInstance.getPassword()); |
| | |     } |
| | | |
| | |     /** Parses a comma-separated {@code host:port} list, tolerating whitespace around each part. */ |
| | |     private static List<SocketAddress> parseAddresses(String addressList) { |
| | |         List<SocketAddress> addresses = new ArrayList<>(); |
| | |         for (String s : addressList.split(",")) { |
| | |             String[] split = s.trim().split(":"); |
| | |             if (split.length != 2) { |
| | |                 throw new CanalClientException("error parsing zookeeper address:" + s); |
| | |             } |
| | |             try { |
| | |                 addresses.add(new InetSocketAddress(split[0].trim(), Integer.parseInt(split[1].trim()))); |
| | |             } catch (IllegalArgumentException e) { |
| | |                 // Covers a non-numeric port (NumberFormatException) and a port outside the valid range. |
| | |                 throw new CanalClientException("error parsing zookeeper address:" + s, e); |
| | |             } |
| | |         } |
| | |         if (addresses.isEmpty()) { |
| | |             // e.g. the configured value consists of separators only |
| | |             throw new CanalClientException("error parsing zookeeper address:" + addressList); |
| | |         } |
| | |         return addresses; |
| | |     } |
| | | } |
| | | package com.hz.canal.starter.factory; |
| | | |
| | | import com.alibaba.otter.canal.client.CanalConnector; |
| | | import com.alibaba.otter.canal.client.CanalConnectors; |
| | | import com.alibaba.otter.canal.protocol.exception.CanalClientException; |
| | | import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; |
| | | import org.springframework.util.Assert; |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import java.net.InetSocketAddress; |
| | | import java.net.SocketAddress; |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | |
| | | /** |
| | |  * Factory that creates the {@code CanalConnector} for one destination. |
| | |  */ |
| | | public class CanalConnectorFactory { |
| | | |
| | |     /** |
| | |      * Creates a CanalConnector. With cluster mode enabled, a cluster connector is built from the |
| | |      * configured comma-separated {@code host:port} list; otherwise a single connector is built |
| | |      * from the configured host and port. |
| | |      * |
| | |      * @param destination      canal destination name, must contain text |
| | |      * @param endpointInstance endpoint configuration, must not be {@code null} |
| | |      * @return CanalConnector |
| | |      * @throws IllegalArgumentException when an argument fails the assertions above |
| | |      * @throws CanalClientException     when the cluster address list is missing or malformed |
| | |      * @author wuqiong 2022-04-23 20:36 |
| | |      */ |
| | |     public static synchronized CanalConnector createConnector( |
| | |             String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { |
| | |         Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check "); |
| | |         Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check "); |
| | |         if (!endpointInstance.isClusterEnabled()) { |
| | |             return CanalConnectors.newSingleConnector( |
| | |                     new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()), |
| | |                     destination, endpointInstance.getUserName(), endpointInstance.getPassword()); |
| | |         } |
| | |         if (!StringUtils.hasText(endpointInstance.getZookeeperAddress())) { |
| | |             throw new CanalClientException("zookeeper address is null"); |
| | |         } |
| | |         // NOTE(review): the socket-address overload is used with the value configured as |
| | |         // "zookeeper address"; confirm these are the addresses that overload expects. |
| | |         return CanalConnectors.newClusterConnector( |
| | |                 parseAddresses(endpointInstance.getZookeeperAddress()), |
| | |                 destination, endpointInstance.getUserName(), endpointInstance.getPassword()); |
| | |     } |
| | | |
| | |     /** Parses a comma-separated {@code host:port} list, tolerating whitespace around each part. */ |
| | |     private static List<SocketAddress> parseAddresses(String addressList) { |
| | |         List<SocketAddress> addresses = new ArrayList<>(); |
| | |         for (String s : addressList.split(",")) { |
| | |             String[] split = s.trim().split(":"); |
| | |             if (split.length != 2) { |
| | |                 throw new CanalClientException("error parsing zookeeper address:" + s); |
| | |             } |
| | |             try { |
| | |                 addresses.add(new InetSocketAddress(split[0].trim(), Integer.parseInt(split[1].trim()))); |
| | |             } catch (IllegalArgumentException e) { |
| | |                 // Covers a non-numeric port (NumberFormatException) and a port outside the valid range. |
| | |                 throw new CanalClientException("error parsing zookeeper address:" + s, e); |
| | |             } |
| | |         } |
| | |         if (addresses.isEmpty()) { |
| | |             // e.g. the configured value consists of separators only |
| | |             throw new CanalClientException("error parsing zookeeper address:" + addressList); |
| | |         } |
| | |         return addresses; |
| | |     } |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/factory/TransponderContainerFactory.java |
| | |
| | | package com.duxinglangzi.canal.starter.factory;
|
| | |
|
| | | import com.alibaba.otter.canal.client.CanalConnector;
|
| | | import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
|
| | | import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
|
| | | import com.duxinglangzi.canal.starter.container.DmlMessageTransponderContainer;
|
| | | import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
| | |
|
| | | import java.util.ArrayList;
|
| | | import java.util.List;
|
| | | import java.util.Map;
|
| | | import java.util.Set;
|
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/18
|
| | | * @description
|
| | | */
|
| | | public class TransponderContainerFactory {
|
| | |
|
| | | private static final String CONTAINER_ID_PREFIX = "com.duxinglangzi.canal.starter.container.MessageTransponderContainer#";
|
| | |
|
| | |
|
| | |     /** |
| | |      * Registers every pending listener endpoint with spring: for each configured canal |
| | |      * instance that has at least one matching registrar and no container bean yet, one |
| | |      * transponder container is created and registered. |
| | |      * |
| | |      * @param beanFactory bean factory that receives the container singletons |
| | |      * @param canalConfig canal configuration holding the instances keyed by destination |
| | |      * @param registrars  listener endpoints waiting to be registered |
| | |      * @author wuqiong 2022-04-23 20:34 |
| | |      */ |
| | |     public static void registerListenerContainer( |
| | |             ConfigurableListableBeanFactory beanFactory, CanalAutoConfigurationProperties canalConfig, |
| | |             Set<CanalListenerEndpointRegistrar> registrars) { |
| | |         if (registrars == null || registrars.isEmpty()) return; |
| | |         if (canalConfig == null || canalConfig.getInstances().isEmpty()) return; |
| | | |
| | |         // Validation does not depend on the instance being processed, so it runs once up front |
| | |         // instead of once per configured instance. |
| | |         Set<String> destinations = canalConfig.getInstances().keySet(); |
| | |         for (CanalListenerEndpointRegistrar registrar : registrars) { |
| | |             registrar.checkParameter(destinations); |
| | |         } |
| | | |
| | |         for (Map.Entry<String, CanalAutoConfigurationProperties.EndpointInstance> endpointInstance : canalConfig.getInstances().entrySet()) { |
| | |             String destination = endpointInstance.getKey(); |
| | |             if (beanFactory.containsBean(getContainerID(destination))) continue; // already exists, do not create again |
| | |             List<CanalListenerEndpointRegistrar> registrarList = new ArrayList<>(); |
| | |             for (CanalListenerEndpointRegistrar registrar : registrars) { |
| | |                 if (registrar.isContainDestination(destination)) { |
| | |                     registrarList.add(registrar); |
| | |                 } |
| | |             } |
| | |             if (registrarList.isEmpty()) continue; |
| | |             registerTransponderContainer( |
| | |                     destination, endpointInstance.getValue(), beanFactory, registrarList); |
| | |         } |
| | |     } |
| | |
|
| | | private static void registerTransponderContainer(
|
| | | String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance,
|
| | | ConfigurableListableBeanFactory beanFactory, List<CanalListenerEndpointRegistrar> registrarList) {
|
| | | CanalConnector connector = CanalConnectorFactory.createConnector(destination, endpointInstance);
|
| | | beanFactory.registerSingleton(getContainerID(destination),
|
| | | new DmlMessageTransponderContainer(connector, registrarList, endpointInstance));
|
| | | }
|
| | |
|
| | |
|
| | | private static String getContainerID(String destination) {
|
| | | return CONTAINER_ID_PREFIX + "#" + destination;
|
| | | }
|
| | |
|
| | |
|
| | | }
|
| | | package com.hz.canal.starter.factory; |
| | | |
| | | import com.alibaba.otter.canal.client.CanalConnector; |
| | | import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; |
| | | import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar; |
| | | import com.hz.canal.starter.container.DmlMessageTransponderContainer; |
| | | import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | |
| | | /** |
| | | * @author wuqiong 2022/4/18 |
| | | * @description |
| | | */ |
| | | public class TransponderContainerFactory { |
| | | |
| | | private static final String CONTAINER_ID_PREFIX = "com.duxinglangzi.canal.starter.container.MessageTransponderContainer#"; |
| | | |
| | | |
| | | /** |
| | | * 将所有待注册的端点,注册到spring中 |
| | | * |
| | | * @param beanFactory |
| | | * @param canalConfig |
| | | * @param registrars |
| | | * @return void |
| | | * @author wuqiong 2022-04-23 20:34 |
| | | */ |
| | | public static void registerListenerContainer( |
| | | ConfigurableListableBeanFactory beanFactory, CanalAutoConfigurationProperties canalConfig, |
| | | Set<CanalListenerEndpointRegistrar> registrars) { |
| | | if (registrars == null || registrars.isEmpty()) return; |
| | | if (canalConfig == null || canalConfig.getInstances().isEmpty()) return; |
| | | |
| | | for (Map.Entry<String, CanalAutoConfigurationProperties.EndpointInstance> endpointInstance : canalConfig.getInstances().entrySet()) { |
| | | if (beanFactory.containsBean(getContainerID(endpointInstance.getKey()))) continue; // 如果已经存在则不在创建 |
| | | List<CanalListenerEndpointRegistrar> registrarList = new ArrayList<>(); |
| | | for (CanalListenerEndpointRegistrar registrar : registrars) { |
| | | registrar.checkParameter(canalConfig.getInstances().keySet()); |
| | | if (!registrar.isContainDestination(endpointInstance.getKey())) continue; |
| | | registrarList.add(registrar); |
| | | } |
| | | if (registrarList.isEmpty()) continue; |
| | | registerTransponderContainer( |
| | | endpointInstance.getKey(), endpointInstance.getValue(), beanFactory, registrarList); |
| | | } |
| | | |
| | | } |
| | | |
| | | private static void registerTransponderContainer( |
| | | String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance, |
| | | ConfigurableListableBeanFactory beanFactory, List<CanalListenerEndpointRegistrar> registrarList) { |
| | | CanalConnector connector = CanalConnectorFactory.createConnector(destination, endpointInstance); |
| | | beanFactory.registerSingleton(getContainerID(destination), |
| | | new DmlMessageTransponderContainer(connector, registrarList, endpointInstance)); |
| | | } |
| | | |
| | | |
| | | private static String getContainerID(String destination) { |
| | | return CONTAINER_ID_PREFIX + "#" + destination; |
| | | } |
| | | |
| | | |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/listener/ApplicationReadyListener.java |
| | |
| | | package com.duxinglangzi.canal.starter.listener;
|
| | |
|
| | | import org.springframework.boot.context.event.ApplicationReadyEvent;
|
| | | import org.springframework.context.ApplicationListener;
|
| | |
|
| | | import java.util.concurrent.atomic.AtomicBoolean;
|
| | |
|
| | | /**
|
| | | * @author wuqiong 2022/4/16
|
| | | */
|
| | | public class ApplicationReadyListener implements ApplicationListener<ApplicationReadyEvent> {
|
| | |
|
| | | public static final AtomicBoolean START_LISTENER_CONTAINER = new AtomicBoolean(false);
|
| | |
|
| | | @Override
|
| | | public void onApplicationEvent(ApplicationReadyEvent event) {
|
| | | // 确保程序启动之后,再放行所有的 canal transponder
|
| | | if (!START_LISTENER_CONTAINER.get()) START_LISTENER_CONTAINER.set(true);
|
| | | }
|
| | | }
|
| | | package com.hz.canal.starter.listener; |
| | | |
| | | import org.springframework.boot.context.event.ApplicationReadyEvent; |
| | | import org.springframework.context.ApplicationListener; |
| | | |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | /** |
| | | * @author wuqiong 2022/4/16 |
| | | */ |
| | | public class ApplicationReadyListener implements ApplicationListener<ApplicationReadyEvent> { |
| | | |
| | | public static final AtomicBoolean START_LISTENER_CONTAINER = new AtomicBoolean(false); |
| | | |
| | | @Override |
| | | public void onApplicationEvent(ApplicationReadyEvent event) { |
| | | // 确保程序启动之后,再放行所有的 canal transponder |
| | | if (!START_LISTENER_CONTAINER.get()) START_LISTENER_CONTAINER.set(true); |
| | | } |
| | | } |
File was renamed from src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java |
| | |
| | | package com.duxinglangzi.canal.starter.mode; |
| | | package com.hz.canal.starter.mode; |
| | | |
| | | import com.alibaba.otter.canal.protocol.CanalEntry; |
| | | |
| | |
| | | {
|
| | | "properties": [
|
| | | {
|
| | | "sourceType": "com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties",
|
| | | "sourceType": "com.hz.canal.starter.configuration.CanalAutoConfigurationProperties",
|
| | | "name": "spring.canal.instances",
|
| | | "type": "java.util.Map<java.lang.String,com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties.EndpointInstance>",
|
| | | "type": "java.util.Map<java.lang.String,com.hz.canal.starter.configuration.CanalAutoConfigurationProperties.EndpointInstance>",
|
| | | "description": "canal config "
|
| | | }
|
| | | ]
|
| | |
| | | |
| | | # Auto Configure |
| | | # org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ |
| | | # com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\ |
| | | # com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector |
| | | # com.hz.canal.starter.configuration.CanalAutoConfigurationProperties,\ |
| | | # com.hz.canal.starter.configuration.CanalConfigurationSelector |
| | | |
| | | # Application Listeners |
| | | # org.springframework.context.ApplicationListener=\ |
| | | # com.duxinglangzi.canal.starter.listener.ApplicationReadyListener |
| | | # com.hz.canal.starter.listener.ApplicationReadyListener |