登录  
 加关注
   显示下一条  |  关闭
温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!立即重新绑定新浪微博》  |  关闭

Code@Pig Home

喜欢背着一袋Code傻笑的Pig .. 忧美.欢笑.记忆.忘却 .之. 角落

 
 
 

日志

 
 

[java][concurrent] Executor, Task, and Future  

2009-07-07 08:45:36|  分类: lang_java |  标签: |举报 |字号 订阅

  下载LOFTER 我的照片书  |
java.util.concurrent 中的 Executor, Future, Callable 作为封装异步行为的先驱,不可不查。

      Executor
              |
   ExecutorService  <---------------------------------- AbstractExecutorService
              |                                                                              |
              |                                                               ThreadPoolExecutor
              |                                                                              |
 ScheduledExecutorService  <------------------ ScheduledThreadPoolExecutor

------------------- Executor ----------------------
Executor, 用于执行提交的任务(submitted Runnable tasks), 将任务的提交方式与运行方式独立开。

Executor executor = anExecutor;
executor.execute(new Task1());
executor.execute(new Task2());

不同的 anExecutor,可以有不同的运行方式,比如:直接运行、异步运行、后台开一个 thread pool 来跑等等。

Executor 只提供了一个 method:
void execute(Runnable command)

-------------------- ExecutorService ---------------------
ExecutorService 在 Executor 基础上,加入了状态管理,比如:
awaitTermination(), 等待所有 task 完成
invokeAll(), 执行所有 task, 全部完成or超时, 则返回
invokeAny(), 执行所有 task, 任何一个task完成or超时, 则返回。其他所有任务则 cancel 掉。
shutdown(), shutdownNow(), 停止当前的任务
submit(), 开始执行所提交的 task, 返回一个 Future<T>, 用于查看 task 的状态。

Future<T> 作为对 task 的状态管理,可以 isDone(), cancel()
T get(), T get(long timeout, TimeUnit unit)
调用 get() 会让当前的 thread block 住,等待 Future<T> 对应的 task 完成

invokeXXX() 是等待所有任务完成才返回 Future<T>, 而 submit() 是直接
返回 Future<T>, 让调用者有权 cancel() 对应的 task。

<T> Future<T> ExecutorService.submit(Callable<T> task)
  ==> T Future.get()
其实 T 就是 task 开始前,我们传入的 arg, 当 task 结束后,我们可根据 arg 来做点事情。这和传统的异步任务处理方式 callback 类似。

传统的做法:
void callback_func(void *arg) { ... }
void main() {
    executor.set_task(task, callback_func, arg);
}

Executor 的做法:
f = executor.submit(task)    // 把 arg 放在 task 里面
arg = f.get()            // block 住,task 结束返回,取得 arg

把异步的行为,弄成看起来同步,仅此而已。

--------------------- 使用的例子 --------------------
import java.util.concurrent.Future;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

class MyFooCall implements Callable<String> {
    final String arg;
    public MyFooCall(String arg) {
        this.arg = arg;
    }
    public String call() {
        System.out.println("MyCall.call(), arg = " + this.arg);
        return this.arg;
    }
}

class MyHello {
    public static void main(String[] args) {
        try {
            final ExecutorService pool = Executors.newFixedThreadPool(4);

            Future<String> f = pool.submit(new MyFooCall("I'm arg!"));
            System.out.println("done: " + f.isDone());

            String arg = f.get();
            System.out.println("done! arg = " + arg);

            pool.shutdown();
        } catch (Exception e) {
        } finally {
        }
    }
}

------------------- ScheduledExecutorService ----------------------
ScheduledExecutorService 在 ExecutorService 基础上,加强了控制力度,可以设定“多久后执行"or"每间隔多久执行"这样的行为。

schedule(Runnable command, long delay, TimeUnit unit)
过了一段时间后,执行一次。

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
initialDelay 之后开始执行,以后每 period 执行一次

下面是 jdk doc 里面的例子。
----------------------
import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
public void beepForAnHour() {
final Runnable beeper = new Runnable() {
public void run() { System.out.println("beep"); }
};
final ScheduledFuture<?> beeperHandle =
scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);
scheduler.schedule(new Runnable() {
public void run() { beeperHandle.cancel(true); }
}, 60 * 60, SECONDS);
}
}

------------------- AbstractExecutorService ------------------
ExecutorService 的实现,其中的 protected newTaskFor() 为 submit() 创建内部需要的 TaskObj。
因为 Executor.execute() 需要的是 Runnable, 而 ExecutorService.submit() 等的返回值为 Future,
所以 RunnableFuture 应运而生。FutureTask 就作为 Runnable Future 的标准实现。

Future<V>
|
RunnableFuture<V>  <-- FutureTask<V>

RunnableFuture 结合了 Runnable, Future<V> 的两大功能,Runnable 用于定义 task 内容本身,而 Future
则控制了任务何时结束,以及 callback 返回值。
AbstractExecutorService 还只是abstract class,主要定义了 invokeAll(), submit() 等行为。而 execute() 的行为,则交给子类 ThreadPoolExecutor 去实现。
  评论这张
 
阅读(2374)| 评论(0)

历史上的今天

评论

<#--最新日志,群博日志--> <#--推荐日志--> <#--引用记录--> <#--博主推荐--> <#--随机阅读--> <#--首页推荐--> <#--历史上的今天--> <#--被推荐日志--> <#--上一篇,下一篇--> <#-- 热度 --> <#-- 网易新闻广告 --> <#--右边模块结构--> <#--评论模块结构--> <#--引用模块结构--> <#--博主发起的投票-->
 
 
 
 
 
 
 
 
 
 
 
 
 
 

页脚

网易公司版权所有 ©1997-2018