From ddae546da9487622631d47133c962e7d870982de Mon Sep 17 00:00:00 2001 From: duxinglangzi <871364441@qq.com> Date: 星期二, 06 九月 2022 15:13:54 +0800 Subject: [PATCH] 增加错误消息日志打印 --- src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | 71 +++++++++++++++++++---------------- 1 files changed, 39 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java index 7aad7f7..6865b86 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java @@ -51,42 +51,49 @@ public void doStart() { - Message message = null; try { - message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� - } catch (Exception clientException) { - logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); - if (local_retry_count > 0) { - // 閲嶈瘯娆℃暟鍑忎竴 - local_retry_count = local_retry_count - 1; - sleep(endpointInstance.getAcquireInterval()); - } else { - // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼ - logger.error("[DmlMessageTransponderContainer] retry count is zero , " + - "thread interrupt , current connector host: {} , port: {} ", - endpointInstance.getHost(), endpointInstance.getPort()); - Thread.currentThread().interrupt(); - } - return; - } - // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀� - if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount(); - List<CanalEntry.Entry> entries = message.getEntries(); - if (message.getId() == -1 || entries.isEmpty()) { - sleep(endpointInstance.getAcquireInterval()); - return; - } - for (CanalEntry.Entry entry : entries) { + Message message = null; try { - consumer(entry); - } catch (Exception e) { - e.printStackTrace(); - logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); - // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 - // return; + message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� + } catch (Exception clientException) { + logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); + if (local_retry_count > 0) { + // 閲嶈瘯娆℃暟鍑忎竴 + local_retry_count = local_retry_count - 1; + sleep(endpointInstance.getAcquireInterval()); + } else { + // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼ + logger.error("[DmlMessageTransponderContainer] retry count is zero , " + + "thread interrupt , current connector host: {} , port: {} ", + endpointInstance.getHost(), endpointInstance.getPort()); + Thread.currentThread().interrupt(); + } + return; } + // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀� + if (local_retry_count < endpointInstance.getRetryCount()) + local_retry_count = endpointInstance.getRetryCount(); + List<CanalEntry.Entry> entries = message.getEntries(); + if (message.getId() == -1 || entries.isEmpty()) { + sleep(endpointInstance.getAcquireInterval()); + return; + } + for (CanalEntry.Entry entry : entries) { + try { + consumer(entry); + } catch (Exception e) { + e.printStackTrace(); + logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); + // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 + // return; + } + } + connector.ack(message.getId()); // 鎻愪氦纭 + } catch (Exception exc) { + logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage()); + // 闃叉鍒犻櫎娑堟伅鏃跺彂鐢熼敊璇�,鎴栬�呮媺鍙栨秷鎭け璐ョ瓑鎯呭喌 + exc.printStackTrace(); } - connector.ack(message.getId()); // 鎻愪氦纭 } public DmlMessageTransponderContainer( -- Gitblit v1.8.0