线程池 1、为什么使用线程池 Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行的任务的程序都可以使用线程池。
线程池的使a用能够带来三个好处:
降低资源消耗
。通过重复利用已创建的线程降低线程创建和销毁的消耗。
提高响应速度
。当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性
。线程是稀缺资源,如果无限制的创建,会消耗资源,降低系统的稳定性。
2、线程池的实现原理
当线程池提交一个任务之后,线程池是如何处理这个任务的?
从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:
线程池判断核心线程池
里的线程是否都在执行任务。如果不是,则创建一个新的工作
线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
线程池判断阻塞队列
是否已经满。如果阻塞队列没有满,则将新提交的任务存储在这
个阻塞队列里。如果阻塞队列满了,则进入下个流程。
线程池判断线程池
的线程是否都处于工作状态。如果没有,则创建一个新的工作线程
来执行任务。如果已经满了,则交给饱和策略
来处理这个任务。
对于:ThreadPoolExecutor线程池的原理图如下:
ThreadPoolExecutor执行execute方法分下面4种情况。
如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤 需要获取全局锁)。
如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
RejectedExecutionHandler.rejectedExecution()方法
4、自定义线程池 1、拒绝策略 1 2 3 4 @FunctionalInterface interface RejectPolicy <T> { void reject (BlockingQueue<T> queue, T task) ; }
2、自定义阻塞队列 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 class BlockingQueue <T> { private Deque<T> queue = new ArrayDeque <>(); private ReentrantLock lock = new ReentrantLock (); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); public BlockingQueue (int capacity) { this .capacity = capacity; } private int capacity; public T take () { lock.lock(); try { while (queue.isEmpty()){ try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T removeFirst = queue.removeFirst(); fullWaitSet.signal(); return removeFirst; }finally { lock.unlock(); } } public void put (T element) { lock.lock(); try { while (queue.size() == capacity){ try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.addLast(element); emptyWaitSet.signal(); }finally { lock.unlock(); } } public int size () { lock.lock(); try { return queue.size(); }finally { lock.unlock(); } } public T poll (long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.isEmpty()){ try { if (nanos<=0 ){ return null ; } nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); return t; }finally { lock.unlock(); } } public boolean offer (T task,long timeout,TimeUnit timeUnit) { lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while (queue.size() == capacity){ try { if (nanos <= 0 ){ return false ; } nanos = fullWaitSet.awaitNanos(nanos); }catch (InterruptedException e){ e.printStackTrace(); } } queue.addLast(task); emptyWaitSet.signal(); return true ; }finally { lock.unlock(); } } public void tryPut (RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if (queue.size() == capacity){ rejectPolicy.reject(this ,task); }else { queue.addLast(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } }
3、自定义简单的线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 class ThreadPool { private BlockingQueue<Runnable> taskQueue; private HashSet<Worker> workers = new HashSet <>(); private int coreSize; private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; class Worker extends Thread { private Runnable task; public Worker (Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQueue.poll(timeout,timeUnit)) != null ){ try { task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null ; } } synchronized (workers){ workers.remove(this ); } } } public ThreadPool (int coreSize,long timeout,TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy) { this .coreSize = coreSize; this .timeout = timeout; this .timeUnit = timeUnit; taskQueue = new BlockingQueue <>(queueCapacity); this .rejectPolicy = rejectPolicy; } public void execute (Runnable task) { synchronized (workers){ if (workers.size() < coreSize){ Worker worker = new Worker (task); workers.add(worker); worker.start(); }else { taskQueue.tryPut(rejectPolicy,task); } } } }
3、ThreadPoolExecutor
1、线程池状态 ThreadPoolExecutor 使用int的高三位来表示线程池的状态,低29位表示线程数量。
状态名
高3位
接受新任务
处理阻塞队列任务
说明
RUNNING
111
Y
Y
SHUTDOWN
000
N
Y
不会接受新任务,但会处理阻塞队列剩余任务
STOP
001
N
N
会中断正在执行的任务,并抛弃阻塞队列任务
TIDYING
010
-
-
任务全执行完毕,活动线程为0即将进入终结
TERMINATED
011
-
-
终结状态
将这些信息存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次CAS操作进行赋值。
1 2 3 4 5 6 7 8 ctl compareAndSet (c,ctlof(targetState,workerCountof(c) ));private static int ctlof (int rs,int wc) { return rs | wc; }
2、构造方法 1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize 核心线程数目 (最多保留的线程数)
maximumPoolSize 最大线程数目
keepAliveTime 生存时间 - 针对救急线程
unit 时间单位 - 针对救急线程
workQueue 阻塞队列
threadFactory 线程工厂 - 可以为线程创建时起个好名字
handler 拒绝策略
线程池中的最大线程数 = 核心线程数 + 救急线程数
3、使用
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
当线程数达到 corePoolSize
并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue
队列排队,直到有空闲的线程。
如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize
数目的线程来救急。
如果线程到达 maximumPoolSize
仍然有新任务这时会执行拒绝策略。
当高峰过去后,超过corePoolSize
的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime
和 unit
来控制。
4、拒绝策略 拒绝策略 JDK提供了 4 种实现,其它著名框架也提供了实现
AbortPolicy
让调用者抛出 RejectedExecutionException
异常,这是默认策略
CallerRunsPolicy
让调用者运行任务
DiscardPolicy
放弃本次任务
DiscardOldestPolicy
放弃队列中最早的任务,本任务取而代之
Dubbo
的实现,在抛出 RejectedExecutionException
异常之前会记录日志,并 dump 线程栈信息,方便定位问题
Netty
的实现,是创建一个新线程来执行任务
ActiveMQ
的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
PinPoint
的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
4、几种常见的线程池 1、newFixedThreadPool(固定数目线程的线程池) 1 2 3 4 5 public static ExecutorService new FixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0 L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
特点:
工作机制:
提交任务
如果线程数少于核心线程,创建核心线程执行任务
如果线程数等于核心线程,把任务添加到LinkedBlockingQueue
阻塞队列
如果线程执行完任务,去阻塞队列取任务,继续执行。
适用于任务量已知,相对耗时的任务
2、newCachedThreadPool(可缓存线程的线程池) 1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
特点:
核心线程数是0
,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是60s。
全部都是救急线程(60s 后可以回收)。救急线程可以无限创建
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的
工作机制:
提交任务
因为没有核心线程,所以任务直接加到SynchronousQueue
队列。
判断是否有空闲线程,如果有,就去取出任务执行。
如果没有空闲线程,就新建一个线程执行。
执行完任务的线程,还可以存活60秒,如果在这期间,接到任务,可以继续活下去;否则,被销毁。
当提交任务的速度大于处理任务的速度时,每次提交一个任务,就必然会创建一个线程。
极端情况下会创建过多的线程,耗尽 CPU 和内存资源。
由于空闲 60 秒的线程会被终止,长时间保持空闲的 CachedThreadPool
不会占用任何资源。
适合任务数比较密集,但每个任务执行时间较短的情况
3、newSingleThreadExecutor(单线程的线程池) 1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
特点:
线程数固定为 1。
阻塞队列是LinkedBlockingQueue
keepAliveTime为0
工作流程:
提交任务
线程池是否有一条线程在,如果没有,新建线程执行任务
如果有,将任务加到阻塞队列
当前的唯一线程,从队列取任务,执行完一个,再继续取,一条线程夜以继日地干活。
newSingleThreadExecutor
和我们自己创建一个线程的区别:
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施
,而线程池还会新建一个线程,保证池的正常工作。
newSingleThreadExecutor
和Executors.newFixedThreadPool(1)
的区别
适合希望多个任务排队执行。
4、newScheduledThreadPool(定时及周期执行的线程池) 1 2 3 4 public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue ()); }
特点:
最大线程数为Integer.MAX_VALUE
阻塞队列是DelayedWorkQueue
keepAliveTime为0
scheduleAtFixedRate() :按某种速率周期执行
scheduleWithFixedDelay():在某个延迟后执行
工作机制:
提交任务
线程池中的线程从 DelayQueue 中取任务
线程从 DelayQueue 中获取 time 大于等于当前时间的task
执行完后修改这个 task 的 time 为下次被执行的时间
这个 task 放回DelayQueue队列中
适用于周期性执行任务的场景,需要限制线程数量的场景
5、线程池的工作队列 1、ArrayBlockingQueue ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。
2、LinkedBlockingQueue LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务。
容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列
3、DelayQueue DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列。
4、PriorityBlockingQueue PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列
5、SynchronousQueue SynchronousQueue(同步队列)一个不存储元素的阻塞队列
,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。
6、提交任务 1、execute 1 void execute (Runnable command )
1 2 3 4 5 6 7 8 9 10 11 12 13 ExecutorService pool = Executors.newFixedThreadPool(2 ); pool.execute(new Runnable () { @Override public void run () { try { Thread.sleep(10000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行任务" ); } });
2、submit 1 2 <T> Future<T> submit (Callable<T> task) ;
1 2 3 4 5 6 7 8 9 10 ExecutorService pool = Executors.newFixedThreadPool(2 ); Future<String> future = pool.submit(new Callable <String>() { @Override public String call () throws Exception { System.out.println("执行任务" ); Thread.sleep(10000 ); return "ok" ; } });
3、invokeAll 1 2 3 <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 ExecutorService pool = Executors.newFixedThreadPool(2 ); List<Future<String>> invokeAll = pool.invokeAll(Arrays.asList( () -> { Thread.sleep(1000 ); return "1" ; }, () -> { Thread.sleep(2000 ); return "2" ; }, () -> { Thread.sleep(3000 ); return "3" ; } )); invokeAll.forEach(f -> { try { System.out.println(f.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } });
4、invokeAny 1 2 3 <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 ExecutorService pool = Executors.newFixedThreadPool(2 );String any = pool.invokeAny(Arrays.asList( () -> { Thread.sleep(10000 ); return "1" ; }, () -> { Thread.sleep(5000 ); return "2" ; }, () -> { return "3" ; } )); System.out.println(any);
7、关闭线程池 1、shutdown
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
2、shutdownNow 1 2 3 4 5 6 7 List<Runnable> shutdownNow () ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
3、其它方法 1 2 3 4 5 6 7 8 9 boolean isShutdown () ;boolean isTerminated () ;boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException;