java.util.concurrent 中的 Executor, Future, Callable 作为封装异步行为的先驱,不可不查。
Executor
|
ExecutorService <---------------------------------- AbstractExecutorService
| |
| ThreadPoolExecutor
| |
ScheduledExecutorService <------------------ ScheduledThreadPoolExecutor
-------------------
Executor ----------------------
Executor, 用于执行提交的任务(submitted Runnable tasks), 将任务的提交方式与运行方式独立开。
Executor executor = anExecutor;
executor.execute(new Task1());
executor.execute(new Task2());
不同的 anExecutor,可以有不同的运行方式,比如:直接运行、异步运行、后台开一个 thread pool 来跑等等。
Executor 只提供了一个 method:
void execute(Runnable command)
--------------------
ExecutorService ---------------------
ExecutorService 在 Executor 基础上,加入了状态管理,比如:
awaitTermination(), 等待所有 task 完成
invokeAll(), 执行所有 task, 全部完成or超时, 则返回
invokeAny(), 执行所有 task, 任何一个task完成or超时, 则返回。其他所有任务则 cancel 掉。
shutdown(), shutdownNow(), 停止当前的任务
submit(), 开始执行所提交的 task, 返回一个 Future<T>, 用于查看 task 的状态。
Future<T> 作为对 task 的状态管理,可以 isDone(), cancel()
T get(), T get(long timeout, TimeUnit unit)
调用 get() 会让当前的 thread block 住,等待 Future<T> 对应的 task 完成
invokeXXX() 是等待所有任务完成才返回 Future<T>, 而 submit() 是直接
返回 Future<T>, 让调用者有权 cancel() 对应的 task。
<T> Future<T> ExecutorService.submit(Callable<T> task)
==> T Future.get()
其实 T 就是 task 开始前,我们传入的 arg, 当 task 结束后,我们可根据 arg 来做点事情。这和传统的异步任务处理方式 callback 类似。
传统的做法:
void callback_func(void *arg) { ... }
void main() {
executor.set_task(task, callback_func, arg);
}
Executor 的做法:
f = executor.submit(task) // 把 arg 放在 task 里面
arg = f.get() // block 住,task 结束返回,取得 arg
把异步的行为,弄成看起来同步,仅此而已。
--------------------- 使用的例子 --------------------
imp
ort java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
class MyFooCall implements Callable<String> {
final String arg;
public MyFooCall(String arg) {
this.arg = arg;
}
public String call() {
System.out.println("MyCall.call(), arg = " + this.arg);
return this.arg;
}
}
class MyHello {
public static void main(String[] args) {
try {
final ExecutorService pool = Executors.newFixedThreadPool(4);
Future<String> f = pool.submit(new MyFooCall("I'm arg!"));
System.out.println("done: " + f.isDone());
String arg = f.get();
System.out.println("done! arg = " + arg);
pool.shutdown();
} catch (Exception e) {
} finally {
}
}
}
------------------- ScheduledExecutorService ----------------------
ScheduledExecutorService 在 ExecutorService 基础上,加强了控制力度,可以设定“多久后执行"or"每间隔多久执行"这样的行为。
schedule(Runnable command, long delay, TimeUnit unit)
过了一段时间后,执行一次。
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
initialDelay 之后开始执行,以后每 period 执行一次
下面是 jdk doc 里面的例子。
----------------------
import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
public void beepForAnHour() {
final Runnable beeper = new Runnable() {
public void run() { System.out.println("beep"); }
};
final ScheduledFuture<?> beeperHandle =
scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
scheduler.schedule(new Runnable() {
public void run() { beeperHandle.cancel(true); }
}, 60 * 60, SECONDS);
}
}
------------------- AbstractExecutorService ------------------
ExecutorService 的实现,其中的 protected newTaskFor() 为 submit() 创建内部需要的 TaskObj。
因为 Executor.execute() 需要的是 Runnable, 而 ExecutorService.submit() 等的返回值为 Future,
所以 RunnableFuture 应运而生。FutureTask 就作为 Runnable Future 的标准实现。
Future<V>
|
RunnableFuture<V> <-- FutureTask<V>
RunnableFuture 结合了 Runnable, Future<V> 的两大功能,Runnable 用于定义 task 内容本身,而 Future
则控制了任务何时结束,以及 callback 返回值。
AbstractExecutorService 还只是abstract class,主要定义了 invokeAll(), submit() 等行为。而 execute() 的行为,则交给子类 ThreadPoolExecutor 去实现。
评论