From a8f947d2bc621051821e0cc57335aa6ca1776a8e Mon Sep 17 00:00:00 2001 From: duxinglangzi <871364441@qq.com> Date: 星期四, 04 八月 2022 18:16:29 +0800 Subject: [PATCH] 修改启用方式为EnableCanalListener注解 修改重试次数瑕疵 --- src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | 22 ++++++++++++++-------- 1 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java index 0374716..e25a8df 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java @@ -23,16 +23,19 @@ private CanalConnector connector; private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); - private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>(); + private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); + private Integer local_retry_count; public void initConnect() { try { // init supportAllTypes - registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType()))); + registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); connector.connect(); connector.subscribe(endpointInstance.getSubscribe()); connector.rollback(); + // 鍒濆鍖栨湰鍦伴噸璇曟鏁� + local_retry_count = endpointInstance.getRetryCount(); } catch (Exception e) { logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); setRunning(false); @@ -52,18 +55,21 @@ message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� } catch (Exception clientException) { logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); - endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1); - if (endpointInstance.getRetryCount() < 0) { + if (local_retry_count > 0) { + // 閲嶈瘯娆℃暟鍑忎竴 + local_retry_count = local_retry_count - 1; + sleep(endpointInstance.getAcquireInterval()); + } else { + // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼ logger.error("[DmlMessageTransponderContainer] retry count is zero , " + "thread interrupt , current connector host: {} , port: {} ", endpointInstance.getHost(), endpointInstance.getPort()); Thread.currentThread().interrupt(); - disconnect(); - } else { - sleep(endpointInstance.getAcquireInterval()); } return; } + // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀� + if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount(); List<CanalEntry.Entry> entries = message.getEntries(); if (message.getId() == -1 || entries.isEmpty()) { sleep(endpointInstance.getAcquireInterval()); @@ -103,7 +109,7 @@ // 蹇界暐 ddl 璇彞 if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; CanalEntry.EventType eventType = rowChange.getEventType(); - if (!SUPPORT_ALL_TYPES.contains(eventType)) return; + if (!support_all_types.contains(eventType)) return; for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { registrars .stream() -- Gitblit v1.8.0