duxinglangzi
2022-04-22 de8c2b2a4654893dc2c80f1fe095c165485bee5f
提交 | 用户 | age
de8c2b 1 package com.duxinglangzi.canal.starter.container;
D 2
3 import com.alibaba.otter.canal.protocol.CanalEntry;
4 import com.duxinglangzi.canal.starter.listener.ApplicationReadyListener;
5 import org.springframework.context.SmartLifecycle;
6
7 import java.util.Arrays;
8 import java.util.List;
9 import java.util.concurrent.TimeUnit;
10
11 /**
12  * @author wuqiong 2022/4/11
13  * @description
14  */
15 public abstract class AbstractCanalTransponderContainer implements SmartLifecycle {
16     protected boolean isRunning = false;
17     protected final Long SLEEP_TIME_MILLI_SECONDS = 1000L;
18     protected List<CanalEntry.EntryType> IGNORE_ENTRY_TYPES =
19             Arrays.asList(CanalEntry.EntryType.TRANSACTIONBEGIN,
20                     CanalEntry.EntryType.TRANSACTIONEND,
21                     CanalEntry.EntryType.HEARTBEAT);
22
23     protected abstract void doStart();
24     protected abstract void initConnect();
25
26
27     @Override
28     public void start() {
29         setRunning(true);
30         new Thread(() -> {
31             while (!ApplicationReadyListener.START_LISTENER_CONTAINER.get())
32                 sleep(5L * SLEEP_TIME_MILLI_SECONDS);
33             initConnect();
34             while (isRunning() && !Thread.currentThread().isInterrupted()) doStart();
35         }).start();
36     }
37
38     @Override
39     public void stop() {
40         setRunning(false);
41     }
42
43     @Override
44     public void stop(Runnable callback) {
45         callback.run();
46         setRunning(false);
47     }
48
49     @Override
50     public boolean isRunning() {
51         return isRunning;
52     }
53
54     protected void setRunning(boolean bool){
55         isRunning = bool;
56     }
57
58     protected void sleep(long sleepTimeMilliSeconds) {
59         try {
60             TimeUnit.MILLISECONDS.sleep(sleepTimeMilliSeconds);
61             if (!isRunning()) Thread.currentThread().interrupt();
62         } catch (InterruptedException e) {
63         }
64     }
65
66
67 }