FutureTask简单分析和用法
Callable、Future和FutureTask,结合Executors和ExecutorService,将某些特定的计算任务交给其他线程(由统一的线程池管理),让主线程进行阻塞和等待执行结果,用在RPC服务比较直观,或者这个计算结果被多个线程所需等情况。
从整体上看,使用FutureTask降低了程序整体的效率,肯定不如在原线程中直接执行的快,而且因为主线程需要阻塞等待计算结果,所以并没有增加并发度。所以FutureTask在项目中一般也比较少见。
public class FutureTaskExample { static class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { return 1; } } public static void main(String[] args) { // 第一种方式, 隐藏了FutureTask ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); Future<Integer> future = executor.submit(task); Integer ret = future.get(); // 第二种方式, 直接使用FutureTask ExecutorService executor = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); executor.submit(futureTask); Integer ret = futureTask.get(); // 第三种方式, 直接使用Thread Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); Thread thread = new Thread(futureTask); // FutureTask实现了Runnable接口 thread.start(); futureTask.get(); executor.shutdown(); } } |
FutureTask的一个实用场景
当一个计算任务非常重量级,非常耗资源和耗时间(比如网络请求或者数据库请求),而且多个线程都需要等待这个计算任务的结果,这个时候FutureTask的优势就体现出来了。只要线程同步的去控制FutureTask的run(),然后所有线程都只要等待FutureTask.get()获取结果就可以了,这样子不会每个线程并发性的去多次重复执行计算任务。
很显然的一个场景就是 数据库查询+缓存。比如查询某个table的id=xxx的对象,查询结果在内存缓存起来,在缓存为空的情况下并发第一次的访问可能引起多次SQL请求,使用FutureTask就可以避免这个问题。
public class MemCache<K, V>{ private final ConcurrentHashMap<K, Future<V>> cache; private V getResultFromDB(K k){return v;} public V get(K k){ while(true){ Future<V> f = cache.get(k); if(f == null){ Callable<V> eval = new Callable<V>(){ public V call() throw InterruptedException{ return getResultFromDB(k); } } FutureTask<V> ft = new FutureTask<V>(eval); // 这一行避免可能的执行并发产生的多个不同的futureTask f = cache.putIfAbsent(arg, ft); if(f == null){ f = ft; // 保证执行的一定是第一个futureTask ft.run(); } } try{ return f.get(); } catch (CancellationException e){ // 避免缓存污染(一个计算被取消或者异常,未来获取不到值),则要从缓存中移除 cache.remove(k, f); } catch (ExecutionException e){ throw e; } } } } |
FutureTask源码分析
也是类似一把共享锁,所有get操作都是试图进行acquireShared加锁,当然在run执行完毕之前,会一直加锁失败并阻塞;run中执行callable.call()结束,设置计算结果值V后,进行releaseShared解锁后,所有get操作被唤醒并拿到结果值V。如果执行中途执行cancel,也会进行releaseShared解锁,所有get操作会拿到null结果值。
state值用来保存task的执行状态值,0是初始态,1表示running,2表示ran done,4表示cancel。