FutureTask简单分析和用法

6月 21st, 2016 9,028 留下评论 阅读评论

Callable、Future和FutureTask,结合Executors和ExecutorService,将某些特定的计算任务交给其他线程(由统一的线程池管理),让主线程进行阻塞和等待执行结果,用在RPC服务比较直观,或者这个计算结果被多个线程所需等情况。

从整体上看,使用FutureTask降低了程序整体的效率,肯定不如在原线程中直接执行的快,而且因为主线程需要阻塞等待计算结果,所以并没有增加并发度。所以FutureTask在项目中一般也比较少见。

public class FutureTaskExample {
 
    static class Task implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            return 1;
        }
    }
 
    public static void main(String[] args) {
        // 第一种方式, 隐藏了FutureTask
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> future = executor.submit(task);
        Integer ret = future.get();
 
        // 第二种方式, 直接使用FutureTask
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        Integer ret = futureTask.get();
 
        // 第三种方式, 直接使用Thread
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask); // FutureTask实现了Runnable接口
        thread.start();
        futureTask.get();
 
        executor.shutdown();
    }
}

FutureTask的一个实用场景

当一个计算任务非常重量级,非常耗资源和耗时间(比如网络请求或者数据库请求),而且多个线程都需要等待这个计算任务的结果,这个时候FutureTask的优势就体现出来了。只要线程同步的去控制FutureTask的run(),然后所有线程都只要等待FutureTask.get()获取结果就可以了,这样子不会每个线程并发性的去多次重复执行计算任务。

很显然的一个场景就是 数据库查询+缓存。比如查询某个table的id=xxx的对象,查询结果在内存缓存起来,在缓存为空的情况下并发第一次的访问可能引起多次SQL请求,使用FutureTask就可以避免这个问题。

public class MemCache<K, V>{
    private final ConcurrentHashMap<K, Future<V>> cache;
 
    private V getResultFromDB(K k){return v;}
 
    public V get(K k){
        while(true){
            Future<V> f = cache.get(k);
            if(f == null){
                Callable<V> eval = new Callable<V>(){
                    public V call() throw InterruptedException{
                        return getResultFromDB(k);
                    }
                }
                FutureTask<V> ft = new FutureTask<V>(eval);
                // 这一行避免可能的执行并发产生的多个不同的futureTask
                f = cache.putIfAbsent(arg, ft);
                if(f == null){
                    f = ft;
                    // 保证执行的一定是第一个futureTask
                    ft.run();
                }
            }
            try{
                return f.get();
            } catch (CancellationException e){
                // 避免缓存污染(一个计算被取消或者异常,未来获取不到值),则要从缓存中移除
                cache.remove(k, f);
            } catch (ExecutionException e){
                throw e;
            }
        }
    }
}

FutureTask源码分析

也是类似一把共享锁,所有get操作都是试图进行acquireShared加锁,当然在run执行完毕之前,会一直加锁失败并阻塞;run中执行callable.call()结束,设置计算结果值V后,进行releaseShared解锁后,所有get操作被唤醒并拿到结果值V。如果执行中途执行cancel,也会进行releaseShared解锁,所有get操作会拿到null结果值。

state值用来保存task的执行状态值,0是初始态,1表示running,2表示ran done,4表示cancel。

Categories: Java 标签:
  1. 还没有评论呢。