| | |
| | | private CanalConnector connector; // Canal connection: connected, subscribed and rolled back in initConnect(), then polled via getWithoutAck()
|
| | | private CanalAutoConfigurationProperties.EndpointInstance endpointInstance; // Endpoint settings read here: subscribe filter, batch size, retry count, acquire interval, host and port
|
| | | private List<CanalListenerEndpointRegistrar> registrars = new ArrayList<>(); // Listener registrars; their event types populate the supported-type set, and they are streamed over for each row
|
| | | private Set<CanalEntry.EventType> SUPPORT_ALL_TYPES = new HashSet<>(); // Union of every registrar's event types; row changes whose type is absent are skipped. NOTE(review): looks like the earlier spelling of support_all_types below — confirm only one should remain
|
| | | private Set<CanalEntry.EventType> support_all_types = new HashSet<>(); // Union of every registrar's event types, filled in initConnect(); row changes whose type is absent are skipped
|
| | | private Integer local_retry_count; // Remaining local retries: seeded from endpointInstance.getRetryCount() in initConnect(), decremented after a failed fetch, raised back to the configured value when it falls below it
|
| | |
|
| | |
|
| | | public void initConnect() {
|
| | | try {
|
| | | // init supportAllTypes
|
| | | registrars.forEach(e -> SUPPORT_ALL_TYPES.addAll(Arrays.asList(e.getEventType())));
|
| | | registrars.forEach(e -> support_all_types.addAll(Arrays.asList(e.getEventType())));
|
| | | connector.connect();
|
| | | connector.subscribe(endpointInstance.getSubscribe());
|
| | | connector.rollback();
|
| | | // Initialize the local retry count
|
| | | local_retry_count = endpointInstance.getRetryCount();
|
| | | } catch (Exception e) {
|
| | | logger.error("[DmlMessageTransponderContainer_initConnect] canal client connect error .", e);
|
| | | setRunning(false);
|
| | |
| | | message = connector.getWithoutAck(endpointInstance.getBatchSize()); // 获取指定数量的数据
|
| | | } catch (Exception clientException) {
|
| | | logger.error("[DmlMessageTransponderContainer] error msg : ", clientException);
|
| | | endpointInstance.setRetryCount(endpointInstance.getRetryCount() - 1);
|
| | | if (endpointInstance.getRetryCount() < 0) {
|
| | | if (local_retry_count > 0) {
|
| | | // Decrement the retry count by one
|
| | | local_retry_count = local_retry_count - 1;
|
| | | sleep(endpointInstance.getAcquireInterval());
|
| | | } else {
|
| | | // When the retry count is <= 0, terminate the thread immediately
|
| | | logger.error("[DmlMessageTransponderContainer] retry count is zero , " +
|
| | | "thread interrupt , current connector host: {} , port: {} ",
|
| | | endpointInstance.getHost(), endpointInstance.getPort());
|
| | | Thread.currentThread().interrupt();
|
| | | disconnect();
|
| | | } else {
|
| | | sleep(endpointInstance.getAcquireInterval());
|
| | | }
|
| | | return;
|
| | | }
|
| | | // If the local retry count is below the configured value, reset it to that value
|
| | | if (local_retry_count < endpointInstance.getRetryCount()) local_retry_count = endpointInstance.getRetryCount();
|
| | | List<CanalEntry.Entry> entries = message.getEntries();
|
| | | if (message.getId() == -1 || entries.isEmpty()) {
|
| | | sleep(endpointInstance.getAcquireInterval());
|
| | |
| | | // Ignore DDL statements
|
| | | if (rowChange.hasIsDdl() && rowChange.getIsDdl()) return;
|
| | | CanalEntry.EventType eventType = rowChange.getEventType();
|
| | | if (!SUPPORT_ALL_TYPES.contains(eventType)) return;
|
| | | if (!support_all_types.contains(eventType)) return;
|
| | | for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
|
| | | registrars
|
| | | .stream()
|