From 0b055a3f554da3a934e79e88c4781705cbab5a21 Mon Sep 17 00:00:00 2001 From: renxue <auster_i@163.com> Date: 星期一, 24 十月 2022 11:40:26 +0800 Subject: [PATCH] 修改包下的类名 --- src/main/java/com/hz/canal/starter/configuration/CanalBootstrapConfiguration.java | 70 +- src/main/java/com/hz/canal/starter/annotation/CanalDeleteListener.java | 58 src/main/resources/META-INF/spring-configuration-metadata.json | 4 src/main/java/com/hz/canal/starter/listener/ApplicationReadyListener.java | 40 src/main/java/com/hz/canal/starter/factory/CanalConnectorFactory.java | 100 +- src/main/java/com/hz/canal/starter/annotation/CanalUpdateListener.java | 58 src/main/java/com/hz/canal/starter/factory/TransponderContainerFactory.java | 134 ++-- src/main/java/com/hz/canal/starter/configuration/CanalAutoConfigurationProperties.java | 354 +++++----- src/main/java/com/hz/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java | 176 ++-- src/main/java/com/hz/canal/starter/container/AbstractCanalTransponderContainer.java | 148 ++-- src/main/java/com/hz/canal/starter/configuration/CanalListenerEndpointRegistrar.java | 260 +++--- README.md | 16 /dev/null | 21 src/main/java/com/hz/canal/starter/annotation/CanalInsertListener.java | 58 src/main/java/com/hz/canal/starter/configuration/CanalConfigurationSelector.java | 44 src/main/resources/META-INF/spring.factories | 6 src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java | 278 ++++---- src/main/java/com/hz/canal/starter/mode/CanalMessage.java | 2 src/main/java/com/hz/canal/starter/annotation/CanalListener.java | 82 +- src/main/java/com/hz/canal/starter/annotation/EnableCanalListener.java | 21 20 files changed, 965 insertions(+), 965 deletions(-) diff --git a/README.md b/README.md index 2877874..94b83cb 100644 --- a/README.md +++ b/README.md @@ -28,11 +28,11 @@ import com.alibaba.otter.canal.protocol.CanalEntry; -import com.duxinglangzi.canal.starter.annotation.CanalInsertListener; -import com.duxinglangzi.canal.starter.annotation.CanalListener; -import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener; -import com.duxinglangzi.canal.starter.annotation.EnableCanalListener; -import com.duxinglangzi.canal.starter.mode.CanalMessage; +import CanalInsertListener; +import CanalListener; +import CanalUpdateListener; +import EnableCanalListener; +import CanalMessage; import org.springframework.stereotype.Service; import java.util.stream.Collectors; @@ -48,7 +48,7 @@ /** * 蹇呴』鍦ㄧ被涓� 浣跨敤 EnableCanalListener 娉ㄨВ鎵嶈兘寮�鍚� canal listener * - * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 com.duxinglangzi.canal.starter.mode.CanalMessage + * 鐩墠 Listener 鏂规硶鐨勫弬鏁板繀椤讳负 CanalMessage * 绋嬪簭鍦ㄥ惎鍔ㄨ繃绋嬩腑浼氬仛妫�鏌� */ @@ -107,9 +107,9 @@ CanalEntry.RowData rowData = message.getRowData(); - System.out.println(" >>>>>>>>>>>>>[褰撳墠鏁版嵁搴�: "+message.getDataBaseName()+" ," + + System.out.println(" >>>>>>>>>>>>>[褰撳墠鏁版嵁搴�: " + message.getDataBaseName() + " ," + "鏁版嵁搴撹〃鍚�: " + message.getTableName() + " , " + - "鏂规硶: " + method ); + "鏂规硶: " + method); if (eventType == CanalEntry.EventType.DELETE) { rowData.getBeforeColumnsList().stream().collect(Collectors.toList()).forEach(ele -> { diff --git a/src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java b/src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java deleted file mode 100644 index aa8208e..0000000 --- a/src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.duxinglangzi.canal.starter.annotation; - -import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; -import com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector; -import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener; -import org.springframework.context.annotation.Import; - -import java.lang.annotation.*; - -/** - * 寮�鍚� canal listener - * - * @author wuqiong 2022/8/4 - */ -@Documented -@Inherited -@Target(ElementType.TYPE) -@Retention(RetentionPolicy.RUNTIME) -@Import({CanalAutoConfigurationProperties.class, CanalConfigurationSelector.class, ApplicationReadyListener.class}) -public @interface EnableCanalListener { -} diff --git a/src/main/java/com/duxinglangzi/canal/starter/annotation/CanalDeleteListener.java b/src/main/java/com/hz/canal/starter/annotation/CanalDeleteListener.java similarity index 92% rename from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalDeleteListener.java rename to src/main/java/com/hz/canal/starter/annotation/CanalDeleteListener.java index 8d274f6..302d3b7 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/annotation/CanalDeleteListener.java +++ b/src/main/java/com/hz/canal/starter/annotation/CanalDeleteListener.java @@ -1,29 +1,29 @@ -package com.duxinglangzi.canal.starter.annotation; - - -import com.alibaba.otter.canal.protocol.CanalEntry; -import org.springframework.core.annotation.AliasFor; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * @author wuqiong 2022/4/11 - */ -@Target(ElementType.METHOD) -@Retention(RetentionPolicy.RUNTIME) -@CanalListener(eventType = {CanalEntry.EventType.DELETE}) -public @interface CanalDeleteListener { - - @AliasFor(annotation = CanalListener.class) - String destination(); - - @AliasFor(annotation = CanalListener.class) - String database(); - - @AliasFor(annotation = CanalListener.class) - String[] table(); - -} +package com.hz.canal.starter.annotation; + + +import com.alibaba.otter.canal.protocol.CanalEntry; +import org.springframework.core.annotation.AliasFor; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author wuqiong 2022/4/11 + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@CanalListener(eventType = {CanalEntry.EventType.DELETE}) +public @interface CanalDeleteListener { + + @AliasFor(annotation = CanalListener.class) + String destination(); + + @AliasFor(annotation = CanalListener.class) + String database(); + + @AliasFor(annotation = CanalListener.class) + String[] table(); + +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/annotation/CanalInsertListener.java b/src/main/java/com/hz/canal/starter/annotation/CanalInsertListener.java similarity index 92% rename from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalInsertListener.java rename to src/main/java/com/hz/canal/starter/annotation/CanalInsertListener.java index e3cde56..3001e1d 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/annotation/CanalInsertListener.java +++ b/src/main/java/com/hz/canal/starter/annotation/CanalInsertListener.java @@ -1,29 +1,29 @@ -package com.duxinglangzi.canal.starter.annotation; - - -import com.alibaba.otter.canal.protocol.CanalEntry; -import org.springframework.core.annotation.AliasFor; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * @author wuqiong 2022/4/11 - */ -@Target(ElementType.METHOD) -@Retention(RetentionPolicy.RUNTIME) -@CanalListener(eventType = CanalEntry.EventType.INSERT) -public @interface CanalInsertListener { - - @AliasFor(annotation = CanalListener.class) - String destination(); - - @AliasFor(annotation = CanalListener.class) - String database(); - - @AliasFor(annotation = CanalListener.class) - String[] table(); - -} +package com.hz.canal.starter.annotation; + + +import com.alibaba.otter.canal.protocol.CanalEntry; +import org.springframework.core.annotation.AliasFor; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author wuqiong 2022/4/11 + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@CanalListener(eventType = CanalEntry.EventType.INSERT) +public @interface CanalInsertListener { + + @AliasFor(annotation = CanalListener.class) + String destination(); + + @AliasFor(annotation = CanalListener.class) + String database(); + + @AliasFor(annotation = CanalListener.class) + String[] table(); + +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/annotation/CanalListener.java b/src/main/java/com/hz/canal/starter/annotation/CanalListener.java similarity index 93% rename from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalListener.java rename to src/main/java/com/hz/canal/starter/annotation/CanalListener.java index 6d0e7f9..14b081d 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/annotation/CanalListener.java +++ b/src/main/java/com/hz/canal/starter/annotation/CanalListener.java @@ -1,41 +1,41 @@ -package com.duxinglangzi.canal.starter.annotation; - - -import com.alibaba.otter.canal.protocol.CanalEntry; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - - -/** - * @author wuqiong 2022/4/11 - */ -@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) -@Retention(RetentionPolicy.RUNTIME) -public @interface CanalListener { - - - /** - * 濡傛灉鏈繘琛岄厤缃紝鍒欎娇鐢ㄩ厤缃枃浠堕噷鍏ㄩ儴 destination - */ - String destination() default ""; - - /** - * 鏁版嵁搴撳悕 - */ - String database() default ""; - - /** - * 鏁版嵁琛ㄥ悕 - */ - String[] table() default ""; - - /** - * 鏁版嵁鍙樺姩绫诲瀷锛屾澶勮娉ㄦ剰锛岄粯璁や笉鍖呭惈 DDL - */ - CanalEntry.EventType[] eventType(); - - -} +package com.hz.canal.starter.annotation; + + +import com.alibaba.otter.canal.protocol.CanalEntry; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +/** + * @author wuqiong 2022/4/11 + */ +@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE}) +@Retention(RetentionPolicy.RUNTIME) +public @interface CanalListener { + + + /** + * 濡傛灉鏈繘琛岄厤缃紝鍒欎娇鐢ㄩ厤缃枃浠堕噷鍏ㄩ儴 destination + */ + String destination() default ""; + + /** + * 鏁版嵁搴撳悕 + */ + String database() default ""; + + /** + * 鏁版嵁琛ㄥ悕 + */ + String[] table() default ""; + + /** + * 鏁版嵁鍙樺姩绫诲瀷锛屾澶勮娉ㄦ剰锛岄粯璁や笉鍖呭惈 DDL + */ + CanalEntry.EventType[] eventType(); + + +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/annotation/CanalUpdateListener.java b/src/main/java/com/hz/canal/starter/annotation/CanalUpdateListener.java similarity index 92% rename from src/main/java/com/duxinglangzi/canal/starter/annotation/CanalUpdateListener.java rename to src/main/java/com/hz/canal/starter/annotation/CanalUpdateListener.java index 1032c47..28d4213 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/annotation/CanalUpdateListener.java +++ b/src/main/java/com/hz/canal/starter/annotation/CanalUpdateListener.java @@ -1,29 +1,29 @@ -package com.duxinglangzi.canal.starter.annotation; - - -import com.alibaba.otter.canal.protocol.CanalEntry; -import org.springframework.core.annotation.AliasFor; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * @author wuqiong 2022/4/11 - */ -@Target(ElementType.METHOD) -@Retention(RetentionPolicy.RUNTIME) -@CanalListener(eventType = CanalEntry.EventType.UPDATE) -public @interface CanalUpdateListener { - - @AliasFor(annotation = CanalListener.class) - String destination(); - - @AliasFor(annotation = CanalListener.class) - String database(); - - @AliasFor(annotation = CanalListener.class) - String[] table(); - -} +package com.hz.canal.starter.annotation; + + +import com.alibaba.otter.canal.protocol.CanalEntry; +import org.springframework.core.annotation.AliasFor; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * @author wuqiong 2022/4/11 + */ +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +@CanalListener(eventType = CanalEntry.EventType.UPDATE) +public @interface CanalUpdateListener { + + @AliasFor(annotation = CanalListener.class) + String destination(); + + @AliasFor(annotation = CanalListener.class) + String database(); + + @AliasFor(annotation = CanalListener.class) + String[] table(); + +} diff --git a/src/main/java/com/hz/canal/starter/annotation/EnableCanalListener.java b/src/main/java/com/hz/canal/starter/annotation/EnableCanalListener.java new file mode 100644 index 0000000..871d711 --- /dev/null +++ b/src/main/java/com/hz/canal/starter/annotation/EnableCanalListener.java @@ -0,0 +1,21 @@ +package com.hz.canal.starter.annotation; + +import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; +import com.hz.canal.starter.configuration.CanalConfigurationSelector; +import com.hz.canal.starter.listener.ApplicationReadyListener; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.*; + +/** + * 寮�鍚� canal listener + * + * @author wuqiong 2022/8/4 + */ +@Documented +@Inherited +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Import({CanalAutoConfigurationProperties.class, CanalConfigurationSelector.class, ApplicationReadyListener.class}) +public @interface EnableCanalListener { +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalAutoConfigurationProperties.java b/src/main/java/com/hz/canal/starter/configuration/CanalAutoConfigurationProperties.java similarity index 98% rename from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalAutoConfigurationProperties.java rename to src/main/java/com/hz/canal/starter/configuration/CanalAutoConfigurationProperties.java index 4e1266d..8dea5bc 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalAutoConfigurationProperties.java +++ b/src/main/java/com/hz/canal/starter/configuration/CanalAutoConfigurationProperties.java @@ -1,177 +1,177 @@ -package com.duxinglangzi.canal.starter.configuration; - -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; - -import java.util.LinkedHashMap; -import java.util.Map; - -/** - * Canal杩炴帴鐨勯厤缃被 - * - * @author wuqiong 2022/4/11 - */ -@Order(Ordered.HIGHEST_PRECEDENCE) -@ConfigurationProperties(prefix = "spring.canal") -public class CanalAutoConfigurationProperties { - - private Map<String, EndpointInstance> instances = new LinkedHashMap<>(); - - public static class EndpointInstance { - - /** - * 鏄惁寮�鍚� cluster - */ - private boolean clusterEnabled; - - /** - * zookeeper 鍦板潃, 渚�: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 - */ - private String zookeeperAddress; - - /** - * 榛樿 127.0.0.1 - */ - private String host = "127.0.0.1"; - - /** - * 绔彛 , 榛樿: 11111 - */ - private int port = 11111; - - /** - * 鐢ㄦ埛鍚� - */ - private String userName = ""; - - /** - * 瀵嗙爜 - */ - private String password = ""; - - /** - * 姣忔鑾峰彇鏁版嵁鏉℃暟 , 榛樿: 200 - */ - private int batchSize = 200; - - /** - * 鍙戠敓閿欒鏃堕噸璇曟鏁� , 榛樿: 5 - */ - private int retryCount = 5; - - /** - * mysql 鏁版嵁瑙f瀽鍏虫敞鐨勮〃锛孭erl姝e垯琛ㄨ揪寮�. - * <p> - * <p> - * 澶氫釜姝e垯涔嬮棿浠ラ�楀彿(,)鍒嗛殧锛岃浆涔夌闇�瑕佸弻鏂滄潬(\\) - * <p> - * <p> - * 甯歌渚嬪瓙锛� <p> - * 1. 鎵�鏈夊簱琛細.* or .*\\..* <p> - * 2. canal_db 涓嬫墍鏈夎〃锛� canal_db\\..* <p> - * 3. canal_db 涓嬬殑浠anal鎵撳ご鐨勮〃锛� canal_db\\.canal.* <p> - * 4. canal_db 涓嬬殑涓�寮犺〃锛� canal_db\\.test1 <p> - * 5. 澶氫釜瑙勫垯缁勫悎浣跨敤锛歝anal_db\\..*,mysql_db.test1,mysql.test2 (閫楀彿鍒嗛殧) <p> - * <p> - * 榛樿: 鍏ㄥ簱鍏ㄨ〃(.*\\..*) - */ - private String subscribe = ".*\\..*"; - - /** - * 鏈媺鍙栧埌娑堟伅鎯呭喌涓�,鑾峰彇娑堟伅鐨勬椂闂撮棿闅旀绉掑�� , 榛樿: 1000 - */ - private long acquireInterval = 1000; - - public EndpointInstance() { - } - - public boolean isClusterEnabled() { - return clusterEnabled; - } - - public void setClusterEnabled(boolean clusterEnabled) { - this.clusterEnabled = clusterEnabled; - } - - public String getZookeeperAddress() { - return zookeeperAddress; - } - - public void setZookeeperAddress(String zookeeperAddress) { - this.zookeeperAddress = zookeeperAddress; - } - - public String getHost() { - return host; - } - - public void setHost(String host) { - this.host = host; - } - - public int getPort() { - return port; - } - - public void setPort(int port) { - this.port = port; - } - - public String getUserName() { - return userName; - } - - public void setUserName(String userName) { - this.userName = userName; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public int getBatchSize() { - return batchSize; - } - - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - public int getRetryCount() { - return retryCount; - } - - public void setRetryCount(int retryCount) { - this.retryCount = retryCount; - } - - public long getAcquireInterval() { - return acquireInterval; - } - - public void setAcquireInterval(long acquireInterval) { - this.acquireInterval = acquireInterval; - } - - public String getSubscribe() { - return subscribe; - } - - public void setSubscribe(String subscribe) { - this.subscribe = subscribe; - } - } - - public Map<String, EndpointInstance> getInstances() { - return instances; - } - - public void setInstances(Map<String, EndpointInstance> instances) { - this.instances = instances; - } -} +package com.hz.canal.starter.configuration; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Canal杩炴帴鐨勯厤缃被 + * + * @author wuqiong 2022/4/11 + */ +@Order(Ordered.HIGHEST_PRECEDENCE) +@ConfigurationProperties(prefix = "spring.canal") +public class CanalAutoConfigurationProperties { + + private Map<String, EndpointInstance> instances = new LinkedHashMap<>(); + + public static class EndpointInstance { + + /** + * 鏄惁寮�鍚� cluster + */ + private boolean clusterEnabled; + + /** + * zookeeper 鍦板潃, 渚�: 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181 + */ + private String zookeeperAddress; + + /** + * 榛樿 127.0.0.1 + */ + private String host = "127.0.0.1"; + + /** + * 绔彛 , 榛樿: 11111 + */ + private int port = 11111; + + /** + * 鐢ㄦ埛鍚� + */ + private String userName = ""; + + /** + * 瀵嗙爜 + */ + private String password = ""; + + /** + * 姣忔鑾峰彇鏁版嵁鏉℃暟 , 榛樿: 200 + */ + private int batchSize = 200; + + /** + * 鍙戠敓閿欒鏃堕噸璇曟鏁� , 榛樿: 5 + */ + private int retryCount = 5; + + /** + * mysql 鏁版嵁瑙f瀽鍏虫敞鐨勮〃锛孭erl姝e垯琛ㄨ揪寮�. + * <p> + * <p> + * 澶氫釜姝e垯涔嬮棿浠ラ�楀彿(,)鍒嗛殧锛岃浆涔夌闇�瑕佸弻鏂滄潬(\\) + * <p> + * <p> + * 甯歌渚嬪瓙锛� <p> + * 1. 鎵�鏈夊簱琛細.* or .*\\..* <p> + * 2. canal_db 涓嬫墍鏈夎〃锛� canal_db\\..* <p> + * 3. canal_db 涓嬬殑浠anal鎵撳ご鐨勮〃锛� canal_db\\.canal.* <p> + * 4. canal_db 涓嬬殑涓�寮犺〃锛� canal_db\\.test1 <p> + * 5. 澶氫釜瑙勫垯缁勫悎浣跨敤锛歝anal_db\\..*,mysql_db.test1,mysql.test2 (閫楀彿鍒嗛殧) <p> + * <p> + * 榛樿: 鍏ㄥ簱鍏ㄨ〃(.*\\..*) + */ + private String subscribe = ".*\\..*"; + + /** + * 鏈媺鍙栧埌娑堟伅鎯呭喌涓�,鑾峰彇娑堟伅鐨勬椂闂撮棿闅旀绉掑�� , 榛樿: 1000 + */ + private long acquireInterval = 1000; + + public EndpointInstance() { + } + + public boolean isClusterEnabled() { + return clusterEnabled; + } + + public void setClusterEnabled(boolean clusterEnabled) { + this.clusterEnabled = clusterEnabled; + } + + public String getZookeeperAddress() { + return zookeeperAddress; + } + + public void setZookeeperAddress(String zookeeperAddress) { + this.zookeeperAddress = zookeeperAddress; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public int getRetryCount() { + return retryCount; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public long getAcquireInterval() { + return acquireInterval; + } + + public void setAcquireInterval(long acquireInterval) { + this.acquireInterval = acquireInterval; + } + + public String getSubscribe() { + return subscribe; + } + + public void setSubscribe(String subscribe) { + this.subscribe = subscribe; + } + } + + public Map<String, EndpointInstance> getInstances() { + return instances; + } + + public void setInstances(Map<String, EndpointInstance> instances) { + this.instances = instances; + } +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalBootstrapConfiguration.java b/src/main/java/com/hz/canal/starter/configuration/CanalBootstrapConfiguration.java similarity index 95% rename from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalBootstrapConfiguration.java rename to src/main/java/com/hz/canal/starter/configuration/CanalBootstrapConfiguration.java index 4ec1a93..345097d 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalBootstrapConfiguration.java +++ b/src/main/java/com/hz/canal/starter/configuration/CanalBootstrapConfiguration.java @@ -1,35 +1,35 @@ -package com.duxinglangzi.canal.starter.configuration; - -import org.springframework.beans.factory.support.BeanDefinitionRegistry; -import org.springframework.beans.factory.support.RootBeanDefinition; -import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; -import org.springframework.core.type.AnnotationMetadata; - -/** - * @author wuqiong 2022/4/12 - */ -public class CanalBootstrapConfiguration implements ImportBeanDefinitionRegistrar { - - public static final String CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME = - "com.duxinglangzi.canal.starter.configuration.CanalListenerAnnotationBeanPostProcessor"; - - /** - * 娉ㄥ唽 CanalListenerAnnotationBeanPostProcessor 鍒皊pring bean 瀹瑰櫒鍐� - * - * @param importingClassMetadata - * @param registry - * @return void - * @author wuqiong 2022-04-23 20:21 - */ - @Override - public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { - if (!registry.containsBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) { - - registry.registerBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME, - new RootBeanDefinition(CanalListenerAnnotationBeanPostProcessor.class)); - } - - - } - -} +package com.hz.canal.starter.configuration; + +import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.beans.factory.support.RootBeanDefinition; +import org.springframework.context.annotation.ImportBeanDefinitionRegistrar; +import org.springframework.core.type.AnnotationMetadata; + +/** + * @author wuqiong 2022/4/12 + */ +public class CanalBootstrapConfiguration implements ImportBeanDefinitionRegistrar { + + public static final String CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME = + "com.duxinglangzi.canal.starter.configuration.CanalListenerAnnotationBeanPostProcessor"; + + /** + * 娉ㄥ唽 CanalListenerAnnotationBeanPostProcessor 鍒皊pring bean 瀹瑰櫒鍐� + * + * @param importingClassMetadata + * @param registry + * @return void + * @author wuqiong 2022-04-23 20:21 + */ + @Override + public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { + if (!registry.containsBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) { + + registry.registerBeanDefinition(CANAL_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME, + new RootBeanDefinition(CanalListenerAnnotationBeanPostProcessor.class)); + } + + + } + +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalConfigurationSelector.java b/src/main/java/com/hz/canal/starter/configuration/CanalConfigurationSelector.java similarity index 91% rename from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalConfigurationSelector.java rename to src/main/java/com/hz/canal/starter/configuration/CanalConfigurationSelector.java index 0c9da65..0a90b4b 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalConfigurationSelector.java +++ b/src/main/java/com/hz/canal/starter/configuration/CanalConfigurationSelector.java @@ -1,22 +1,22 @@ -package com.duxinglangzi.canal.starter.configuration; - -import org.springframework.context.annotation.DeferredImportSelector; -import org.springframework.core.type.AnnotationMetadata; - -/** - * @author wuqiong 2022/4/12 - */ -public class CanalConfigurationSelector implements DeferredImportSelector { - - /** - * 鎵弿鍣� 瀵煎叆鎸囧畾绫伙紝 杩欓噷瀵煎叆 CanalBootstrapConfiguration.class - * - * @param importingClassMetadata - * @return java.lang.String[] - * @author wuqiong 2022-04-23 20:20 - */ - @Override - public String[] selectImports(AnnotationMetadata importingClassMetadata) { - return new String[]{CanalBootstrapConfiguration.class.getName()}; - } -} +package com.hz.canal.starter.configuration; + +import org.springframework.context.annotation.DeferredImportSelector; +import org.springframework.core.type.AnnotationMetadata; + +/** + * @author wuqiong 2022/4/12 + */ +public class CanalConfigurationSelector implements DeferredImportSelector { + + /** + * 鎵弿鍣� 瀵煎叆鎸囧畾绫伙紝 杩欓噷瀵煎叆 CanalBootstrapConfiguration.class + * + * @param importingClassMetadata + * @return java.lang.String[] + * @author wuqiong 2022-04-23 20:20 + */ + @Override + public String[] selectImports(AnnotationMetadata importingClassMetadata) { + return new String[]{CanalBootstrapConfiguration.class.getName()}; + } +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java b/src/main/java/com/hz/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java similarity index 95% rename from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java rename to src/main/java/com/hz/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java index 64915e4..3d07141 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java +++ b/src/main/java/com/hz/canal/starter/configuration/CanalListenerAnnotationBeanPostProcessor.java @@ -1,88 +1,88 @@ -package com.duxinglangzi.canal.starter.configuration; - - -import com.duxinglangzi.canal.starter.annotation.CanalListener; -import com.duxinglangzi.canal.starter.factory.TransponderContainerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.aop.support.AopUtils; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.SmartInitializingSingleton; -import org.springframework.beans.factory.config.BeanFactoryPostProcessor; -import org.springframework.beans.factory.config.BeanPostProcessor; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.core.MethodIntrospector; -import org.springframework.core.annotation.AnnotatedElementUtils; - -import java.lang.reflect.Method; -import java.util.Collections; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; - -/** - * @author wuqiong 2022/4/11 - */ -public class CanalListenerAnnotationBeanPostProcessor implements - BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware { - - private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class); - - private final Set<Class<?>> notAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(256)); - private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private ConfigurableListableBeanFactory configurableListableBeanFactory; - private CanalAutoConfigurationProperties canalAutoConfigurationProperties; - private ApplicationContext applicationContext; - - @Override - public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { - if (notAnnotatedClasses.contains(bean.getClass())) return bean; - Class<?> targetClass = AopUtils.getTargetClass(bean); - // 鍙壂鎻忕被鐨勬柟娉曪紝鐩墠 CanalListener 鍙敮鎸佸湪鏂规硶涓� - Map<Method, CanalListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass, - (MethodIntrospector.MetadataLookup<CanalListener>) method -> findListenerAnnotations(method)); - if (annotatedMethods.isEmpty()) { - this.notAnnotatedClasses.add(bean.getClass()); - } else { - // 鍏堝姞鍏ュ埌寰呮敞鍐岄噷闈� - annotatedMethods.entrySet().stream() - .filter(e -> e != null) - .forEach(ele -> registrars.add(createRegistrar(bean, ele))); - logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName, - annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet())); - } - return bean; - } - - - private CanalListener findListenerAnnotations(Method method) { - return AnnotatedElementUtils.findMergedAnnotation(method, CanalListener.class); - } - - - @Override - public void afterSingletonsInstantiated() { - canalAutoConfigurationProperties = configurableListableBeanFactory.getBean(CanalAutoConfigurationProperties.class); - TransponderContainerFactory.registerListenerContainer(configurableListableBeanFactory, canalAutoConfigurationProperties, registrars); - } - - @Override - public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { - this.configurableListableBeanFactory = configurableListableBeanFactory; - } - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; - } - - private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) { - return new CanalListenerEndpointRegistrar(bean, entry.getKey(), - applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().destination()), - applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().database()), - entry.getValue().table(), entry.getValue().eventType()); - } -} +package com.hz.canal.starter.configuration; + + +import com.hz.canal.starter.annotation.CanalListener; +import com.hz.canal.starter.factory.TransponderContainerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.aop.support.AopUtils; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.beans.factory.config.BeanFactoryPostProcessor; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.MethodIntrospector; +import org.springframework.core.annotation.AnnotatedElementUtils; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * @author wuqiong 2022/4/11 + */ +public class CanalListenerAnnotationBeanPostProcessor implements + BeanPostProcessor, SmartInitializingSingleton, BeanFactoryPostProcessor, ApplicationContextAware { + + private static final Logger logger = LoggerFactory.getLogger(CanalListenerAnnotationBeanPostProcessor.class); + + private final Set<Class<?>> notAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap<>(256)); + private Set<CanalListenerEndpointRegistrar> registrars = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private ConfigurableListableBeanFactory configurableListableBeanFactory; + private CanalAutoConfigurationProperties canalAutoConfigurationProperties; + private ApplicationContext applicationContext; + + @Override + public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { + if (notAnnotatedClasses.contains(bean.getClass())) return bean; + Class<?> targetClass = AopUtils.getTargetClass(bean); + // 鍙壂鎻忕被鐨勬柟娉曪紝鐩墠 CanalListener 鍙敮鎸佸湪鏂规硶涓� + Map<Method, CanalListener> annotatedMethods = MethodIntrospector.selectMethods(targetClass, + (MethodIntrospector.MetadataLookup<CanalListener>) method -> findListenerAnnotations(method)); + if (annotatedMethods.isEmpty()) { + this.notAnnotatedClasses.add(bean.getClass()); + } else { + // 鍏堝姞鍏ュ埌寰呮敞鍐岄噷闈� + annotatedMethods.entrySet().stream() + .filter(e -> e != null) + .forEach(ele -> registrars.add(createRegistrar(bean, ele))); + logger.info("Registered @CanalListener methods processed on bean:{} , Methods :{} ", beanName, + annotatedMethods.keySet().stream().map(e -> e.getName()).collect(Collectors.toSet())); + } + return bean; + } + + + private CanalListener findListenerAnnotations(Method method) { + return AnnotatedElementUtils.findMergedAnnotation(method, CanalListener.class); + } + + + @Override + public void afterSingletonsInstantiated() { + canalAutoConfigurationProperties = configurableListableBeanFactory.getBean(CanalAutoConfigurationProperties.class); + TransponderContainerFactory.registerListenerContainer(configurableListableBeanFactory, canalAutoConfigurationProperties, registrars); + } + + @Override + public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException { + this.configurableListableBeanFactory = configurableListableBeanFactory; + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + private CanalListenerEndpointRegistrar createRegistrar(Object bean, Map.Entry<Method, CanalListener> entry) { + return new CanalListenerEndpointRegistrar(bean, entry.getKey(), + applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().destination()), + applicationContext.getEnvironment().resolvePlaceholders(entry.getValue().database()), + entry.getValue().table(), entry.getValue().eventType()); + } +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java b/src/main/java/com/hz/canal/starter/configuration/CanalListenerEndpointRegistrar.java similarity index 97% rename from src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java rename to src/main/java/com/hz/canal/starter/configuration/CanalListenerEndpointRegistrar.java index 6dd0e43..a2144a9 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/configuration/CanalListenerEndpointRegistrar.java +++ b/src/main/java/com/hz/canal/starter/configuration/CanalListenerEndpointRegistrar.java @@ -1,130 +1,130 @@ -package com.duxinglangzi.canal.starter.configuration; - -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.alibaba.otter.canal.protocol.exception.CanalClientException; -import com.duxinglangzi.canal.starter.mode.CanalMessage; -import org.apache.commons.lang3.StringUtils; - -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.function.Predicate; -import java.util.stream.Collectors; - -/** - * 鐩戝惉鐨勭粓绔敞鍐屽櫒 - * - * @author wuqiong 2022/4/11 - */ -public class CanalListenerEndpointRegistrar { - - private Object bean; - private Method method; - - /** - * 濡傛灉鏈繘琛岄厤缃紝鍒欎娇鐢ㄩ厤缃枃浠堕噷鍏ㄩ儴 destination - */ - private String destination; - - /** - * 鏁版嵁搴撳悕 - */ - private String database; - - /** - * 鏁版嵁琛ㄥ悕 - */ - private String[] table; - - /** - * 鏁版嵁鍙樺姩绫诲瀷锛屾澶勮娉ㄦ剰锛岄粯璁や笉鍖呭惈 DDL - */ - private CanalEntry.EventType[] eventType; - - /** - * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔1涓弬鏁�, 璇ュ弬鏁板璞″唴鍖呭惈浜�: 搴撳悕銆佽〃鍚嶃�佷簨浠剁被鍨嬨�佸彉鏇寸殑鏁版嵁 <p> - * 2銆佹柟娉曞弬鏁板繀椤讳负: CanalMessage <p> - * 3銆佸鏋淐analListener 鎸囧畾鐨� destination 涓嶅湪閰嶇疆鏂囦欢鍐咃紝鍒欑洿鎺ユ姏閿� <p> - * - * @param sets - * @return void - * @author wuqiong 2022-04-23 20:27 - */ - public void checkParameter(Set<String> sets) { - List<Class<?>> classes = parameterTypes(); - if (classes.size() != 1 || classes.get(0) != CanalMessage.class) - throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + - "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check "); - if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination())) - throw new CanalClientException("@CanalListener Illegal destination " + getDestination() + ", please check "); - - } - - - public List<Class<?>> parameterTypes() { - return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList()); - } - - public boolean isContainDestination(String destination) { - if (StringUtils.isBlank(getDestination())) return true; - return getDestination().equals(destination); - } - - /** - * 杩囨护鍙傛暟 - * - * @param database - * @param tableName - * @param eventType - * @return java.util.function.Predicate - * @author wuqiong 2022-04-23 20:47 - */ - public static Predicate<CanalListenerEndpointRegistrar> filterArgs( - String database, String tableName, CanalEntry.EventType eventType) { - Predicate<CanalListenerEndpointRegistrar> databases = - e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database); - Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0 - || (e.getTable().length == 1 && "".equals(e.getTable()[0])) - || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName)); - Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0 - || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType); - return databases.and(table).and(eventTypes); - } - - - public CanalListenerEndpointRegistrar( - Object bean, Method method, String destination, - String database, String[] tables, CanalEntry.EventType[] eventTypes) { - this.bean = bean; - this.method = method; - this.destination = destination; - this.database = database; - this.table = tables; - this.eventType = eventTypes; - } - - public Object getBean() { - return bean; - } - - public Method getMethod() { - return method; - } - - public String getDestination() { - return destination; - } - - public String getDatabase() { - return database; - } - - public String[] getTable() { - return table; - } - - public CanalEntry.EventType[] getEventType() { - return eventType; - } -} +package com.hz.canal.starter.configuration; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.hz.canal.starter.mode.CanalMessage; +import org.apache.commons.lang3.StringUtils; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * 鐩戝惉鐨勭粓绔敞鍐屽櫒 + * + * @author wuqiong 2022/4/11 + */ +public class CanalListenerEndpointRegistrar { + + private Object bean; + private Method method; + + /** + * 濡傛灉鏈繘琛岄厤缃紝鍒欎娇鐢ㄩ厤缃枃浠堕噷鍏ㄩ儴 destination + */ + private String destination; + + /** + * 鏁版嵁搴撳悕 + */ + private String database; + + /** + * 鏁版嵁琛ㄥ悕 + */ + private String[] table; + + /** + * 鏁版嵁鍙樺姩绫诲瀷锛屾澶勮娉ㄦ剰锛岄粯璁や笉鍖呭惈 DDL + */ + private CanalEntry.EventType[] eventType; + + /** + * 1銆佺洰鍓嶅疄鐜扮殑 DML 瑙f瀽鍣ㄤ粎鏀寔1涓弬鏁�, 璇ュ弬鏁板璞″唴鍖呭惈浜�: 搴撳悕銆佽〃鍚嶃�佷簨浠剁被鍨嬨�佸彉鏇寸殑鏁版嵁 <p> + * 2銆佹柟娉曞弬鏁板繀椤讳负: CanalMessage <p> + * 3銆佸鏋淐analListener 鎸囧畾鐨� destination 涓嶅湪閰嶇疆鏂囦欢鍐咃紝鍒欑洿鎺ユ姏閿� <p> + * + * @param sets + * @return void + * @author wuqiong 2022-04-23 20:27 + */ + public void checkParameter(Set<String> sets) { + List<Class<?>> classes = parameterTypes(); + if (classes.size() != 1 || classes.get(0) != CanalMessage.class) + throw new IllegalArgumentException("@CanalListener Method Parameter Type Invalid, " + + "Need Parameter Type [ com.duxinglangzi.canal.starter.mode.CanalMessage ] please check "); + if (StringUtils.isNotBlank(getDestination()) && !sets.contains(getDestination())) + throw new CanalClientException("@CanalListener Illegal destination " + getDestination() + ", please check "); + + } + + + public List<Class<?>> parameterTypes() { + return Arrays.stream(getMethod().getParameterTypes()).collect(Collectors.toList()); + } + + public boolean isContainDestination(String destination) { + if (StringUtils.isBlank(getDestination())) return true; + return getDestination().equals(destination); + } + + /** + * 杩囨护鍙傛暟 + * + * @param database + * @param tableName + * @param eventType + * @return java.util.function.Predicate + * @author wuqiong 2022-04-23 20:47 + */ + public static Predicate<CanalListenerEndpointRegistrar> filterArgs( + String database, String tableName, CanalEntry.EventType eventType) { + Predicate<CanalListenerEndpointRegistrar> databases = + e -> StringUtils.isBlank(e.getDatabase()) || e.getDatabase().equals(database); + Predicate<CanalListenerEndpointRegistrar> table = e -> e.getTable().length == 0 + || (e.getTable().length == 1 && "".equals(e.getTable()[0])) + || Arrays.stream(e.getTable()).anyMatch(s -> s.equals(tableName)); + Predicate<CanalListenerEndpointRegistrar> eventTypes = e -> e.getEventType().length == 0 + || Arrays.stream(e.getEventType()).anyMatch(eve -> eve == eventType); + return databases.and(table).and(eventTypes); + } + + + public CanalListenerEndpointRegistrar( + Object bean, Method method, String destination, + String database, String[] tables, CanalEntry.EventType[] eventTypes) { + this.bean = bean; + this.method = method; + this.destination = destination; + this.database = database; + this.table = tables; + this.eventType = eventTypes; + } + + public Object getBean() { + return bean; + } + + public Method getMethod() { + return method; + } + + public String getDestination() { + return destination; + } + + public String getDatabase() { + return database; + } + + public String[] getTable() { + return table; + } + + public CanalEntry.EventType[] getEventType() { + return eventType; + } +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java b/src/main/java/com/hz/canal/starter/container/AbstractCanalTransponderContainer.java similarity index 93% rename from src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java rename to src/main/java/com/hz/canal/starter/container/AbstractCanalTransponderContainer.java index c0a5094..13060be 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java +++ b/src/main/java/com/hz/canal/starter/container/AbstractCanalTransponderContainer.java @@ -1,74 +1,74 @@ -package com.duxinglangzi.canal.starter.container; - -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener; -import org.springframework.context.SmartLifecycle; - -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * 鎶借薄鐨刢anal transponder ,瀹炵幇SmartLifecycle鎺ュ彛,澹版槑鍛ㄦ湡鐢眘pring杩涜绠$悊 - * - * @author wuqiong 2022/4/11 - */ -public abstract class AbstractCanalTransponderContainer implements SmartLifecycle { - protected boolean isRunning = false; - protected final Long SLEEP_TIME_MILLI_SECONDS = 1000L; - protected List<CanalEntry.EntryType> IGNORE_ENTRY_TYPES = - Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, - CanalEntry.EntryType.TRANSACTIONEND, - CanalEntry.EntryType.HEARTBEAT); - - protected abstract void doStart(); - - protected abstract void initConnect(); - - protected abstract void disconnect(); - - - @Override - public void start() { - new Thread(() -> { - // spring 鍚姩鍚� 鎵嶄細杩涜canal鏁版嵁鎷夊彇 - while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get()) - sleep(5L * SLEEP_TIME_MILLI_SECONDS); - initConnect(); - while (isRunning() && !Thread.currentThread().isInterrupted()) doStart(); - disconnect(); // 绾跨▼琚粓姝㈡垨鑰呭鍣ㄥ凡缁忓仠姝�,闇�瑕佸叧闂繛鎺� - }).start(); - setRunning(true); - } - - @Override - public void stop() { - setRunning(false); - } - - @Override - public void stop(Runnable callback) { - callback.run(); - setRunning(false); - sleep(SLEEP_TIME_MILLI_SECONDS); - } - - @Override - public boolean isRunning() { - return isRunning; - } - - protected void setRunning(boolean bool) { - isRunning = bool; - } - - protected void sleep(long sleepTimeMilliSeconds) { - try { - TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSeconds); - if (!isRunning()) Thread.currentThread().interrupt(); - } catch (InterruptedException e) { - } - } - - -} +package com.hz.canal.starter.container; + +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.hz.canal.starter.listener.ApplicationReadyListener; +import org.springframework.context.SmartLifecycle; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * 鎶借薄鐨刢anal transponder ,瀹炵幇SmartLifecycle鎺ュ彛,澹版槑鍛ㄦ湡鐢眘pring杩涜绠$悊 + * + * @author wuqiong 2022/4/11 + */ +public abstract class AbstractCanalTransponderContainer implements SmartLifecycle { + protected boolean isRunning = false; + protected final Long SLEEP_TIME_MILLI_SECONDS = 1000L; + protected List<CanalEntry.EntryType> IGNORE_ENTRY_TYPES = + Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN, + CanalEntry.EntryType.TRANSACTIONEND, + CanalEntry.EntryType.HEARTBEAT); + + protected abstract void doStart(); + + protected abstract void initConnect(); + + protected abstract void disconnect(); + + + @Override + public void start() { + new Thread(() -> { + // spring 鍚姩鍚� 鎵嶄細杩涜canal鏁版嵁鎷夊彇 + while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get()) + sleep(5L * SLEEP_TIME_MILLI_SECONDS); + initConnect(); + while (isRunning() && !Thread.currentThread().isInterrupted()) doStart(); + disconnect(); // 绾跨▼琚粓姝㈡垨鑰呭鍣ㄥ凡缁忓仠姝�,闇�瑕佸叧闂繛鎺� + }).start(); + setRunning(true); + } + + @Override + public void stop() { + setRunning(false); + } + + @Override + public void stop(Runnable callback) { + callback.run(); + setRunning(false); + sleep(SLEEP_TIME_MILLI_SECONDS); + } + + @Override + public boolean isRunning() { + return isRunning; + } + + protected void setRunning(boolean bool) { + isRunning = bool; + } + + protected void sleep(long sleepTimeMilliSeconds) { + try { + TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSeconds); + if (!isRunning()) Thread.currentThread().interrupt(); + } catch (InterruptedException e) { + } + } + + +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java similarity index 95% rename from src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java rename to src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java index 6865b86..480c158 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/hz/canal/starter/container/DmlMessageTransponderContainer.java @@ -1,139 +1,139 @@ -package com.duxinglangzi.canal.starter.container; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.alibaba.otter.canal.protocol.CanalEntry; -import com.alibaba.otter.canal.protocol.Message; -import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; -import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar; -import com.duxinglangzi.canal.starter.mode.CanalMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.InvocationTargetException; -import java.util.*; - -/** - * DML 鏁版嵁鎷夊彇銆佽В鏋� - * - * @author wuqiong 2022/4/11 - */ -public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { - - private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class); - - private CanalConnector connector; - private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; - private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); - private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); - private Integer local_retry_count; - - - public void initConnect() { - try { - // init supportAllTypes - registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); - connector.connect(); - connector.subscribe(endpointInstance.getSubscribe()); - connector.rollback(); - // 鍒濆鍖栨湰鍦伴噸璇曟鏁� - local_retry_count = endpointInstance.getRetryCount(); - } catch (Exception e) { - logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); - setRunning(false); - } - - } - - public void disconnect() { - // 鍏抽棴杩炴帴 - connector.disconnect(); - } - - - public void doStart() { - try { - Message message = null; - try { - message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� - } catch (Exception clientException) { - logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); - if (local_retry_count > 0) { - // 閲嶈瘯娆℃暟鍑忎竴 - local_retry_count = local_retry_count - 1; - sleep(endpointInstance.getAcquireInterval()); - } else { - // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼ - logger.error("[DmlMessageTransponderContainer] retry count is zero , " + - "thread interrupt , current connector host: {} , port: {} ", - endpointInstance.getHost(), endpointInstance.getPort()); - Thread.currentThread().interrupt(); - } - return; - } - // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀� - if (local_retry_count < endpointInstance.getRetryCount()) - local_retry_count = endpointInstance.getRetryCount(); - List<CanalEntry.Entry> entries = message.getEntries(); - if (message.getId() == -1 || entries.isEmpty()) { - sleep(endpointInstance.getAcquireInterval()); - return; - } - for (CanalEntry.Entry entry : entries) { - try { - consumer(entry); - } catch (Exception e) { - e.printStackTrace(); - logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); - // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 - // return; - } - } - connector.ack(message.getId()); // 鎻愪氦纭 - } catch (Exception exc) { - logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage()); - // 闃叉鍒犻櫎娑堟伅鏃跺彂鐢熼敊璇�,鎴栬�呮媺鍙栨秷鎭け璐ョ瓑鎯呭喌 - exc.printStackTrace(); - } - } - - public DmlMessageTransponderContainer( - CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars, - CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { - this.connector = connector; - this.registrars.addAll(registrars); - this.endpointInstance = endpointInstance; - } - - private void consumer(CanalEntry.Entry entry) { - if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return; - CanalEntry.RowChange rowChange = null; - try { - rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); - } catch (Exception e) { - logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); - throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); - } - - // 蹇界暐 ddl 璇彞 - if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; - CanalEntry.EventType eventType = rowChange.getEventType(); - if (!support_all_types.contains(eventType)) return; - for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { - registrars - .stream() - .filter(CanalListenerEndpointRegistrar.filterArgs( - entry.getHeader().getSchemaName(), - entry.getHeader().getTableName(), - eventType)) - .forEach(element -> { - try { - element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); - } catch (IllegalAccessException | InvocationTargetException e) { - logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); - throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e); - } - }); - } - } - -} +package com.hz.canal.starter.container; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.protocol.CanalEntry; +import com.alibaba.otter.canal.protocol.Message; +import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; +import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar; +import com.hz.canal.starter.mode.CanalMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.util.*; + +/** + * DML 鏁版嵁鎷夊彇銆佽В鏋� + * + * @author wuqiong 2022/4/11 + */ +public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { + + private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class); + + private CanalConnector connector; + private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; + private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); + private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); + private Integer local_retry_count; + + + public void initConnect() { + try { + // init supportAllTypes + registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); + connector.connect(); + connector.subscribe(endpointInstance.getSubscribe()); + connector.rollback(); + // 鍒濆鍖栨湰鍦伴噸璇曟鏁� + local_retry_count = endpointInstance.getRetryCount(); + } catch (Exception e) { + logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); + setRunning(false); + } + + } + + public void disconnect() { + // 鍏抽棴杩炴帴 + connector.disconnect(); + } + + + public void doStart() { + try { + Message message = null; + try { + message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� + } catch (Exception clientException) { + logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); + if (local_retry_count > 0) { + // 閲嶈瘯娆℃暟鍑忎竴 + local_retry_count = local_retry_count - 1; + sleep(endpointInstance.getAcquireInterval()); + } else { + // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼ + logger.error("[DmlMessageTransponderContainer] retry count is zero , " + + "thread interrupt , current connector host: {} , port: {} ", + endpointInstance.getHost(), endpointInstance.getPort()); + Thread.currentThread().interrupt(); + } + return; + } + // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀� + if (local_retry_count < endpointInstance.getRetryCount()) + local_retry_count = endpointInstance.getRetryCount(); + List<CanalEntry.Entry> entries = message.getEntries(); + if (message.getId() == -1 || entries.isEmpty()) { + sleep(endpointInstance.getAcquireInterval()); + return; + } + for (CanalEntry.Entry entry : entries) { + try { + consumer(entry); + } catch (Exception e) { + e.printStackTrace(); + logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); + // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 + // return; + } + } + connector.ack(message.getId()); // 鎻愪氦纭 + } catch (Exception exc) { + logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage()); + // 闃叉鍒犻櫎娑堟伅鏃跺彂鐢熼敊璇�,鎴栬�呮媺鍙栨秷鎭け璐ョ瓑鎯呭喌 + exc.printStackTrace(); + } + } + + public DmlMessageTransponderContainer( + CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars, + CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { + this.connector = connector; + this.registrars.addAll(registrars); + this.endpointInstance = endpointInstance; + } + + private void consumer(CanalEntry.Entry entry) { + if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return; + CanalEntry.RowChange rowChange = null; + try { + rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); + } catch (Exception e) { + logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); + throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); + } + + // 蹇界暐 ddl 璇彞 + if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; + CanalEntry.EventType eventType = rowChange.getEventType(); + if (!support_all_types.contains(eventType)) return; + for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { + registrars + .stream() + .filter(CanalListenerEndpointRegistrar.filterArgs( + entry.getHeader().getSchemaName(), + entry.getHeader().getTableName(), + eventType)) + .forEach(element -> { + try { + element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); + } catch (IllegalAccessException | InvocationTargetException e) { + logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); + throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e); + } + }); + } + } + +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/factory/CanalConnectorFactory.java b/src/main/java/com/hz/canal/starter/factory/CanalConnectorFactory.java similarity index 93% rename from src/main/java/com/duxinglangzi/canal/starter/factory/CanalConnectorFactory.java rename to src/main/java/com/hz/canal/starter/factory/CanalConnectorFactory.java index 0ce17db..13c9b6f 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/factory/CanalConnectorFactory.java +++ b/src/main/java/com/hz/canal/starter/factory/CanalConnectorFactory.java @@ -1,50 +1,50 @@ -package com.duxinglangzi.canal.starter.factory; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.alibaba.otter.canal.client.CanalConnectors; -import com.alibaba.otter.canal.protocol.exception.CanalClientException; -import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; -import org.springframework.util.Assert; -import org.springframework.util.StringUtils; - -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.List; - - -public class CanalConnectorFactory { - - /** - * 鍒涘缓 CanalConnector - * - * @param destination - * @param endpointInstance - * @return CanalConnector - * @author wuqiong 2022-04-23 20:36 - */ - public static synchronized CanalConnector createConnector( - String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { - Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check "); - Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check "); - CanalConnector connector; - if (endpointInstance.isClusterEnabled()) { - if (!StringUtils.hasText(endpointInstance.getZookeeperAddress())) - throw new CanalClientException("zookeeper address is null"); - List<SocketAddress> addresses = new ArrayList<>(); - for (String s : endpointInstance.getZookeeperAddress().split(",")) { - String[] split = s.split(":"); - if (split.length != 2) - throw new CanalClientException("error parsing zookeeper address:" + s); - addresses.add(new InetSocketAddress(split[0], Integer.parseInt(split[1]))); - } - connector = CanalConnectors.newClusterConnector( - addresses, destination, endpointInstance.getUserName(), endpointInstance.getPassword()); - } else { - connector = CanalConnectors.newSingleConnector( - new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()), - destination, endpointInstance.getUserName(), endpointInstance.getPassword()); - } - return connector; - } -} +package com.hz.canal.starter.factory; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.alibaba.otter.canal.client.CanalConnectors; +import com.alibaba.otter.canal.protocol.exception.CanalClientException; +import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.List; + + +public class CanalConnectorFactory { + + /** + * 鍒涘缓 CanalConnector + * + * @param destination + * @param endpointInstance + * @return CanalConnector + * @author wuqiong 2022-04-23 20:36 + */ + public static synchronized CanalConnector createConnector( + String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { + Assert.isTrue(StringUtils.hasText(destination), "destination is null , please check "); + Assert.isTrue(endpointInstance != null, "endpoint instance is null , please check "); + CanalConnector connector; + if (endpointInstance.isClusterEnabled()) { + if (!StringUtils.hasText(endpointInstance.getZookeeperAddress())) + throw new CanalClientException("zookeeper address is null"); + List<SocketAddress> addresses = new ArrayList<>(); + for (String s : endpointInstance.getZookeeperAddress().split(",")) { + String[] split = s.split(":"); + if (split.length != 2) + throw new CanalClientException("error parsing zookeeper address:" + s); + addresses.add(new InetSocketAddress(split[0], Integer.parseInt(split[1]))); + } + connector = CanalConnectors.newClusterConnector( + addresses, destination, endpointInstance.getUserName(), endpointInstance.getPassword()); + } else { + connector = CanalConnectors.newSingleConnector( + new InetSocketAddress(endpointInstance.getHost(), endpointInstance.getPort()), + destination, endpointInstance.getUserName(), endpointInstance.getPassword()); + } + return connector; + } +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/factory/TransponderContainerFactory.java b/src/main/java/com/hz/canal/starter/factory/TransponderContainerFactory.java similarity index 89% rename from src/main/java/com/duxinglangzi/canal/starter/factory/TransponderContainerFactory.java rename to src/main/java/com/hz/canal/starter/factory/TransponderContainerFactory.java index bc051cd..26b4ba7 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/factory/TransponderContainerFactory.java +++ b/src/main/java/com/hz/canal/starter/factory/TransponderContainerFactory.java @@ -1,67 +1,67 @@ -package com.duxinglangzi.canal.starter.factory; - -import com.alibaba.otter.canal.client.CanalConnector; -import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; -import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar; -import com.duxinglangzi.canal.starter.container.DmlMessageTransponderContainer; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * @author wuqiong 2022/4/18 - * @description - */ -public class TransponderContainerFactory { - - private static final String CONTAINER_ID_PREFIX = "com.duxinglangzi.canal.starter.container.MessageTransponderContainer#"; - - - /** - * 灏嗘墍鏈夊緟娉ㄥ唽鐨勭鐐癸紝娉ㄥ唽鍒皊pring涓� - * - * @param beanFactory - * @param canalConfig - * @param registrars - * @return void - * @author wuqiong 2022-04-23 20:34 - */ - public static void registerListenerContainer( - ConfigurableListableBeanFactory beanFactory, CanalAutoConfigurationProperties canalConfig, - Set<CanalListenerEndpointRegistrar> registrars) { - if (registrars == null || registrars.isEmpty()) return; - if (canalConfig == null || canalConfig.getInstances().isEmpty()) return; - - for (Map.Entry<String, CanalAutoConfigurationProperties.EndpointInstance> endpointInstance : canalConfig.getInstances().entrySet()) { - if (beanFactory.containsBean(getContainerID(endpointInstance.getKey()))) continue; // 濡傛灉宸茬粡瀛樺湪鍒欎笉鍦ㄥ垱寤� - List<CanalListenerEndpointRegistrar> registrarList = new ArrayList<>(); - for (CanalListenerEndpointRegistrar registrar : registrars) { - registrar.checkParameter(canalConfig.getInstances().keySet()); - if (!registrar.isContainDestination(endpointInstance.getKey())) continue; - registrarList.add(registrar); - } - if (registrarList.isEmpty()) continue; - registerTransponderContainer( - endpointInstance.getKey(), endpointInstance.getValue(), beanFactory, registrarList); - } - - } - - private static void registerTransponderContainer( - String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance, - ConfigurableListableBeanFactory beanFactory, List<CanalListenerEndpointRegistrar> registrarList) { - CanalConnector connector = CanalConnectorFactory.createConnector(destination, endpointInstance); - beanFactory.registerSingleton(getContainerID(destination), - new DmlMessageTransponderContainer(connector, registrarList, endpointInstance)); - } - - - private static String getContainerID(String destination) { - return CONTAINER_ID_PREFIX + "#" + destination; - } - - -} +package com.hz.canal.starter.factory; + +import com.alibaba.otter.canal.client.CanalConnector; +import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; +import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar; +import com.hz.canal.starter.container.DmlMessageTransponderContainer; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * @author wuqiong 2022/4/18 + * @description + */ +public class TransponderContainerFactory { + + private static final String CONTAINER_ID_PREFIX = "com.duxinglangzi.canal.starter.container.MessageTransponderContainer#"; + + + /** + * 灏嗘墍鏈夊緟娉ㄥ唽鐨勭鐐癸紝娉ㄥ唽鍒皊pring涓� + * + * @param beanFactory + * @param canalConfig + * @param registrars + * @return void + * @author wuqiong 2022-04-23 20:34 + */ + public static void registerListenerContainer( + ConfigurableListableBeanFactory beanFactory, CanalAutoConfigurationProperties canalConfig, + Set<CanalListenerEndpointRegistrar> registrars) { + if (registrars == null || registrars.isEmpty()) return; + if (canalConfig == null || canalConfig.getInstances().isEmpty()) return; + + for (Map.Entry<String, CanalAutoConfigurationProperties.EndpointInstance> endpointInstance : canalConfig.getInstances().entrySet()) { + if (beanFactory.containsBean(getContainerID(endpointInstance.getKey()))) continue; // 濡傛灉宸茬粡瀛樺湪鍒欎笉鍦ㄥ垱寤� + List<CanalListenerEndpointRegistrar> registrarList = new ArrayList<>(); + for (CanalListenerEndpointRegistrar registrar : registrars) { + registrar.checkParameter(canalConfig.getInstances().keySet()); + if (!registrar.isContainDestination(endpointInstance.getKey())) continue; + registrarList.add(registrar); + } + if (registrarList.isEmpty()) continue; + registerTransponderContainer( + endpointInstance.getKey(), endpointInstance.getValue(), beanFactory, registrarList); + } + + } + + private static void registerTransponderContainer( + String destination, CanalAutoConfigurationProperties.EndpointInstance endpointInstance, + ConfigurableListableBeanFactory beanFactory, List<CanalListenerEndpointRegistrar> registrarList) { + CanalConnector connector = CanalConnectorFactory.createConnector(destination, endpointInstance); + beanFactory.registerSingleton(getContainerID(destination), + new DmlMessageTransponderContainer(connector, registrarList, endpointInstance)); + } + + + private static String getContainerID(String destination) { + return CONTAINER_ID_PREFIX + "#" + destination; + } + + +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/listener/ApplicationReadyListener.java b/src/main/java/com/hz/canal/starter/listener/ApplicationReadyListener.java similarity index 92% rename from src/main/java/com/duxinglangzi/canal/starter/listener/ApplicationReadyListener.java rename to src/main/java/com/hz/canal/starter/listener/ApplicationReadyListener.java index 161742f..e25d3f4 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/listener/ApplicationReadyListener.java +++ b/src/main/java/com/hz/canal/starter/listener/ApplicationReadyListener.java @@ -1,20 +1,20 @@ -package com.duxinglangzi.canal.starter.listener; - -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.ApplicationListener; - -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * @author wuqiong 2022/4/16 - */ -public class ApplicationReadyListener implements ApplicationListener<ApplicationReadyEvent> { - - public static final AtomicBoolean START_LISTENER_CONTAINER = new AtomicBoolean(false); - - @Override - public void onApplicationEvent(ApplicationReadyEvent event) { - // 纭繚绋嬪簭鍚姩涔嬪悗锛屽啀鏀捐鎵�鏈夌殑 canal transponder - if (!START_LISTENER_CONTAINER.get()) START_LISTENER_CONTAINER.set(true); - } -} +package com.hz.canal.starter.listener; + +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ApplicationListener; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author wuqiong 2022/4/16 + */ +public class ApplicationReadyListener implements ApplicationListener<ApplicationReadyEvent> { + + public static final AtomicBoolean START_LISTENER_CONTAINER = new AtomicBoolean(false); + + @Override + public void onApplicationEvent(ApplicationReadyEvent event) { + // 纭繚绋嬪簭鍚姩涔嬪悗锛屽啀鏀捐鎵�鏈夌殑 canal transponder + if (!START_LISTENER_CONTAINER.get()) START_LISTENER_CONTAINER.set(true); + } +} diff --git a/src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java b/src/main/java/com/hz/canal/starter/mode/CanalMessage.java similarity index 97% rename from src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java rename to src/main/java/com/hz/canal/starter/mode/CanalMessage.java index 4e65b18..b2637fe 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/mode/CanalMessage.java +++ b/src/main/java/com/hz/canal/starter/mode/CanalMessage.java @@ -1,4 +1,4 @@ -package com.duxinglangzi.canal.starter.mode; +package com.hz.canal.starter.mode; import com.alibaba.otter.canal.protocol.CanalEntry; diff --git a/src/main/resources/META-INF/spring-configuration-metadata.json b/src/main/resources/META-INF/spring-configuration-metadata.json index aa915c8..fe2ad6f 100644 --- a/src/main/resources/META-INF/spring-configuration-metadata.json +++ b/src/main/resources/META-INF/spring-configuration-metadata.json @@ -1,9 +1,9 @@ { "properties": [ { - "sourceType": "com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties", + "sourceType": "com.hz.canal.starter.configuration.CanalAutoConfigurationProperties", "name": "spring.canal.instances", - "type": "java.util.Map<java.lang.String,com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties.EndpointInstance>", + "type": "java.util.Map<java.lang.String,com.hz.canal.starter.configuration.CanalAutoConfigurationProperties.EndpointInstance>", "description": "canal config " } ] diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories index 5fb7ad5..6ca3d0d 100644 --- a/src/main/resources/META-INF/spring.factories +++ b/src/main/resources/META-INF/spring.factories @@ -1,9 +1,9 @@ # Auto Configure # org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -# com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\ -# com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector +# CanalAutoConfigurationProperties,\ +# CanalConfigurationSelector # Application Listeners # org.springframework.context.ApplicationListener=\ -# com.duxinglangzi.canal.starter.listener.ApplicationReadyListener +# ApplicationReadyListener -- Gitblit v1.8.0