任务执行
大多数的并发程序都是围绕“任务执行”来构造的;任务通常是一些抽象的且离散的工作单元。通常把应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,以及提供一种自然的并行工作结构来提升并发性。
在线程中执行任务
当围绕“任务执行”来设计应用程序结构时,第一步就是要找出清晰的任务边界。在理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或边界效应。独立性有助于实现并发,因为存在足够多的处理资源,那么这些独立的任务都可以并行执行。为了在调度与负载均衡等过程中实现更高的灵活性,每项任务还应该表示应用程序的一小部分处理能力。
大多数服务器应用程序都提供了一种自然的任务边界选择方式:以独立的客户请求为边界。Web服务器、邮件服务器、文件服务器、EJB容器以及数据库服务器等,这些服务器都能通过网络接受远程客户的连接请求。将独立的请求作为任务边界,既可以实现任务的独立性,又可以实现合理的任务规模。
串行地执行任务
在应用程序中可以通过多种策略来调度任务,而其中一些策略能够更好地利用潜在的并发性。最简单的策略就是在单个线程中串行地执行各项任务。这样并发效率根本不高。
显示地为任务创建线程
通过为每个请求创建一个线程来提供服务,从而实现更高的响应性。
无限制创建线程的不足
1.线程生命周期的开销非常高。
2.资源消耗:活跃的线程会消耗系统资源,尤其是内存。
3.稳定性:在可创建线程的数量上存在一个限制,这个限制视具体情况而定。
Executor框架
线程池简化了线程的管理工作,并且java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。在java类库中,任务执行的主要抽象不是Thread,而是Executor。
虽然Executor是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础,该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable表示任务。Executor的实现还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
Executor基于生产者--消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。方法可以查看Executors类。其实它的JDK实现就是这么简单。
除了使用Executors这个工具类,我们还可以使用Executor的实现了,实在不行自己定义一个类实现Executor,加上队列实现生产者消费者模式。
执行策略
通过将任务的提交与执行解耦开来,从而无需太大的困难就可以为某种类型的任务指定和修改执行策略。在执行策略中定义了任务执行的“What、Where、When、How”等方面,包括:
- 在什么(what)线程中执行任务
- 任务按照什么(what)顺序执行(FIFO、LIFO、优先级等)?
- 有多少个(how many)任务能并发执行?
- 在队列中有多少个(how many)任务在等待执行?
- 如果系统由于过载而需要拒绝一个任务,那么应该选择哪一个(which)任务?另外,如何(how)通知应用程序有任务被拒绝?
- 在执行一个任务之前或之后,应该进行哪些(what)动作?
各种执行策略都是一种资源管理工具,最佳策略取决于可用的计算资源以及对服务质量的需求。通过限制并发任务的数量,可以确保应用程序不会由于资源耗尽而失败,或者由于在稀缺资源上发生竞争而严重影响性能。通过将任务的提交与任务的执行策略分离开来,有助于在部署阶段选择与可用硬件资源最匹配的执行策略。
线程池
线程池和工作对列密切相关,其中在工作队列中保存了所有等待执行的任务。工作者线程的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。在线程池中执行比每个任务分配一个线程优势更多。通过重用现有的线程而不是创建线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。另外一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待线程创建而延迟任务的执行,从而提高了响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。
类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态方法之一来创建一个线程池:
- newFixedThreadPool
- newCachedThreadPool:没有限制,会缓存。
- newSingleThreadExecutor
- newScheduledThreadPool:
从“为每任务分配一个线程”策略变成基于线程池的策略,将对应用程序的稳定性产生重大影响:web服务器不会再在高负载的情况下失败。由于服务器不会创建数千个线程来挣夺有限的CPU和内存资源,因此服务器的性能将平缓地降低。通过使用Executor,可以实现各种调优、管理、监视、记录日志、错误报告和其他功能,如果不使用任务执行框架,那么要增加这些功能是非常困难的。
Executor的生命周期
Executor的线程都是非守护线程,所以如果Executor不能安全退出,JVM就不能安全退出。
由于Executor以异步的方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即可见的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。当关闭应用程序时,可能采用最平缓的关闭形式(完成所有已经启动的任务,并且不再接受任何新的任务),也可以采用粗暴的关闭形式(直接关闭电源),以及其他可能的形式。既然Executor是为应用程序提供服务的,因而他们是可关闭的,并将在关闭操作中受影响的任务的状态反馈给应用程序。
为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。
ExecutorService的生命周期有3种状态:运行、关闭和已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成---包括那些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程。
在ExecutorService关闭后提交的任务,将抛出RejectedExecutionException。等所有任务都完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination之后会立即调用shutdown方法,从而产生同步地关闭ExecutorService的效果。
延迟任务与周期任务
Timer类负责管理延迟任务(“在100ms后执行该任务”)以及周期任务(“没10ms执行一次该任务”)。然而,Timer存在一些缺陷,因此应该考虑使用ScheduledThreadPoolExecutor来代替它。可以通过ScheduledThreadPoolExecutor的构造函数或newScheduledThreadPool工厂方法来创建该类的对象。
Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时精确性。线程池能弥补这个缺陷,它可以提供多个线程来执行延时任务和周期任务。
Timer的另一个问题是,如果TimerTask抛出了一个未检查的异常,那么Timer将表现出糟糕的行为。Timer线程并不捕获异常,因此当TimerTask抛出未检异常时,将终止定时线程。这种情况下,Timer也不会恢复线程的执行,而是会错误地认为整个Timer都被取消了。因此,已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度。(这个问题称为“线程泄漏”)。
如果要构建自己的调度服务,那么可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。DelayQueue管理着一组Delayed对象。每个Delayed对象都有一个相应的延迟时间:在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作。从DelayQueue中返回的对象将根据它们的延迟时间进行排序。
找出可利用的并行性
Executor框架帮助指定执行策略,但如果要使用Executor,必须将任务表述为一个Runnable。在大多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。但有时候,任务边界并非是显而易见的,例如在很多桌面应用程序中。即使是服务器应用程序,在单个客户请求中任可能存在可发掘的并行性。
携带结果的任务Callable与Future
Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值 或抛出一个受检异常。
许多任务实际上都是存在延迟的计算--执行数据库的查询,从网络上获取资源,或者计算某个复杂的功能。对于这些任务,Callable是一种更好的抽象:它认为主入口点(即call)将返回一个值,并可能抛出一个异常。在Executor中包含了一些辅助方法能将其他类型的任务封装为一个Callable,例如Runnable和java.security.PrivilegedAction.
Runnable和Callable描述的都是抽象的计算任务。这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。由于有些任务可能要执行很长时间,因此通常希望能够取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或者取消,以及获取任务的结果和取消任务等。在Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,就像ExecutorService的生命周期一样。当某个任务完成后,它永远停留在“完成”状态上。
get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。
可以通过许多方法创建一个Future来描述任务。ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获得任务的执行结果或者取消任务。还可以显式地为某个指定的Runnable或Callable实例化一个FutureTask。(由于FutureTask实现了Runnable,因此可以将它提交给Executor来执行,或者直接调用它的run方法。)
从Java6开始,ExecutorService实现可以改写AbstractExecutorService中的newTaskFor方法,从而根据已提交的Runnable或Callable来控制Future的实例化过程。在默认实现中仅创建了一个新的FutureTask。
在将Runnable或Callable提交到Executor的过程中,包含了一个安全发布过程,即将Runnable或Callable从提交线程发布到最终执行任务的线程。类似地,在设置Future结果的过程中也包含了一个安全发布,即将这个结果从计算它的线程发布到任何通过get获得它的线程。
在异构任务并行化中存在的局限
如果分的任务不是细颗粒度的,一个任务任然很艰巨那么和串行执行没啥区别,还增加了并发的复杂度。
只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能的提升。
CompletionService:Executor与BlockingQueue
如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:完成服务(CompletionService)。
CompletionService将Executor和BlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果会在完成时将被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
ExecutorCompletionService的实现非常简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当计算完成时,调用FutureTask中的done方法。当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的子类,然后再改写子类的done方法,并将结果放入BlockingQueue中。(注意这里不是采用轮询的方式了)。
为任务设置时限
在有限时间内执行任务的主要困难在于,要确保得到答案的时间不会超过限定的时间,或者在限定的时间内无法获得答案。在支持时间限制的Future.get中支持这种需求:当结果可用时,它将立即返回,如果在指定的时间内没有计算出结果,那么将抛出TimeoutException。
在使用限时任务时需要注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由任务本身来管理它的限定时间,并且在超时后终止执行或取消任务。此时可再次使用Future,如果一个限时的get方法抛出了TimeoutException,那么可以通过Future来取消任务。如果编写的任务是可取消的,那么可以提前终止它,以免消耗过多的资源。