独行的浪子
2022-04-26 3ad94b9abcbde117c6d6ce43a9d023b7755ec0eb
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -12,8 +12,8 @@
import java.util.*;
/**
 * DML 数据拉取、解析
 * @author wuqiong 2022/4/11
 * @description
 */
public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
@@ -35,6 +35,11 @@
    }
    public void disconnect(){
        // 关闭连接
        connector.disconnect();
    }
    public void doStart() {
        Message message = null;
@@ -48,6 +53,7 @@
                                "thread interrupt , current connector host: {} , port: {} ",
                        endpointInstance.getHost(), endpointInstance.getPort());
                Thread.currentThread().interrupt();
                disconnect();
            } else {
                sleep(endpointInstance.getAcquireInterval());
            }
@@ -58,13 +64,15 @@
            sleep(endpointInstance.getAcquireInterval());
            return;
        }
        try {
            entries.forEach(e -> consumer(e));
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
            // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
            // return;
        for (CanalEntry.Entry entry : entries) {
            try {
                consumer(entry);
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
                // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
                // return;
            }
        }
        connector.ack(message.getId()); // 提交确认
    }