Andru
2023-10-27 ff57ace362af07655f54eea90c4d2f63b05f5b66
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.hx.util.thread;
 
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
 
/**
 * 创建线程池的,主要避免线程队列超,导致服务器爆了
 */
public class ExecutorServiceTool {
 
    /**log4j日志*/
    private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTool.class.getName());
 
    /**线程池-获取队列的*/
    private ThreadPoolExecutor threadPool;
    /**最大队列*/
    private int maxQueue;
    /**最大线程数*/
    private int maxTread;
    /**创建队列*/
    private ThreadTaskLinkedBlockingDeque<Runnable> queue = new ThreadTaskLinkedBlockingDeque<>();
 
    public ExecutorServiceTool() {
    }
 
    public ExecutorServiceTool(int initTread, int maxQueue) {
        createThread(initTread,maxQueue);
    }
 
    public ExecutorServiceTool(int initTread, int maxTread, int maxQueue) {
        this.maxTread = maxTread;
        createThread(initTread,maxTread,maxQueue);
    }
 
 
    /**创建线程池
     * @param initTread 初始化线程池
     * @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对
     *                  线程池限制队列,因为限制了会报错,导致数据丢失
     * @return 线程池
     */
    public ThreadPoolExecutor createThread(int initTread,int maxQueue){
        this.threadPool = new ThreadPoolExecutor(initTread, initTread,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        this.maxQueue = maxQueue;
        return this.threadPool;
    }
 
    /**创建线程池
     * @param initTread 初始化线程池
     * @param initTread 最大线程数
     * @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对
     *                  线程池限制队列,因为限制了会报错,导致数据丢失
     * @return 线程池
     */
    public ThreadPoolExecutor createThread(int initTread,int maxTread,int maxQueue){
        this.threadPool = new ThreadPoolExecutor(initTread,maxTread,60L,TimeUnit.SECONDS,queue);
        this.maxQueue = maxQueue;
        return this.threadPool;
    }
 
    /**针对最大的队列,如果没有超过返回是false的,超过就返回是true的
     * 单返回true的时候,就不要传入队列了
     * @param sleepMillisecond 睡眠,毫秒秒,如果是空的,那么直接返回
     * @return 如果没有超过返回是false的,超过就返回是true的
     */
    public boolean noRund(Integer sleepMillisecond){
        if(threadPool.getQueue().size() > maxQueue){
            if(sleepMillisecond != null && sleepMillisecond > 0){
                try{
                    Thread.sleep(sleepMillisecond);
                }catch (Exception e){
                    logger.error("线程池开启睡眠失败!");
                }
            }
            return true;
        }else{
            return false;
        }
    }
 
    /**关闭*/
    public void shutdown(){
        this.threadPool.shutdown();
    }
 
    /**
     * 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理.
     * 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。
     *
     * @author chenjiahe
     */
    @Setter
    private class ThreadTaskLinkedBlockingDeque<E> extends LinkedBlockingDeque<E> {
        @Override
        public boolean offer(E e) {
            int activeThreadNum = threadPool.getActiveCount();
            if (activeThreadNum < maxTread) {
                return false;
            }
            return offerLast(e);
        }
    }
 
    /****************************************************************************************/
 
    public int getMaxQueue() {
        return maxQueue;
    }
 
    public void setMaxQueue(int maxQueue) {
        this.maxQueue = maxQueue;
    }
 
    public ThreadPoolExecutor getThreadPool() {
        return threadPool;
    }
 
    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = threadPool;
    }
 
}