From 7cf9781edf66c75570af378a96b4011ff63ada92 Mon Sep 17 00:00:00 2001 From: duxinglangzi <871364441@qq.com> Date: 星期一, 16 五月 2022 13:07:01 +0800 Subject: [PATCH] 格式化 --- src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java | 30 ++++++++++++++++++------------ 1 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java index 9aadc35..4214713 100644 --- a/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java +++ b/src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java @@ -13,6 +13,7 @@ /** * DML 鏁版嵁鎷夊彇銆佽В鏋� + * * @author wuqiong 2022/4/11 */ public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { @@ -26,16 +27,21 @@ public void initConnect() { - // init supportAllTypes - registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll( - Arrays.asList(e.getListenerEntry().getValue().eventType()))); - connector.connect(); - connector.subscribe(endpointInstance.getSubscribe()); - connector.rollback(); + try { + // init supportAllTypes + registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll( + Arrays.asList(e.getListenerEntry().getValue().eventType()))); + connector.connect(); + connector.subscribe(endpointInstance.getSubscribe()); + connector.rollback(); + } catch (Exception e) { + logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); + setRunning(false); + } } - public void disconnect(){ + public void disconnect() { // 鍏抽棴杩炴帴 connector.disconnect(); } @@ -46,10 +52,10 @@ try { message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 鑾峰彇鎸囧畾鏁伴噺鐨勬暟鎹� } catch (Exception clientException) { - logger.error("[MessageTransponderContainer] error msg : ", clientException); + logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1); if (endpointInstance.getRetryCount() < 0) { - logger.error("[MessageTransponderContainer] retry count is zero , " + + logger.error("[DmlMessageTransponderContainer] retry count is zero , " + "thread interrupt , current connector host: {} , port: {} ", endpointInstance.getHost(), endpointInstance.getPort()); Thread.currentThread().interrupt(); @@ -69,7 +75,7 @@ consumer(entry); } catch (Exception e) { e.printStackTrace(); - logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); + logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); // connector.rollback(message.getId()); // 鐩墠鍏堜笉澶勭悊澶辫触, 鏃犻渶鍥炴粴鏁版嵁 // return; } @@ -91,7 +97,7 @@ try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { - logger.error("[MessageTransponderContainer_consumer] RowChange parse has an error ", e); + logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); } @@ -110,7 +116,7 @@ try { element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData); } catch (IllegalAccessException | InvocationTargetException e) { - logger.error("[MessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); + logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); throw new RuntimeException("RowData Callback Method invoke error message锛� " + e.getMessage(), e); } }); -- Gitblit v1.8.0