Java ThreadPoolExecutor,Callable,Future,FutureTask 详解

目 录

一、ThreadPoolExecutor类讲解

1、线程池状态

五种状态

2、ThreadPoolExecutor构造函数

2.1)线程池工作原理

2.2)KeepAliveTime

2.3)workQueue 任务队列

2.4)threadFactory

2.5)handler 拒绝策略

3、常用方法:

二、线程池相关接口介绍

1、ExecutorService接口:

1.1)submit方法示例:

1.2)ExecutionException

1.3)submit()和execute()方法区别

1.4)ScheduledExecutorService接口

2、Callable接口

3、Future接口

3.1)Future接口方法

3.2)FutureTask类

4、FutureTask,Runable,Future关系​编辑

 三 、AbstractExecutorService介绍

1、submit方法

2、Executors callable 方法

3、invokeAll 方法

4、invokeAny方法

参考文献:​


一、ThreadPoolExecutor类讲解


1、线程池状态


五种状态

线程池 的状态

说明

RUNNING

允许提交并处理任务

SHUTDOWN

不允许提交新的任务,但是会处理完已提交的任务

STOP

不允许提交新的任务,也不会处理阻塞队列中未执行的任务,

并设置正在执行的线程的中断标志位

TIDYING

所有任务执行完毕,池中工作的线程数为0,等待执行terminated()勾子方法

TERMINATED

terminated()勾子方法执行完毕

线程池的shutdown() 方法,将线程池由 RUNNING(运行状态)转换为 SHUTDOWN状态
线程池的shutdownNow()方法,将线程池由RUNNING 或 SHUTDOWN 状态转换为 STOP 状态。
注:SHUTDOWN 状态 和 STOP 状态 先会转变为 TIDYING 状态,最终都会变为 TERMINATED

2、ThreadPoolExecutor构造函数


ThreadPoolExecutor继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。

接下来我们分别讲解这些参数的含义。

2.1)线程池工作原理

corePoolSize :线程池中核心线程数的最大值
maximumPoolSize :线程池中能拥有最多线程数
workQueue:用于缓存任务的阻塞队列
当调用线程池execute() 方法添加一个任务时,线程池会做如下判断:

如果有空闲线程,则直接执行该任务;
如果没有空闲线程,且当前运行的线程数少于corePoolSize,则创建新的线程执行该任务;
如果没有空闲线程,且当前的线程数等于corePoolSize,同时阻塞队列未满,则将任务入队列,而不添加新的线程;
如果没有空闲线程,且阻塞队列已满,同时池中的线程数小于maximumPoolSize ,则创建新的线程执行任务;
如果没有空闲线程,且阻塞队列已满,同时池中的线程数等于maximumPoolSize ,则根据构造函数中的 handler 指定的策略来拒绝新的任务。


2.2)KeepAliveTime

keepAliveTime :表示空闲线程的存活时间
TimeUnit unit :表示keepAliveTime的单位
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

注:如果线程池设置了allowCoreThreadTimeout参数为true(默认false),那么当空闲线程超过keepaliveTime后直接停掉。(不会判断线程数是否大于corePoolSize)即:最终线程数会变为0。

2.3)workQueue 任务队列

workQueue :它决定了缓存任务的排队策略
ThreadPoolExecutor线程池推荐了三种等待队列,它们是:SynchronousQueue 、LinkedBlockingQueue 和 ArrayBlockingQueue。

1)有界队列:

SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于 阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。
ArrayBlockingQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
2)无界队列:

LinkedBlockingQueue:基于链表结构的无界阻塞队列,它可以指定容量也可以不指定容量(实际上任何无限容量的队列/栈都是有容量的,这个容量就是Integer.MAX_VALUE)
PriorityBlockingQueue:是一个按照优先级进行内部元素排序的无界阻塞队列。队列中的元素必须实现 Comparable 接口,这样才能通过实现compareTo()方法进行排序。优先级最高的元素将始终排在队列的头部;PriorityBlockingQueue 不会保证优先级一样的元素的排序。
注意:keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义。

2.4)threadFactory

threadFactory :指定创建线程的工厂。(可以不指定)
如果不指定线程工厂时,ThreadPoolExecutor 会使用ThreadPoolExecutor.defaultThreadFactory 创建线程。默认工厂创建的线程:同属于相同的线程组,具有同为 Thread.NORM_PRIORITY 的优先级,以及名为 “pool-XXX-thread-” 的线程名(XXX为创建线程时顺序序号),且创建的线程都是非守护进程。

2.5)handler 拒绝策略

handler :表示当 workQueue 已满,且池中的线程数达到 maximumPoolSize 时,线程池拒绝添加新任务时采取的策略。(可以不指定)
策略

ThreadPoolExecutor.AbortPolicy()

抛出RejectedExecutionException异常。默认策略

ThreadPoolExecutor.CallerRunsPolicy()

由向线程池提交任务的线程来执行该任务

ThreadPoolExecutor.DiscardPolicy()

抛弃当前的任务

ThreadPoolExecutor.DiscardOldestPolicy()

抛弃最旧的任务(最先提交而没有得到执行的任务)

最科学的的还是 AbortPolicy 提供的处理方式:抛出异常,由开发人员进行处理。

3、常用方法:


除了在创建线程池时指定上述参数的值外,还可在线程池创建以后通过如下方法进行设置。

此外,还有一些方法:

getCorePoolSize():返回线程池的核心线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;
getMaximumPoolSize():返回线程池的最大线程数,这个值是一直不变的,返回在构造函数中设置的coreSize大小;
getLargestPoolSize():记录了曾经出现的最大线程个数(水位线);
getPoolSize():线程池中当前线程的数量;
getActiveCount():Returns the approximate(近似) number of threads that are actively executing tasks;
prestartAllCoreThreads():会启动所有核心线程,无论是否有待执行的任务,线程池都会创建新的线程,直到池中线程数量达到 corePoolSize;
prestartCoreThread():会启动一个核心线程(同上);
allowCoreThreadTimeOut(true):允许核心线程在KeepAliveTime时间后,退出;
4、Executors类:
Executors类的底层实现便是ThreadPoolExecutor! Executors 工厂方法有:

Executors.newCachedThreadPool():无界线程池,可以进行自动线程回收
Executors.newFixedThreadPool(int):固定大小线程池
Executors.newSingleThreadExecutor():单个后台线程
它们均为大多数使用场景预定义了设置。不过在阿里java文档中说明,尽量不要用该类创建线程池。

二、线程池相关接口介绍


1、ExecutorService接口:


该接口是真正的线程池接口。上面的ThreadPoolExecutor以及下面的ScheduledThreadPoolExecutor都是该接口的实现类。改接口常用方法:

Future<?> submit(Runnable task):提交Runnable任务到线程池,返回Future对象,由于Runnable没有返回值,也就是说调用Future对象get()方法返回null;
<T> Future<T> submit(Callable<T> task):提交Callable任务到线程池,返回Future对象,调用Future对象get()方法可以获取Callable的返回值;
<T> Future<T> submit(Runnable task,T result):提交Runnable任务到线程池,返回Future对象,调用Future对象get()方法可以获取Runnable的参数值;
invokeAll(collection of tasks)/invokeAll(collection of tasks, long timeout, TimeUnit unit):invokeAll会按照任务集合中的顺序将所有的Future添加到返回的集合中,该方法是一个阻塞的方法。只有当所有的任务都执行完毕时,或者调用线程被中断,又或者超出指定时限时,invokeAll方法才会返回。当invokeAll返回之后每个任务要么返回,要么取消,此时客户端可以调用get/isCancelled来判断具体是什么情况。
invokeAny(collection of tasks)/invokeAny(collection of tasks, long timeout, TimeUnit unit):阻塞的方法,不会返回 Future 对象,而是返回集合中某一个Callable 对象的结果,而且无法保证调用之后返回的结果是哪一个 Callable,如果一个任务运行完毕或者抛出异常,方法会取消其它的 Callable 的执行。和invokeAll区别是只要有一个任务执行完了,就把结果返回,并取消其他未执行完的任务;同样,也带有超时功能;
shutdown():在完成已提交的任务后关闭服务,不再接受新任;
shutdownNow():停止所有正在执行的任务并关闭服务;
isTerminated():测试是否所有任务都执行完毕了;
isShutdown():测试是否该ExecutorService已被关闭。


1.1)submit方法示例:

我们知道,线程池接口中有以下三个主要方法,接下来我们看一下具体示例:

1)Callable:

public static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 50, 300, TimeUnit.SECONDS, 
            new ArrayBlockingQueue<Runnable>(50),  
            new ThreadFactory(){ public Thread newThread(Runnable r) {
                return new Thread(r, "schema_task_pool_" + r.hashCode());
            }}, new ThreadPoolExecutor.DiscardOldestPolicy());
 
public static void callableTest() {
    int a = 1;
    //callable
    Future<Boolean> future = threadPool.submit(new Callable<Boolean>(){
        @Override
        public Boolean call() throws Exception {
            int b = a + 100;
            System.out.println(b);
            return true;
        }
    });
    try {
        System.out.println("feature.get");
        Boolean boolean1 = future.get();
        System.out.println(boolean1);
    } catch (InterruptedException e) {
        System.out.println("InterruptedException...");
        e.printStackTrace();
    } catch (ExecutionException e) {
        System.out.println("execute exception...");
        e.printStackTrace();
    } 
}
2)Runnable:

public static void runnableTest() {
    int a = 1;
    //runnable
    Future<?> future1 = threadPool.submit(new Runnable(){
        @Override
        public void run() {
            int b = a + 100;
            System.out.println(b);
        }
    });
    try {
        System.out.println("feature.get");
        Object x = future1.get(900,TimeUnit.MILLISECONDS);
        System.out.println(x);//null
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        System.out.println("execute exception...");
        e.printStackTrace();
    } catch (TimeoutException e) {
        e.printStackTrace();
    }
}

3)Runnable+result:

class RunnableTask implements Runnable {
    Person p;
    RunnableTask(Person p) {
        this.p = p;
    }
 
    @Override
    public void run() {
        p.setId(1);
        p.setName("Runnable Task...");
    }
}
class Person {
    private Integer id;
    private String name;
    
    public Person(Integer id, String name) {
        super();
        this.id = id;
        this.name = name;
    }
    public Integer getId() {
        return id;
    }
    public void setId(Integer id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    @Override
    public String toString() {
        return "Person [id=" + id + ", name=" + name + "]";
    }
}
 
public static void runnableTest2() {
    //runnable + result
    Person p = new Person(0,"person");
    Future<Person> future2 = threadPool.submit(new RunnableTask(p),p);
    try {
        System.out.println("feature.get");
        Person person = future2.get();
        System.out.println(person);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}


1.2)ExecutionException

线程池执行时,Callable的call方法(Runnable的run方法)抛出异常后,会出现什么?

在上面的例子中我们可以看到,线程池无论是执行Callable还是Runnable,调用返回的Future对象get()方法时需要处理两种异常(如果是调用get(timeout)方法,需要处理三种异常),如下:

//在线程池上运行
Future<Object> future = threadPool.submit(callable);
try {
    System.out.println("feature.get");
    Object x = future.get(900,TimeUnit.MILLISECONDS);
    System.out.println(x);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    System.out.println("execute exception...");
    e.printStackTrace();
} catch (TimeoutException e) {
    e.printStackTrace();
}
如果get方法被打断,进入InterruptedException异常;
如果线程执行过程(call、run方法)中抛出异常,进入ExecutionException异常;
如果get方法超时,进入TimeoutException异常;


1.3)submit()和execute()方法区别

ExecutorService、ScheduledExecutorService接口的submit()和execute()方法都是把任务提交到线程池中,但二者的区别是

接收的参数不一样,execute只能接收Runnable类型、submit可以接收Runnable和Callable两种类型;
submit有返回值,而execute没有返回值;submit方便Exception处理;
1)submit方法内部实现:

其实submit方法也没有什么神秘的,就是将我们的任务封装成了RunnableFuture接口(继承了Runnable、Future接口),再调用execute方法,我们看源码:

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);  //转成 RunnableFuture,传的result是null
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

2)newTaskFor方法内部实现:

newTaskFor方法是new了一个FutureTask返回,所以三个方法其实都是把task转成FutureTask,如果task是Callable,就直接赋值,如果是Runnable 就转为Callable再赋值。

当submit参数是Callable 时:

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;      
    }
当submit参数是Runnable时:

   // 按顺序看,层层调用
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);  //转 runnable 为 callable 
        this.state = NEW; 
    }
   // 以下为Executors中的方法
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {  //适配器
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {   
            task.run();
            return result;
        }
    }

看了源码就揭开了神秘面纱了,就是因为Future需要返回结果,所以内部task必须是Callable,如果task是Runnable 就偷天换日,在Runnable 外面包个Callable马甲,返回的结果在构造时就写好。

参考:搞懂Runnable、Callable、Future、FutureTask 及应用_赶路人儿的博客-CSDN博客

1.4)ScheduledExecutorService接口

继承ExecutorService,并且提供了按时间安排执行任务的功能,它提供的方法主要有:

schedule(task, initDelay): 安排所提交的Callable或Runnable任务在initDelay指定的时间后执行;
scheduleAtFixedRate():安排所提交的Runnable任务按指定的间隔重复执行;
scheduleWithFixedDelay():安排所提交的Runnable任务在每次执行完后,等待delay所指定的时间后重复执行;
注:该接口的实现类是ScheduledThreadPoolExecutor。

2、Callable接口


jdk1.5以后创建线程可以通过一下方式:

继承Thread类,实现void run()方法;
实现Runnable接口,实现void run()方法;
实现Callable接口,实现V call() Throws Exception方法
1)Callable和Runnale接口区别:

Callable可以抛出异常,和Future、FutureTask配合可以用来获取异步执行的结果;
Runnable没有返回结果,异常只能内部消化;
2)执行Callable的线程的方法可以通过以下两种方式:

借助FutureTask,使用Thread的start方法来执行;
加入到线程池中,使用线程池的execute或submit执行;
注:Callable无法直接使用Thread来执行;

我们都知道,Callable带有返回值的,如果我们不需要返回值,却又想用Callable该如何做?

jdk中有个Void类型(大写V),但必须也要return null。

threadpool.submit(new Callable<Void>() {
    @Override
    public Void call() {
        //...
        return null;
    }
});
3)通过Executors工具类可以把Runnable接口转换成Callable接口:

Executors中的callable方法可以将Runnable转成Callable,如下:

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
}
RunnableAdapter类在上面已经看过源码,原理就是将返回值result作为成员变量,通过参数传递进去,进而实现了Runnable可以返回值。

示例:

public static void test5() {
        Person p = new Person(0,"person");
        RunnableTask runnableTask = new RunnableTask(p);//创建runnable
        Callable<Person> callable = Executors.callable(runnableTask,p);//转换
        Future<Person> future1 = threadPool.submit(callable);//在线程池上执行Callable
        try {
            Person person = future1.get();
            System.out.println(person);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        
        Runnable runnable = new Runnable() {//创建Runnable
            @Override
            public void run() {
                
            }
        };
        Callable<Object> callable2 = Executors.callable(runnable);//转换
        Future<Object> future2 = threadPool.submit(callable2);//在线程池上执行Callable
        try {
            Object o = future2.get();
            System.out.println(o);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }


3、Future接口


3.1)Future接口方法

Future是用来获取异步计算结果的接口,常用方法:

boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled() 的后续调用将始终返回 true。
boolean isCancelled():如果在任务正常完成前将其取消,则返回 true。
boolean isDone():如果任务已完成,则返回 true,可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。
V get()throws InterruptedException,ExecutionException:获取异步结果,此方法会一直阻塞等到计算完成;
V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException,TimeoutException:获取异步结果,此方法会在指定时间内一直阻塞等到计算完成,超时后会抛出超时异常。
通过方法分析我们也知道实际上Future提供了3种功能:

能够中断执行中的任务;
判断任务是否执行完成;
获取任务执行完成后额结果。
但是Future只是一个接口,我们无法直接创建对象,因此就需要其实现类FutureTask登场啦。

3.2)FutureTask类

1)FutureTask类的实现:

public class FutureTask<V> implements RunnableFuture<V> {
//...
}
 
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
FutureTask实现了Runnable、Future两个接口。由于FutureTask实现了Runnable,因此它既可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行。并且还可以直接通过get()函数获取执行结果,该函数会阻塞,直到结果返回。因此FutureTask既是Future、Runnable,又是包装了Callable( 如果是Runnable最终也会被转换为Callable ), 它是这两者的合体。

2)FutureTask的构造函数:

public FutureTask(Callable<V> callable) {
 
}
 
public FutureTask(Runnable runnable, V result) {
 
}
3.3)示例:(FutureTask两种构造函数、以及在Thread和线程池上运行)

1)FutureTask包装过的Callable在Thread、线程池上执行:

public static void test3() {
        int a = 1,b = 2;
        Callable<Integer> callable = new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return a + b;
            }
        };
        //通过futureTask来执行Callable
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        
        //1.使用Thread执行线程
        new Thread(futureTask).start();
        try {
            Integer integer = futureTask.get();
            System.out.println(integer);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        
        //2.使用线程池执行线程
        Executors.newFixedThreadPool(1).submit(futureTask);
        threadPool.shutdown();
        try {
            Integer integer = futureTask.get();
            System.out.println(integer);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } 
    }
2)FutureTask包装过的Runnable在Thread、线程池上执行:

public static void test4() {
        Person p = new Person(0,"person");
        RunnableTask runnableTask = new RunnableTask(p);
        
        //创建futureTask来执行Runnable
        FutureTask<Person> futureTask = new FutureTask<>(runnableTask,p);
        
        //1.使用Thread执行线程
        new Thread(futureTask).start();
        try {
            Person x = futureTask.get();
            System.out.println(x);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } 
        
        //2.使用线程池执行线程
        threadPool.submit(futureTask);
        threadPool.shutdown();
        try {
            Person y = futureTask.get();
            System.out.println(y);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
Person、RunnableTask类同上面的示例中。

4、FutureTask,Runable,Future关系
在这里插入图片描述

 三 、AbstractExecutorService介绍

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现。这些方法包括submit,invokeAny和InvokeAll。

注意的是来自Executor接口的execute方法在AbstractExecutorService中未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略。这个在分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来。

1、submit方法

首先来看submit方法,它的基本逻辑是这样的:

1). 生成一个任务类型和Future接口的包装接口RunnableFuture的对象

2). 执行任务

3). 返回future。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
 
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
因为submit支持Callable和Runnable两种类型的任务,因此newTaskFor方法有两个重载方法:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }
 
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
Callable和Runnable的区别在于前者带返回值,也就是说Callable=Runnable+返回值。因此java中提供了一种adapter,把Runnable+返回值转换成Callable类型。这点可以在newTaskFor中的FutureTask类型的构造函数的代码中看到:

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable);
    }
 
    public FutureTask(Runnable runnable, V result) {
        sync = new Sync(Executors.callable(runnable, result));
    }

2、Executors callable 方法

以下是Executors.callable方法的代码:

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
那么RunnableAdapter的代码就很好理解了,它是一个Callable的实现,call方法的实现就是执行Runnable的run方法,然后返回那个value。

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

3、invokeAll 方法

接下来先说说较为简单的invokeAll:

1). 为每个task调用newTaskFor方法生成得到一个既是Task也是Future的包装类对象的List

2). 循环调用execute执行每个任务

3). 再次循环调用每个Future的get方法等待每个task执行完成

4). 最后返回Future的list。

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            // 为每个task生成包装对象
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));
 
            long lastTime = System.nanoTime();
 
            // 循环调用execute执行每个方法
            // 这里因为设置了超时时间,所以每次执行完成后
            // 检查是否超时,超时了就直接返回future集合
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
                if (nanos <= 0)
                    return futures;
            }
 
            // 等待每个任务执行完成
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

4、invokeAny方法

最后说说invokeAny,它的难点在于只要一个任务执行成功就要返回,并且会取消其他任务,也就是说重点在于找到第一个执行成功的任务。

这里我想到了BlockingQueue,当所有的任务被提交后,任务执行返回的Future会被依次添加到一个BlockingQueue中,然后找到第一个执行成功任务的方法就是从BlockingQueue取出第一个元素,这个就是doInvokeAny方法用到的ExecutorCompletionService的基本原理。

因为两个invokeAny方法都是调用doInvokeAny方法,下面是doInvokeAny的代码分析:

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
        // ExecutorCompletionService负责执行任务,后面调用用poll返回第一个执行结果
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);
        // 这里出于效率的考虑,每次提交一个任务之后,就检查一下有没有执行完成的任务
 
        try {
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();
 
            // 先提交一个任务
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;
 
            for (;;) {
                // 尝试获取有没有执行结果(这个结果是立刻返回的)
                Future<T> f = ecs.poll();
                // 没有执行结果
                if (f == null) {
                    // 如果还有任务没有被提交执行的,就再提交一个任务
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    // 没有任务在执行了,而且没有拿到一个成功的结果。
                    else if (active == 0)
                        break;
                    // 如果设置了超时情况
                    else if (timed) {
                        // 等待执行结果直到有结果或者超时
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        // 这里的更新不可少,因为这个Future可能是执行失败的情况,那么还需要再次等待下一个结果,超时的设置还是需要用到。
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    // 没有设置超时,并且所有任务都被提交了,则一直等到第一个执行结果出来
                    else
                        f = ecs.take();
                }
                // 有返回结果了,尝试从future中获取结果,如果失败了,那么需要接着等待下一个执行结果
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }
 
            // ExecutorCompletionService执行时发生错误返回了全是null的future
            if (ee == null)
                ee = new ExecutionException();
            throw ee;
 
        } finally {
            // 尝试取消所有的任务(对于已经完成的任务没有影响)
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }

 

参考文献:​


​https://blog.csdn.net/liuxiao723846/article/details/108026782
https://blog.csdn.net/xinruyulu/article/details/64453449
https://juejin.cn/post/6844903672736907272
https://blog.csdn.net/wszhongguolujun/article/details/89708668

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/76788.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JMeter】 使用Synchronizing Timer设置请求集合点,实现绝对并发

目录 布局设置说明 Number of Simulated Users to Group Timeout in milliseconds 使用时需要注意的点 集合点作用域 实际运行 资料获取方法 布局设置说明 参数说明&#xff1a; Number of Simulated Users to Group 每次释放的线程数量。如果设置为0&#xff0c;等同…

【css】使用float实现水平导航栏

该实例使用float 浮动实现元素浮动在水平方向&#xff0c;从而实现水平导航栏效果。 overflow: hidden&#xff1a;当不给父级元素设置高度的时候&#xff0c;其内部元素浮动后会导致下面的元素顶上去&#xff0c;这是因为子元素浮动后&#xff0c;子元素脱离标准流&#xff0…

深度学习——注意力机制、自注意力机制

什么是注意力机制&#xff1f; 1.注意力机制的概念&#xff1a; 我们在听到一句话的时候&#xff0c;会不自觉的捕获关键信息&#xff0c;这种能力叫做注意力。 比如&#xff1a;“我吃了100个包子” 有的人会注意“我”&#xff0c;有的人会注意“100个”。 那么对于机器来说…

C语言:相交链表

Lei宝啊&#xff1a;个人主页 愿美好与我们不期而遇 题目&#xff1a; 描述 给你两个单链表的头节点 headA和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 接口 struct ListNode *getIntersectionNode (str…

与“云”共舞,联想凌拓的新科技与新突破

伴随着数字经济的高速发展&#xff0c;IT信息技术在数字中国建设中起到的驱动和支撑作用也愈发凸显。特别是2023年人工智能和ChatGPT在全球的持续火爆&#xff0c;更是为整个IT产业注入了澎湃动力。那么面对日新月异的IT信息技术&#xff0c;再结合疫情之后截然不同的经济环境和…

springboot+vue网红酒店客房预定系统的设计与实现_ui9bt

随着计算机技术发展&#xff0c;计算机系统的应用已延伸到社会的各个领域&#xff0c;大量基于网络的广泛应用给生活带来了十分的便利。所以把网红酒店预定管理与现在网络相结合&#xff0c;利用计算机搭建网红酒店预定系统&#xff0c;实现网红酒店预定的信息化。则对于进一步…

当你软件测试遇上加密接口,是不是就不能测了?

相信大家在工作中做接口测试的时候&#xff0c;肯定会遇到一个场景&#xff0c;那就是你们的软件&#xff0c;密码是加密存储的。 那么这样的话&#xff0c;我们在执行接口的时候&#xff0c;对于密码的处理就开始头疼了。 所以&#xff0c;本文将使用jmeter这款java开源的接…

Pytorch Tutorial【Chapter 3. Simple Neural Network】

Pytorch Tutorial【Chapter 3. Simple Neural Network】 文章目录 Pytorch Tutorial【Chapter 3. Simple Neural Network】Chapter 3. Simple Neural Network3.1 Train Neural Network Procedure训练神经网络流程3.2 Build Neural Network Procedure 搭建神经网络3.3 Use Loss …

海外应用商店优化实用指南之关键词

和SEO一样&#xff0c;关键词是ASO中的一个重要因素。就像应用程序标题一样&#xff0c;在Apple App Store和Google Play中处理应用程序关键字的方式也有所不同。 关键词研究。 对于Apple&#xff0c;我们的所有关键词只能获得100个字符&#xff0c;Google Play没有特定的关键…

【新版系统架构补充】-传输介质、子网划分

传输介质 双绞线&#xff1a;无屏蔽双绞线UTP和屏蔽双绞线STP&#xff0c;传输距离在100m内 网线安装标准&#xff1a; 光纤&#xff1a;由纤芯和包层组成&#xff0c;分多模光纤MMF、单模光纤SMF 无线信道&#xff1a;分为无线电波和红外光波 通信方式和交换方式 单工…

做测试8年,33岁前只想追求大厂高薪,今年只求稳定收入

疫情3年&#xff0c;每一个行业的危机&#xff0c;每一个企业的倒下&#xff0c;背后都是无数人的降薪、降职和失业。这也暴露了人生的残酷真相&#xff1a;人活一辈子&#xff0c;总有“丰年”和“荒年” 优秀的测试既过得了丰年&#xff0c;也受得住荒年 一个测试宝妈&…

数据结构: 线性表(带头双向循环链表实现)

之前一章学习了单链表的相关操作, 但是单链表的限制却很多, 比如不能倒序扫描链表, 解决方法是在数据结构上附加一个域, 使它包含指向前一个单元的指针即可. 那么怎么定义数据结构呢? 首先我们先了解以下链表的分类 1. 链表的分类 链表的结构非常多样, 以下情况组合起来就有…

爬虫---练习源码

选取的是网上对一些球员的评价&#xff0c;来评选谁更加伟大一点 import csv import requests import re import timedef main(page):url fhttps://tieba.baidu.com/p/7882177660?pn{page}headers {User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/53…

AMEYA360:瑞萨电子MCU和MPU产品线将支持Microsoft Visual Studio Code

全球半导体解决方案供应商瑞萨电子宣布其客户现可以使用Microsoft Visual Studio Code&#xff08;VS Code&#xff09;开发瑞萨全系列微控制器&#xff08;MCU&#xff09;和微处理器&#xff08;MPU&#xff09;。瑞萨已为其所有嵌入式处理器开发了工具扩展&#xff0c;并将其…

分布式开源监控Zabbix实战

Zabbix作为一个分布式开源监控软件&#xff0c;在传统的监控领域有着先天的优势&#xff0c;具备灵活的数据采集、自定义的告警策略、丰富的图表展示以及高可用性和扩展性。本文简要介绍Zabbix的特性、整体架构和工作流程&#xff0c;以及安装部署的过程&#xff0c;并结合实战…

分布式异步任务处理组件(七)

分布式异步任务处理组件底层网络通信模型的设计--如图&#xff1a; 使用Java原生NIO来实现TCP通信模型普通节点维护一个网络IO线程&#xff0c;负责和主节点的网络数据通信连接--这里的网络数据是指组件通信协议之下的直接面对字节流的数据读写&#xff0c;上层会有另一个线程负…

Linux下安装VMware虚拟机

目录 1. 简介 2. 工具/原料 2.1. 下载VMware 2.2. 安装 1. 简介 ​ VMware Workstation&#xff08;中文名“威睿工作站”&#xff09;是一款功能强大的桌面虚拟计算机软件&#xff0c;提供用户可在单一的桌面上同时运行不同的操作系统&#xff0c;和进行开发、测试 …

为什么list.sort()比Stream().sorted()更快?

真的更好吗&#xff1f; 先简单写个demo List<Integer> userList new ArrayList<>();Random rand new Random();for (int i 0; i < 10000 ; i) {userList.add(rand.nextInt(1000));}List<Integer> userList2 new ArrayList<>();userList2.add…

从零开始:手把手搭建 RocketMQ 单节点、集群节点实例

&#x1f52d; 嗨&#xff0c;您好 &#x1f44b; 我是 vnjohn&#xff0c;在互联网企业担任 Java 开发&#xff0c;CSDN 优质创作者 &#x1f4d6; 推荐专栏&#xff1a;Spring、MySQL、Nacos、Java&#xff0c;后续其他专栏会持续优化更新迭代 &#x1f332;文章所在专栏&…