duxinglangzi
2022-08-04 a8f947d2bc621051821e0cc57335aa6ca1776a8e
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.container;
D 2
3 import com.alibaba.otter.canal.protocol.CanalEntry;
4 import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener;
5 import org.springframework.context.SmartLifecycle;
6
7 import java.util.Arrays;
8 import java.util.List;
9 import java.util.concurrent.TimeUnit;
10
11 /**
627420 12  * 抽象的canal transponder ,实现SmartLifecycle接口,声明周期由spring进行管理
7cf978 13  *
de8c2b 14  * @author wuqiong 2022/4/11
D 15  */
16 public abstract class AbstractCanalTransponderContainer implements SmartLifecycle {
17     protected boolean isRunning = false;
18     protected final Long SLEEP_TIME_MILLI_SECONDS = 1000L;
19     protected List<CanalEntry.EntryType> IGNORE_ENTRY_TYPES =
20             Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN,
21                     CanalEntry.EntryType.TRANSACTIONEND,
22                     CanalEntry.EntryType.HEARTBEAT);
23
24     protected abstract void doStart();
7cf978 25
de8c2b 26     protected abstract void initConnect();
7cf978 27
929202 28     protected abstract void disconnect();
de8c2b 29
D 30
31     @Override
32     public void start() {
33         new Thread(() -> {
627420 34             // spring 启动后 才会进行canal数据拉取
de8c2b 35             while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get())
D 36                 sleep(5L * SLEEP_TIME_MILLI_SECONDS);
37             initConnect();
38             while (isRunning() && !Thread.currentThread().isInterrupted()) doStart();
a8f947 39             disconnect(); // 线程被终止或者容器已经停止,需要关闭连接
de8c2b 40         }).start();
627420 41         setRunning(true);
de8c2b 42     }
D 43
44     @Override
45     public void stop() {
46         setRunning(false);
47     }
48
49     @Override
50     public void stop(Runnable callback) {
51         callback.run();
52         setRunning(false);
3271ce 53         sleep(SLEEP_TIME_MILLI_SECONDS);
de8c2b 54     }
D 55
56     @Override
57     public boolean isRunning() {
58         return isRunning;
59     }
60
7cf978 61     protected void setRunning(boolean bool) {
de8c2b 62         isRunning = bool;
D 63     }
64
65     protected void sleep(long sleepTimeMilliSeconds) {
66         try {
67             TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSeconds);
68             if (!isRunning()) Thread.currentThread().interrupt();
69         } catch (InterruptedException e) {
70         }
71     }
72
73
74 }