线程池的使用
在任务与执行策略之间的隐性耦合
Executor框架可以将任务的提交与任务的执行策略解耦开来(就是独立化)。虽然Executor框架为制定和修改执行策略都提供了相当大的灵活性,但并非所有的任务都能适用所有的执行策略。比如:
- 依赖性任务 : 比如依赖于执行时序,执行结果或者其他效果,那么任务就带有隐含的依赖性。此时必须小心地维持这些执行策略以避免产生活跃性问题(死锁等造成执行困难的问题)
- 使用线程封闭机制的任务:与线程池相比,单线程的Executor能够对并发性做出更强的承诺,它们能确保任务不会并发地执行。
- 对响应时间敏感的任务:比如GUI任务。如果将一个运行时间较长的任务提交到单线程的Executor中。或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池中,那么会降低该 Executor的响应性。
- 使用ThreadLocal的任务:ThreadLocal使每个线程都可以拥有某个变量的私有版本,然而。只要条件允许Executor可以自由地重用这些线程。在标准的Executor中,当执行需求较低时将回收空间线程,而当需求增加时将添加新的线程,并且如果从任务 中招聘了一个未检查异常,那么 将用一个新的线程去替代 这个发生问题的线程。只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义 ,而在线程池的线程中不应该使用Threadlocal在任务之间传递值。
最后,只有当任务都是同类型并且相互独立时,线程池的性能才能达到最佳。如果 将运行时间相差较大的任务放在同一个线程池中,那么除非线程池足够大,否则就会造成阻塞。一般的基于网络的典型服务器应用 程序 中,如网页服务器,邮件服务器和文件服务器它们的请求都是同类型并且相互独立的 。
记得讲你的执行策略写入文档,那么 将来的维护人员就不会由于使用了错误的执行策略而破坏了安全性和活跃性。
线程饥饿死锁
每当提交了一个有依赖性的Executor任务时,要清楚地知道可能会出现线程“饥饿”死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或配置限制。
除了线程池大小的显式限制外,还有很多由于外部资源限制而导致的死锁,比如jdbc限制有10个连接 ,那么 每个线程都需要一个连接 ,已经有十个线程在线程池内运行,那么接下来的线程虽然 已经在线程池内了,可是因为无法取得与数据库的连接而阻塞。
运行时间较长的任务
如果任务阻塞时间过长,那么即使不出现死锁 ,线程池的响应性也会变得糟糕。Java早就准备好了解决问题的方法 。大多数可阻塞的方法中java的库内都有限时版本和不限时版本,比如Thread.join、Blockingqueue.put、CountDownLatch.await以及Select.select等。如果等待超时,可以把任务标识为失败,然后中止任务或者将任务重新放回队列,甚至可以加上计数器。如果几次失败仍如此那么 再中止它。如果线程池内总是充满了被阻塞的任务,那么也可能说明是线程池的规模过小。
设置线程池的大小
如果线程池过大,那么可能会耗尽资源 ,如果过小,那么 将导致许多空闲的处理器无法工作,从而降低吞吐率。
要设置正确的线程池大小,需要分析计算环境,资源预算和任务的特性,cpu数量,内存大小,任务是计算密集型还是I/O密集型,还是二者皆可。它们是否需要像JDBC连接这样的稀缺资源,下面给出一个计算公式:
N(threads)=N(cpu)*U(cpu)*(1+w/c);
N(threads)是最后得到的结果大小 。
N(cpu)是cpu数量,我的电脑是双核四线程,cpu的数量会是4,可以通过 System.out.println(Runtime.getRuntime().availableProcessors());来得到cpu数量
U(cpu)是目标cpu使用率,取决于程序员的期望,一般在50%左右。这个值限制在0到1之间
w/c是wait time/compute time的比率。
配置ThreadPoolExecutor
ThreadPoolExecutor为一些Executor提供了基本的实现,这些Executor是由Executors中的newCachedThreadPool,newFixedThreadPool和newScheduledThreadExecutor等工厂方法返回的。ThreadPoolExecutor是一个灵活的、稳定的线程池,允许各种定制。
Executors里的工厂方法不满足需求,你可以直接使用类来创建。它的构造函数如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
我们最常用的newCachedThreadPool。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
线程的创建与销毁
线程池的基本大小、最大大小以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可以同时活动的线程数量上限。如果某个线程的空闲时间超过了存活的时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数指定的值,而且创建的线程池不会超时。newCachedThreadPool将最大大小设置为Integer.MAX_VALUE,基本大小设为0,超时设置1min。
管理队列任务
根据线程池的大小来选择合适的队列有利于充分利用资源和防止耗尽资源。
如果无限制地创建线程,那么将导致不稳定性,并通过采用固定大小的线程池(而不是每一个请求就创建一个线程)来解决这个问题。然而,这个方案并不完整。在高负载的情况下,应用程序仍可能耗尽资源,只是出现问题的概率较小。如果新请求的达到速率超过了线程池的处理速率,那么新来的请求将累积起来。在线程池中,这些请求会在一个由Executor管理的Runnable队列中等待,而不会像线程那样去竞争CPU资源。通过一个Runnable和一个链表节点来表现一个等待中的任务,当然比使用线程来表示的开销低很多,但是如果客户提交给服务器请求的速率超过了服务器的处理速率,那么仍可能耗尽资源。
ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本上的任务排队方法有3种:
- 无界队列
- 有界队列
- 同步移交
newFixedThreadPool和newSingleThreadPool在默认情况下将使用一个无界的LinkedBlockingQueue。当所有线程都在忙碌状态时,这个队列将无限制的增加。
一种更稳妥的资源管理策略是使用有界队列:如ArrayBlockingQueue,有界的LinkedBlockingQueue,PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生。但是满了后,新的任务怎么办?(有许多饱和策略可以解决这个问题)。
对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队。以及直接将任务从生产者移交到工作者中。它并不是一个真正的队列 ,而一种在线程之交进行移交的机制 。要将一个元素放入SynchronousQueue中,必须有一个线程在等待接受这个元素,如果没有线程在等待,并且线程池当前大小为最大值的时候 ,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将会被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是首先放在队列中,然后由工作线程从队列中提取任务。在newCachedThreadPool中就是使用了SynchronousQueue。
对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能。这种差异是因为使用了SynchronousQueue而不是LinkedBlockingQueu,在Java6中便一个新的非阻塞算法来替代Java5的算法,该算法使用它们吞吐量提高了3倍。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池。比如接受网络客户请求的服务器应用程序。
饱和策略
ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含有不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。
AbortPolicy是默认的饱和策略,就是中止任务,该策略将抛出RejectedExecutionException。调用者可以捕获这个异常然后去编写代码处理异常。
当新提交的任务无法保存到队列中等待执行时,DiscardPolicy会稍稍的抛弃该任务,DiscardOldestPolicy则会抛弃最旧的(下一个将被执行的任务),然后尝试重新提交新的任务。如果工作队列是那个优先级队列时,搭配DiscardOldestPolicy饱和策略会导致优先级最高的那个任务被抛弃,所以两者不要组合使用。
CallerRunsPolicy是“调用者运行”策略,实现了一种调节机制 。它不会抛弃任务,也不会抛出异常。 而是将任务回退到调用者。它不会在线程池中执行任务,而是在一个调用了execute的线程中执行该任务.比如WebService在线程满了后,新任务将交由调用线程池execute方法的主线程执行,而由于主线程在忙碌,所以不会执行accept方法,从而实现了一种平缓的性能降低。
当工作队列被填满后,没有预定义的饱和策略来阻塞execute(除了抛弃就是中止还有去让调用者去执行),但这不并不能阻止任务被提交。然而可以通过Semaphore来限制任务的到达率,就可以实现这个功能。
线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法来完善的。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息,通过指定一个线程工厂方法,可以定制线程池的配置信息。在ThreadFactory中只定义了一个方法newThread,每当线程池需要创建一个新线程时都会调用这个方法。
然而,在很多情况下都需要使用定制的线程工厂方法。需要定制线程工厂方法的情景 :
- 需要为线程池里面的线程指定 个UncaughtExceptionHandler
- 实例化一个定制的Thread类执行调试信息的记录
- 需要修改线程的优先级或者守护线程的状态(这建设使用这两个功能,线程优先级会增加平台依赖性,并且导致活跃性问题,在大多数并发应用程序中,都可以使用默认的线程优先级)
- 只是希望给线程起个有意义的名字,用来解释线程的转储信息和错误日志
在调用构造函数后再定制ThreadPoolExecutor
在使用ThreadPoolExecutor的构造函数之后,仍然可以通过Set方法来配置线程池。
ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) exec).setCorePoolSize(10);
} else
throw new AssertionError("bad assuption");
}
在Executors中包含一个unconfigurableExecutorService工厂方法, 该方法对一个现有的ExecutorService进行包装,使其只暴露出ExecutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方式封装的ExecutorService,而不是最初的ThreadPoolExecutor。对于单线程Executor理论上是不要修改Executor的大小,因为这个单线程Executor是相悖的。
你可以在自己的Executor中使用这项技术以防止执行策略被修改。如果将ExecutorService暴露给不信任的代码,又不希望对其进行修改,就可以通过unconfigurableExecutorService来包装它。
如果我们不想创建出来的线程池被再次修改,可以用这个方法把它包装起来,变成“final"类型.
扩展ThreadPoolExecutor
ThreadPoolExecutor是可扩展的,它提供了几个可以在子类中改写的方法:beforeExecute、afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。比如我们可以为线程添加统计信息,重载BeforeExecute和AfterExecute方法用于计时等操作。
递归算法的并行化
其实很简单,对循环内的每个执行都用线程池来解决就好了。(这估计需要创建不少线程吧!所以必须使用线程池)
void processInParallel(Executor exec, List<Element> element) {
for (final Element e : element) {
exec.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
process(e);
}
});
}
}
同时JDK也提供一个专门用于递归的并发计算框架fork-join,用于解决可以分解为固定的独立小问题。比如快递排序,我做过一个测试。当数据量达到10000的时候可以达到近三倍的计算速度