duxinglangzi
2022-08-04 a8f947d2bc621051821e0cc57335aa6ca1776a8e
修改启用方式为EnableCanalListener注解
修改重试次数瑕疵
1个文件已添加
4个文件已修改
62 ■■■■ 已修改文件
README.md 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/resources/META-INF/spring.factories 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md
@@ -22,13 +22,15 @@
```
### 在spring boot 项目中的代码使用实例
### 在spring boot 项目中的代码使用实例 (注意需要使用 EnableCanalListener 注解开启 canal listener )
```java
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.duxinglangzi.canal.starter.annotation.CanalListener;
import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
import com.duxinglangzi.canal.starter.annotation.EnableCanalListener;
import org.springframework.stereotype.Service;
import java.util.stream.Collectors;
@@ -37,10 +39,13 @@
 * @author wuqiong 2022/4/12
 * @description
 */
@EnableCanalListener
@Service
public class CanalListenerTest {
    /**
     * 必须在类上 使用 EnableCanalListener 注解才能开启 canal listener
     *
     * 目前 Listener 方法的参数必须为 CanalEntry.EventType , CanalEntry.RowData 
     * 程序在启动过程中会做检查
     */
src/main/java/com/duxinglangzi/canal/starter/annotation/EnableCanalListener.java
New file
@@ -0,0 +1,21 @@
package com.duxinglangzi.canal.starter.annotation;
import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
import com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector;
import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener;
import org.springframework.context.annotation.Import;
import java.lang.annotation.*;
/**
 * 开启 canal listener
 *
 * @author wuqiong 2022/8/4
 */
@Documented
@Inherited
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import({CanalAutoConfigurationProperties.class, CanalConfigurationSelector.class, ApplicationReadyListener.class})
public @interface EnableCanalListener {
}
src/main/java/com/duxinglangzi/canal/starter/container/AbstractCanalTransponderContainer.java
@@ -36,7 +36,7 @@
                sleep(5L * SLEEP_TIME_MILLI_SECONDS);
            initConnect();
            while (isRunning() && !Thread.currentThread().isInterrupted()) doStart();
            disconnect(); // 线程被终止或者容器已经停止
            disconnect(); // 线程被终止或者容器已经停止,需要关闭连接
        }).start();
        setRunning(true);
    }
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -23,16 +23,19 @@
    private CanalConnector connector;
    private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
    private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
    private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>();
    private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
    private Integer local_retry_count;
    public void initConnect() {
        try {
            // init supportAllTypes
            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType())));
            registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
            connector.connect();
            connector.subscribe(endpointInstance.getSubscribe());
            connector.rollback();
            // 初始化本地重试次数
            local_retry_count = endpointInstance.getRetryCount();
        } catch (Exception e) {
            logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
            setRunning(false);
@@ -52,18 +55,21 @@
            message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
        } catch (Exception clientException) {
            logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
            endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
            if (endpointInstance.getRetryCount() < 0) {
            if (local_retry_count > 0) {
                // 重试次数减一
                local_retry_count = local_retry_count - 1;
                sleep(endpointInstance.getAcquireInterval());
            } else {
                // 重试次数 <= 0 时,直接终止线程
                logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
                                "thread interrupt , current connector host: {} , port: {} ",
                        endpointInstance.getHost(), endpointInstance.getPort());
                Thread.currentThread().interrupt();
                disconnect();
            } else {
                sleep(endpointInstance.getAcquireInterval());
            }
            return;
        }
        // 如果重试次数小于设置的,则修改
        if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount();
        List<CanalEntry.Entry> entries = message.getEntries();
        if (message.getId() == -1 || entries.isEmpty()) {
            sleep(endpointInstance.getAcquireInterval());
@@ -103,7 +109,7 @@
        // 忽略 ddl 语句
        if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
        CanalEntry.EventType eventType = rowChange.getEventType();
        if (!SUPPORT_ALL_TYPES.contains(eventType)) return;
        if (!support_all_types.contains(eventType)) return;
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            registrars
                    .stream()
src/main/resources/META-INF/spring.factories
@@ -1,9 +1,9 @@
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\
com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector
# org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
# com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties,\
# com.duxinglangzi.canal.starter.configuration.CanalConfigurationSelector
# Application Listeners
org.springframework.context.ApplicationListener=\
com.duxinglangzi.canal.starter.listener.ApplicationReadyListener
# org.springframework.context.ApplicationListener=\
# com.duxinglangzi.canal.starter.listener.ApplicationReadyListener