| | |
| | |
|
| | | /**
|
| | | * DML 数据拉取、解析
|
| | | *
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
|
| | |
| | |
|
| | |
|
| | | public void initConnect() {
|
| | | // init supportAllTypes
|
| | | registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
|
| | | Arrays.asList(e.getListenerEntry().getValue().eventType())));
|
| | | connector.connect();
|
| | | connector.subscribe(endpointInstance.getSubscribe());
|
| | | connector.rollback();
|
| | | try {
|
| | | // init supportAllTypes
|
| | | registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(
|
| | | Arrays.asList(e.getListenerEntry().getValue().eventType())));
|
| | | connector.connect();
|
| | | connector.subscribe(endpointInstance.getSubscribe());
|
| | | connector.rollback();
|
| | | } catch (Exception e) {
|
| | | logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
|
| | | setRunning(false);
|
| | | }
|
| | |
|
| | | }
|
| | |
|
| | | public void disconnect() {
|
| | | // 关闭连接
|
| | | connector.disconnect();
|
| | | }
|
| | |
|
| | |
|
| | |
| | | try {
|
| | | message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
|
| | | } catch (Exception clientException) {
|
| | | logger.error("[MessageTransponderContainer] error msg : ", clientException);
|
| | | logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
|
| | | endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
|
| | | if (endpointInstance.getRetryCount() < 0) {
|
| | | logger.error("[MessageTransponderContainer] retry count is zero , " +
|
| | | logger.error("[DmlMessageTransponderContainer] retry count is zero , " +
|
| | | "thread interrupt , current connector host: {} , port: {} ",
|
| | | endpointInstance.getHost(), endpointInstance.getPort());
|
| | | Thread.currentThread().interrupt();
|
| | | connector.disconnect();
|
| | | disconnect();
|
| | | } else {
|
| | | sleep(endpointInstance.getAcquireInterval());
|
| | | }
|
| | |
| | | sleep(endpointInstance.getAcquireInterval());
|
| | | return;
|
| | | }
|
| | | try {
|
| | | entries.forEach(e -> consumer(e));
|
| | | } catch (Exception e) {
|
| | | e.printStackTrace();
|
| | | logger.error("[MessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
|
| | | // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
|
| | | // return;
|
| | | for (CanalEntry.Entry entry : entries) {
|
| | | try {
|
| | | consumer(entry);
|
| | | } catch (Exception e) {
|
| | | e.printStackTrace();
|
| | | logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
|
| | | // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
|
| | | // return;
|
| | | }
|
| | | }
|
| | | connector.ack(message.getId()); // 提交确认
|
| | | }
|
| | |
| | | try {
|
| | | rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
| | | } catch (Exception e) {
|
| | | logger.error("[MessageTransponderContainer_consumer] RowChange parse has an error ", e);
|
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
|
| | | throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
|
| | | }
|
| | |
|
| | |
| | | try {
|
| | | element.getListenerEntry().getKey().invoke(element.getBean(), eventType, rowData);
|
| | | } catch (IllegalAccessException | InvocationTargetException e) {
|
| | | logger.error("[MessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
|
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
|
| | | throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
|
| | | }
|
| | | });
|