File was renamed from src/main/java/com/duxinglangzi/canal/starter/container/DmlMessageTransponderContainer.java |
| | |
| | | package com.duxinglangzi.canal.starter.container;
|
| | |
|
| | | import com.alibaba.otter.canal.client.CanalConnector;
|
| | | import com.alibaba.otter.canal.protocol.CanalEntry;
|
| | | import com.alibaba.otter.canal.protocol.Message;
|
| | | import com.duxinglangzi.canal.starter.configuration.CanalAutoConfigurationProperties;
|
| | | import com.duxinglangzi.canal.starter.configuration.CanalListenerEndpointRegistrar;
|
| | | import com.duxinglangzi.canal.starter.mode.CanalMessage;
|
| | | import org.slf4j.Logger;
|
| | | import org.slf4j.LoggerFactory;
|
| | |
|
| | | import java.lang.reflect.InvocationTargetException;
|
| | | import java.util.*;
|
| | |
|
| | | /**
|
| | | * DML 数据拉取、解析
|
| | | *
|
| | | * @author wuqiong 2022/4/11
|
| | | */
|
| | | public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer {
|
| | |
|
| | | private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class);
|
| | |
|
| | | private CanalConnector connector;
|
| | | private CanalAutoConfigurationProperties.EndpointInstance endpointInstance;
|
| | | private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>();
|
| | | private Set<CanalEntry.EventType> support_all_types = new HashSet<>();
|
| | | private Integer local_retry_count;
|
| | |
|
| | |
|
| | | public void initConnect() {
|
| | | try {
|
| | | // init supportAllTypes
|
| | | registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
|
| | | connector.connect();
|
| | | connector.subscribe(endpointInstance.getSubscribe());
|
| | | connector.rollback();
|
| | | // 初始化本地重试次数
|
| | | local_retry_count = endpointInstance.getRetryCount();
|
| | | } catch (Exception e) {
|
| | | logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
|
| | | setRunning(false);
|
| | | }
|
| | |
|
| | | }
|
| | |
|
| | | public void disconnect() {
|
| | | // 关闭连接
|
| | | connector.disconnect();
|
| | | }
|
| | |
|
| | |
|
| | | public void doStart() {
|
| | | try {
|
| | | Message message = null;
|
| | | try {
|
| | | message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
|
| | | } catch (Exception clientException) {
|
| | | logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
|
| | | if (local_retry_count > 0) {
|
| | | // 重试次数减一
|
| | | local_retry_count = local_retry_count - 1;
|
| | | sleep(endpointInstance.getAcquireInterval());
|
| | | } else {
|
| | | // 重试次数 <= 0 时,直接终止线程
|
| | | logger.error("[DmlMessageTransponderContainer] retry count is zero , " +
|
| | | "thread interrupt , current connector host: {} , port: {} ",
|
| | | endpointInstance.getHost(), endpointInstance.getPort());
|
| | | Thread.currentThread().interrupt();
|
| | | }
|
| | | return;
|
| | | }
|
| | | // 如果重试次数小于设置的,则修改
|
| | | if (local_retry_count < endpointInstance.getRetryCount())
|
| | | local_retry_count = endpointInstance.getRetryCount();
|
| | | List<CanalEntry.Entry> entries = message.getEntries();
|
| | | if (message.getId() == -1 || entries.isEmpty()) {
|
| | | sleep(endpointInstance.getAcquireInterval());
|
| | | return;
|
| | | }
|
| | | for (CanalEntry.Entry entry : entries) {
|
| | | try {
|
| | | consumer(entry);
|
| | | } catch (Exception e) {
|
| | | e.printStackTrace();
|
| | | logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e);
|
| | | // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据
|
| | | // return;
|
| | | }
|
| | | }
|
| | | connector.ack(message.getId()); // 提交确认
|
| | | } catch (Exception exc) {
|
| | | logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage());
|
| | | // 防止删除消息时发生错误,或者拉取消息失败等情况
|
| | | exc.printStackTrace();
|
| | | }
|
| | | }
|
| | |
|
| | | public DmlMessageTransponderContainer(
|
| | | CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars,
|
| | | CanalAutoConfigurationProperties.EndpointInstance endpointInstance) {
|
| | | this.connector = connector;
|
| | | this.registrars.addAll(registrars);
|
| | | this.endpointInstance = endpointInstance;
|
| | | }
|
| | |
|
| | | private void consumer(CanalEntry.Entry entry) {
|
| | | if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return;
|
| | | CanalEntry.RowChange rowChange = null;
|
| | | try {
|
| | | rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
|
| | | } catch (Exception e) {
|
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e);
|
| | | throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e);
|
| | | }
|
| | |
|
| | | // 忽略 ddl 语句
|
| | | if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
|
| | | CanalEntry.EventType eventType = rowChange.getEventType();
|
| | | if (!support_all_types.contains(eventType)) return;
|
| | | for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
|
| | | registrars
|
| | | .stream()
|
| | | .filter(CanalListenerEndpointRegistrar.filterArgs(
|
| | | entry.getHeader().getSchemaName(),
|
| | | entry.getHeader().getTableName(),
|
| | | eventType))
|
| | | .forEach(element -> {
|
| | | try {
|
| | | element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData));
|
| | | } catch (IllegalAccessException | InvocationTargetException e) {
|
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e);
|
| | | throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e);
|
| | | }
|
| | | });
|
| | | }
|
| | | }
|
| | |
|
| | | }
|
| | | package com.hz.canal.starter.container; |
| | | |
| | | import com.alibaba.otter.canal.client.CanalConnector; |
| | | import com.alibaba.otter.canal.protocol.CanalEntry; |
| | | import com.alibaba.otter.canal.protocol.Message; |
| | | import com.hz.canal.starter.configuration.CanalAutoConfigurationProperties; |
| | | import com.hz.canal.starter.configuration.CanalListenerEndpointRegistrar; |
| | | import com.hz.canal.starter.mode.CanalMessage; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | |
| | | import java.lang.reflect.InvocationTargetException; |
| | | import java.util.*; |
| | | |
| | | /** |
| | | * DML 数据拉取、解析 |
| | | * |
| | | * @author wuqiong 2022/4/11 |
| | | */ |
| | | public class DmlMessageTransponderContainer extends AbstractCanalTransponderContainer { |
| | | |
| | | private final static Logger logger = LoggerFactory.getLogger(DmlMessageTransponderContainer.class); |
| | | |
| | | private CanalConnector connector; |
| | | private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; |
| | | private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); |
| | | private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); |
| | | private Integer local_retry_count; |
| | | |
| | | |
| | | public void initConnect() { |
| | | try { |
| | | // init supportAllTypes |
| | | registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType()))); |
| | | connector.connect(); |
| | | connector.subscribe(endpointInstance.getSubscribe()); |
| | | connector.rollback(); |
| | | // 初始化本地重试次数 |
| | | local_retry_count = endpointInstance.getRetryCount(); |
| | | } catch (Exception e) { |
| | | logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e); |
| | | setRunning(false); |
| | | } |
| | | |
| | | } |
| | | |
| | | public void disconnect() { |
| | | // 关闭连接 |
| | | connector.disconnect(); |
| | | } |
| | | |
| | | |
| | | public void doStart() { |
| | | try { |
| | | Message message = null; |
| | | try { |
| | | message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据 |
| | | } catch (Exception clientException) { |
| | | logger.error("[DmlMessageTransponderContainer] error msg : ", clientException); |
| | | if (local_retry_count > 0) { |
| | | // 重试次数减一 |
| | | local_retry_count = local_retry_count - 1; |
| | | sleep(endpointInstance.getAcquireInterval()); |
| | | } else { |
| | | // 重试次数 <= 0 时,直接终止线程 |
| | | logger.error("[DmlMessageTransponderContainer] retry count is zero , " + |
| | | "thread interrupt , current connector host: {} , port: {} ", |
| | | endpointInstance.getHost(), endpointInstance.getPort()); |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | return; |
| | | } |
| | | // 如果重试次数小于设置的,则修改 |
| | | if (local_retry_count < endpointInstance.getRetryCount()) |
| | | local_retry_count = endpointInstance.getRetryCount(); |
| | | List<CanalEntry.Entry> entries = message.getEntries(); |
| | | if (message.getId() == -1 || entries.isEmpty()) { |
| | | sleep(endpointInstance.getAcquireInterval()); |
| | | return; |
| | | } |
| | | for (CanalEntry.Entry entry : entries) { |
| | | try { |
| | | consumer(entry); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | logger.error("[DmlMessageTransponderContainer_doStart] CanalEntry.Entry consumer error ", e); |
| | | // connector.rollback(message.getId()); // 目前先不处理失败, 无需回滚数据 |
| | | // return; |
| | | } |
| | | } |
| | | connector.ack(message.getId()); // 提交确认 |
| | | } catch (Exception exc) { |
| | | logger.error("[DmlMessageTransponderContainer_doStart] Pull data message exception,errorMessage:{}", exc.getLocalizedMessage()); |
| | | // 防止删除消息时发生错误,或者拉取消息失败等情况 |
| | | exc.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | public DmlMessageTransponderContainer( |
| | | CanalConnector connector, List<CanalListenerEndpointRegistrar> registrars, |
| | | CanalAutoConfigurationProperties.EndpointInstance endpointInstance) { |
| | | this.connector = connector; |
| | | this.registrars.addAll(registrars); |
| | | this.endpointInstance = endpointInstance; |
| | | } |
| | | |
| | | private void consumer(CanalEntry.Entry entry) { |
| | | if (IGNORE_ENTRY_TYPES.contains(entry.getEntryType())) return; |
| | | CanalEntry.RowChange rowChange = null; |
| | | try { |
| | | rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); |
| | | } catch (Exception e) { |
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowChange parse has an error ", e); |
| | | throw new RuntimeException("RowChange parse has an error , data:" + entry.toString(), e); |
| | | } |
| | | |
| | | // 忽略 ddl 语句 |
| | | if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return; |
| | | CanalEntry.EventType eventType = rowChange.getEventType(); |
| | | if (!support_all_types.contains(eventType)) return; |
| | | for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { |
| | | registrars |
| | | .stream() |
| | | .filter(CanalListenerEndpointRegistrar.filterArgs( |
| | | entry.getHeader().getSchemaName(), |
| | | entry.getHeader().getTableName(), |
| | | eventType)) |
| | | .forEach(element -> { |
| | | try { |
| | | element.getMethod().invoke(element.getBean(), new CanalMessage(entry.getHeader(), eventType, rowData)); |
| | | } catch (IllegalAccessException | InvocationTargetException e) { |
| | | logger.error("[DmlMessageTransponderContainer_consumer] RowData Callback Method invoke error message", e); |
| | | throw new RuntimeException("RowData Callback Method invoke error message: " + e.getMessage(), e); |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | |
| | | } |