独行的浪子
2022-04-25 3271cea1b986ee24c5bfaf2b15326554fb56090c
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.container;
D 2
3 import com.alibaba.otter.canal.protocol.CanalEntry;
4 import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener;
5 import org.springframework.context.SmartLifecycle;
6
7 import java.util.Arrays;
8 import java.util.List;
9 import java.util.concurrent.TimeUnit;
10
11 /**
627420 12  * 抽象的canal transponder ,实现SmartLifecycle接口,声明周期由spring进行管理
de8c2b 13  * @author wuqiong 2022/4/11
D 14  */
15 public abstract class AbstractCanalTransponderContainer implements SmartLifecycle {
16     protected boolean isRunning = false;
17     protected final Long SLEEP_TIME_MILLI_SECONDS = 1000L;
18     protected List<CanalEntry.EntryType> IGNORE_ENTRY_TYPES =
19             Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN,
20                     CanalEntry.EntryType.TRANSACTIONEND,
21                     CanalEntry.EntryType.HEARTBEAT);
22
23     protected abstract void doStart();
24     protected abstract void initConnect();
929202 25     protected abstract void disconnect();
de8c2b 26
D 27
28     @Override
29     public void start() {
30         new Thread(() -> {
627420 31             // spring 启动后 才会进行canal数据拉取
de8c2b 32             while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get())
D 33                 sleep(5L * SLEEP_TIME_MILLI_SECONDS);
34             initConnect();
35             while (isRunning() && !Thread.currentThread().isInterrupted()) doStart();
929202 36             disconnect(); // 线程被终止或者容器已经停止
de8c2b 37         }).start();
627420 38         setRunning(true);
de8c2b 39     }
D 40
41     @Override
42     public void stop() {
43         setRunning(false);
44     }
45
46     @Override
47     public void stop(Runnable callback) {
48         callback.run();
49         setRunning(false);
3271ce 50         sleep(SLEEP_TIME_MILLI_SECONDS);
de8c2b 51     }
D 52
53     @Override
54     public boolean isRunning() {
55         return isRunning;
56     }
57
58     protected void setRunning(boolean bool){
59         isRunning = bool;
60     }
61
62     protected void sleep(long sleepTimeMilliSeconds) {
63         try {
64             TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSeconds);
65             if (!isRunning()) Thread.currentThread().interrupt();
66         } catch (InterruptedException e) {
67         }
68     }
69
70
71 }