From 3ad94b9abcbde117c6d6ce43a9d023b7755ec0eb Mon Sep 17 00:00:00 2001 From: 独行的浪子 <871364441@qq.com> Date: 星期二, 26 四月 2022 17:05:11 +0800 Subject: [PATCH] Update DmlMessageTransponderContainer.java --- src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | 24 ++++++++++++++++-------- 1 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java index dd426aa..9aadc35 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java @@ -12,8 +12,8 @@ import java.util.*; /** + * DML 鏁版嵁鎷夊彇銆佽В鏋� * @author wuqiong 2022/4/11 - * @description */ public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { @@ -35,6 +35,11 @@ } + public void disconnect(){ + // 鍏抽棴杩炴帴 + connector.disconnect(); + } + public void doStart() { Message message = null; @@ -48,6 +53,7 @@ "thread interrupt , current connector host: {} , port: {} ", endpointInstance.getHost(), endpointInstance.getPort()); Thread.currentThread().interrupt(); + disconnect(); } else { sleep(endpointInstance.getAcquireInterval()); } @@ -58,13 +64,15 @@ sleep(endpointInstance.getAcquireInterval()); return; } - try { - entries.forEach(e -> consumer(e)); - } catch (Exception e) { - e.printStackTrace(); - logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); - // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 - // return; + for (CanalEntry.Entry entry : entries) { + try { + consumer(entry); + } catch (Exception e) { + e.printStackTrace(); + logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); + // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 + // return; + } } connector.ack(message.getId()); // 鎻愪氦纭 } -- Gitblit v1.8.0