package com.hx.util.thread;
|
|
import lombok.Setter;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.util.concurrent.LinkedBlockingDeque;
|
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.TimeUnit;
|
|
|
/**
|
* 创建线程池的,主要避免线程队列超,导致服务器爆了
|
*/
|
public class ExecutorServiceTool {
|
|
/**log4j日志*/
|
private static final Logger logger = LoggerFactory.getLogger(ExecutorServiceTool.class.getName());
|
|
/**线程池-获取队列的*/
|
private ThreadPoolExecutor threadPool;
|
/**最大队列*/
|
private int maxQueue;
|
/**最大线程数*/
|
private int maxTread;
|
/**创建队列*/
|
private ThreadTaskLinkedBlockingDeque<Runnable> queue = new ThreadTaskLinkedBlockingDeque<>();
|
|
public ExecutorServiceTool() {
|
}
|
|
public ExecutorServiceTool(int initTread, int maxQueue) {
|
createThread(initTread,maxQueue);
|
}
|
|
public ExecutorServiceTool(int initTread, int maxTread, int maxQueue) {
|
this.maxTread = maxTread;
|
createThread(initTread,maxTread,maxQueue);
|
}
|
|
|
/**创建线程池
|
* @param initTread 初始化线程池
|
* @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对
|
* 线程池限制队列,因为限制了会报错,导致数据丢失
|
* @return 线程池
|
*/
|
public ThreadPoolExecutor createThread(int initTread,int maxQueue){
|
this.threadPool = new ThreadPoolExecutor(initTread, initTread,
|
60L, TimeUnit.SECONDS,
|
new LinkedBlockingQueue<Runnable>());
|
this.maxQueue = maxQueue;
|
return this.threadPool;
|
}
|
|
/**创建线程池
|
* @param initTread 初始化线程池
|
* @param initTread 最大线程数
|
* @param maxQueue 最大队列新增队列,注意:这里只做传值,不限制真对
|
* 线程池限制队列,因为限制了会报错,导致数据丢失
|
* @return 线程池
|
*/
|
public ThreadPoolExecutor createThread(int initTread,int maxTread,int maxQueue){
|
this.threadPool = new ThreadPoolExecutor(initTread,maxTread,60L,TimeUnit.SECONDS,queue);
|
this.maxQueue = maxQueue;
|
return this.threadPool;
|
}
|
|
/**循环校验队列数量模式,针对最大的队列,返回true就是可以加入队列
|
* 避免死循环,设置了循环校验次数
|
* @param sleepMillisecond 睡眠,毫秒秒,如果是空的,那么直接返回
|
* @param frequency 循环校验次数,空值默认100次数
|
* @return 最后都是返回true
|
*/
|
public boolean whileCheckQueue(Integer sleepMillisecond,Integer frequency){
|
if(frequency == null){
|
frequency = 100;
|
}
|
while (frequency>0){
|
if(!noRund(sleepMillisecond)){
|
return true;
|
}
|
frequency--;
|
}
|
return true;
|
}
|
|
/**针对最大的队列,如果没有超过返回是false的,超过就返回是true的
|
* 单返回true的时候,就不要传入队列了
|
* @param sleepMillisecond 睡眠,毫秒秒,如果是空的,那么直接返回
|
* @return 如果没有超过返回是false的,超过就返回是true的
|
*/
|
public boolean noRund(Integer sleepMillisecond){
|
if(threadPool.getQueue().size() > maxQueue){
|
if(sleepMillisecond != null && sleepMillisecond > 0){
|
try{
|
Thread.sleep(sleepMillisecond);
|
}catch (Exception e){
|
logger.error("线程池开启睡眠失败!");
|
}
|
}
|
return true;
|
}else{
|
return false;
|
}
|
}
|
|
/**关闭*/
|
public void shutdown(){
|
this.threadPool.shutdown();
|
}
|
|
/**
|
* 自定义线程任务阻塞队列. 在活跃线程数小于最大支持线程数的情况下,新任务不放到队列从而激发线程池创建新线程及时处理.
|
* 解决使用LinkedBlockingDeque无限队列,线程池只有核心线程在处理。maximumPoolSize未启作用的问题。
|
*
|
* @author chenjiahe
|
*/
|
@Setter
|
private class ThreadTaskLinkedBlockingDeque<E> extends LinkedBlockingDeque<E> {
|
@Override
|
public boolean offer(E e) {
|
int activeThreadNum = threadPool.getActiveCount();
|
if (activeThreadNum < maxTread) {
|
return false;
|
}
|
return offerLast(e);
|
}
|
}
|
|
/****************************************************************************************/
|
|
public int getMaxQueue() {
|
return maxQueue;
|
}
|
|
public void setMaxQueue(int maxQueue) {
|
this.maxQueue = maxQueue;
|
}
|
|
public ThreadPoolExecutor getThreadPool() {
|
return threadPool;
|
}
|
|
public void setThreadPool(ThreadPoolExecutor threadPool) {
|
this.threadPool = threadPool;
|
}
|
|
}
|