From c83c1b4ee7ff9c01a7a67855863c281589f39c72 Mon Sep 17 00:00:00 2001 From: duxinglangzi <871364441@qq.com> Date: 星期一, 15 八月 2022 19:58:57 +0800 Subject: [PATCH] 修改了listener方法的参数 --- src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | 68 ++++++++++++++++++++++------------ 1 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java index d554cad..7aad7f7 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java @@ -5,6 +5,7 @@ import com.alibaba.otter.canal.protocol.Message; import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties; import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar; +import com.duxinglangzi.canal.starter.mode.CanalMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,6 +14,7 @@ /** * DML 鏁版嵁鎷夊彇銆佽В鏋� + * * @author wuqiong 2022/4/11 */ public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { @@ -22,17 +24,29 @@ private CanalConnector connector; private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); - private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>(); + private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); + private Integer local_retry_count; public void initConnect() { - // init supportAllTypes - registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll( - Arrays.asList(e.getListenerEntry().getValue().eventType()))); - connector.connect(); - connector.subscribe(endpointInstance.getSubscribe()); - connector.rollback(); + try { + // init supportAllTypes + registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); + connector.connect(); + connector.subscribe(endpointInstance.getSubscribe()); + connector.rollback(); + // 鍒濆鍖栨湰鍦伴噸璇曟鏁� + local_retry_count = endpointInstance.getRetryCount(); + } catch (Exception e) { + logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); + setRunning(false); + } + } + + public void disconnect() { + // 鍏抽棴杩炴帴 + connector.disconnect(); } @@ -41,30 +55,36 @@ try { message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� } catch (Exception clientException) { - logger.error("[MessageTransponderContainer] error msg : ", clientException); - endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1); - if (endpointInstance.getRetryCount() < 0) { - logger.error("[MessageTransponderContainer] retry count is zero , " + + logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); + if (local_retry_count > 0) { + // 閲嶈瘯娆℃暟鍑忎竴 + local_retry_count = local_retry_count - 1; + sleep(endpointInstance.getAcquireInterval()); + } else { + // 閲嶈瘯娆℃暟 <= 0 鏃�,鐩存帴缁堟绾跨▼ + logger.error("[DmlMessageTransponderContainer] retry count is zero , " + "thread interrupt , current connector host: {} , port: {} ", endpointInstance.getHost(), endpointInstance.getPort()); Thread.currentThread().interrupt(); - } else { - sleep(endpointInstance.getAcquireInterval()); } return; } + // 濡傛灉閲嶈瘯娆℃暟灏忎簬璁剧疆鐨�,鍒欎慨鏀� + if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount(); List<CanalEntry.Entry> entries = message.getEntries(); if (message.getId() == -1 || entries.isEmpty()) { sleep(endpointInstance.getAcquireInterval()); return; } - try { - entries.forEach(e -> consumer(e)); - } catch (Exception e) { - e.printStackTrace(); - logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); - // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 - // return; + for (CanalEntry.Entry entry : entries) { + try { + consumer(entry); + } catch (Exception e) { + e.printStackTrace(); + logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); + // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 + // return; + } } connector.ack(message.getId()); // 鎻愪氦纭 } @@ -83,14 +103,14 @@ try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { - logger.error("[MessageTransponderContainer_consumer] RowChange parse has an error ", e); + logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); } // 蹇界暐 ddl 璇彞 if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; CanalEntry.EventType eventType = rowChange.getEventType(); - if (!SUPPORT_ALL_TYPES.contains(eventType)) return; + if (!support_all_types.contains(eventType)) return; for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { registrars .stream() @@ -100,9 +120,9 @@ eventType)) .forEach(element -> { try { - element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData); + element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); } catch (IllegalAccessException | InvocationTargetException e) { - logger.error("[MessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); + logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e); } }); -- Gitblit v1.8.0