独行的浪子
2022-04-26 5ac2a77c78106bd3786e6579b26962bc4587dc23
src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java
@@ -26,12 +26,17 @@
    public void initConnect() {
        // init supportAllTypes
        registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
                Arrays.asList(e.getListenerEntry().getValue().eventType())));
        connector.connect();
        connector.subscribe(endpointInstance.getSubscribe());
        connector.rollback();
        try {
            // init supportAllTypes
            registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
                    Arrays.asList(e.getListenerEntry().getValue().eventType())));
            connector.connect();
            connector.subscribe(endpointInstance.getSubscribe());
            connector.rollback();
        } catch (Exception e) {
            logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
            setRunning(false);
        }
    }
@@ -46,10 +51,10 @@
        try {
            message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
        } catch (Exception clientException) {
            logger.error("[MessageTransponderContainer] error msg : ", clientException);
            logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
            endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
            if (endpointInstance.getRetryCount() < 0) {
                logger.error("[MessageTransponderContainer] retry count is zero ,  " +
                logger.error("[DmlMessageTransponderContainer] retry count is zero ,  " +
                                "thread interrupt , current connector host: {} , port: {} ",
                        endpointInstance.getHost(), endpointInstance.getPort());
                Thread.currentThread().interrupt();
@@ -69,7 +74,7 @@
                consumer(entry);
            } catch (Exception e) {
                e.printStackTrace();
                logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
                logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
                // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
                // return;
            }
@@ -91,7 +96,7 @@
        try {
            rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        } catch (Exception e) {
            logger.error("[MessageTransponderContainer_consumer] RowChange parse has an error ", e);
            logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
            throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
        }
@@ -110,7 +115,7 @@
                        try {
                            element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData);
                        } catch (IllegalAccessException | InvocationTargetException e) {
                            logger.error("[MessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                            logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
                            throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
                        }
                    });