chenjiahe
2023-07-17 d4fa5269e214e8bbe1f7dd79a922cefe9fc9916b
新增线程池工具类
1个文件已添加
131 ■■■■■ 已修改文件
src/main/java/com/hx/util/thread/ExecutorServiceTool.java 131 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/main/java/com/hx/util/thread/ExecutorServiceTool.java
New file
@@ -0,0 +1,131 @@
package com.hx.util.thread;
import lombok.Setter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * 创建线程池的,主要避免线程队列超,导致服务器爆了
 */
public class ExecutorServiceTool {
    /**log4j日志*/
    private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTool.class.getName());
    /**线程池-获取队列的*/
    private ThreadPoolExecutor threadPool;
    /**最大队列*/
    private int maxQueue;
    /**最大线程数*/
    private int maxTread;
    /**创建队列*/
    private ThreadTaskLinkedBlockingDeque<Runnable> queue = new ThreadTaskLinkedBlockingDeque<>();
    public ExecutorServiceTool() {
    }
    public ExecutorServiceTool(int initTread, int maxQueue) {
        createThread(initTread,maxQueue);
    }
    public ExecutorServiceTool(int initTread, int maxTread, int maxQueue) {
        this.maxTread = maxTread;
        createThread(initTread,maxTread,maxQueue);
    }
    /**创建线程池
     * @param initTread 初始化线程池
     * @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对
     *                  线程池限制队列,因为限制了会报错,导致数据丢失
     * @return 线程池
     */
    public ThreadPoolExecutor createThread(int initTread,int maxQueue){
        this.threadPool = new ThreadPoolExecutor(initTread, initTread,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        this.maxQueue = maxQueue;
        return this.threadPool;
    }
    /**创建线程池
     * @param initTread 初始化线程池
     * @param initTread 最大线程数
     * @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对
     *                  线程池限制队列,因为限制了会报错,导致数据丢失
     * @return 线程池
     */
    public ThreadPoolExecutor createThread(int initTread,int maxTread,int maxQueue){
        this.threadPool = new ThreadPoolExecutor(initTread,maxTread,60L,TimeUnit.SECONDS,queue);
        this.maxQueue = maxQueue;
        return this.threadPool;
    }
    /**针对最大的队列,如果没有超过返回是false的,超过就返回是true的
     * 单返回true的时候,就不要传入队列了
     * @param sleepMillisecond 睡眠,毫秒秒,如果是空的,那么直接返回
     * @return 如果没有超过返回是false的,超过就返回是true的
     */
    public boolean noRund(Integer sleepMillisecond){
        if(threadPool.getQueue().size() > maxQueue){
            if(sleepMillisecond != null && sleepMillisecond > 0){
                try{
                    Thread.sleep(sleepMillisecond);
                }catch (Exception e){
                    logger.error("线程池开启睡眠失败!");
                }
            }
            return true;
        }else{
            return false;
        }
    }
    /**关闭*/
    public void shutdown(){
        this.threadPool.shutdown();
    }
    /**
     * 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理.
     * 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。
     *
     * @author chenjiahe
     */
    @Setter
    private class ThreadTaskLinkedBlockingDeque<E> extends LinkedBlockingDeque<E> {
        @Override
        public boolean offer(E e) {
            int activeThreadNum = threadPool.getActiveCount();
            if (activeThreadNum < maxTread) {
                return false;
            }
            return offerLast(e);
        }
    }
    /****************************************************************************************/
    public int getMaxQueue() {
        return maxQueue;
    }
    public void setMaxQueue(int maxQueue) {
        this.maxQueue = maxQueue;
    }
    public ThreadPoolExecutor getThreadPool() {
        return threadPool;
    }
    public void setThreadPool(ThreadPoolExecutor threadPool) {
        this.threadPool = threadPool;
    }
}