类图:定义了一些重要的接口和实现类
线程池的几种状态:
ThreadPoolExecutor构造方法
1.救急线程
线程池中会有核心线程和救急线程;救急线程数=最大线程数-核心线程数。而救急线程会在阻塞队列已经占满的情况下,执行下一个即将要被拒绝策略执行任务。且救急线程在执行完任务后的KeepAliveTime时间内,如果没有执行新的任务,那么就会从线程池remove这个线程。不同于核心线程是一直存在于线程池中的。
救急线程是懒惰创建的,只有当有界阻塞队列满的时候才会创建救急线程执行任务。如果阻塞队列是无界的,那么永远不会创建救急线程。
2.核心线程
会优先使用核心线程来执行任务,且核心线程是懒惰创建。当核心线程执行完任务后,并不会主动结束自己,而是还在运行中。需要特殊处理使之结束。
3.拒绝策略
当救急线程被占用完后再来新的任务会由拒绝策略完成。
4.JDK提供线程池执行过程
4.JDK提供的拒绝策略
NewFixedThreadPool 固定大小线程池
1.介绍
2.代码示例
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j(topic = "TC41")
public class TC41 {public static void main(String[] args) throws InterruptedException{ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {private AtomicInteger threadPoolNumber = new AtomicInteger(1);//使用ThreadFactory自定义线程名@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"Pool_t"+threadPoolNumber.getAndIncrement());}});pool.execute(()->{log.debug("1");});pool.execute(()->{log.debug("2");});pool.execute(()->{log.debug("3");});//result: 且程序不会结束,因为核心线程只是执行完任务,但并没有结束//11:10:02.144 [Pool_t2] DEBUG TC41 - 2//11:10:02.144 [Pool_t1] DEBUG TC41 - 1//11:10:02.147 [Pool_t2] DEBUG TC41 - 3}
}
NewCachedThreadPool 带缓冲的线程池
1.介绍
若每个任务执行时间都很长的话会创建太多线程,消耗CPU影响性能。
2.SynchronousQueue
SynchronousQueue指t1线程在Queue里放任务时放不进去,只能阻塞在外面,当t2线程来取的时候,就可以把任务取走了。意味着:该队列就是为了阻塞一个任务且不用拒绝策略,只等待线程来执行。当SynchronousQueue配合NewCachedThreadPool执行时,就是每当阻塞一个任务就创建一个救急线程。
SynchronousQueue使用代码示例
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.SynchronousQueue;@Slf4j(topic = "TC42")
public class TC42 {public static void main(String[] args) {SynchronousQueue queue = new SynchronousQueue();new Thread(()->{try {log.debug("put 1....");//当执行put()后被阻塞住,当take()执行完后,put()才执行完毕queue.put(1);log.debug("putted 1....");log.debug("put 2....");queue.put(2);log.debug("putted 2....");} catch (InterruptedException e) {e.printStackTrace();}},"t1").start();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}new Thread(()->{log.debug("get 1...");try {queue.take();} catch (InterruptedException e) {e.printStackTrace();}},"t2").start();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}new Thread(()->{log.debug("get 2...");try {queue.take();} catch (InterruptedException e) {e.printStackTrace();}},"t3").start();}
}
NewSingleThreadExecutor 单线程执行器
线程异常后,线程池会重新创建新的线程
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j(topic = "TC43")
public class TC43 {public static void main(String[] args) throws InterruptedException {test1();//result: 线程1抛出异常停止后,线程池又创建了一个线程2去执行后面的codes //14:15:44.489 [pool-1-thread-1] DEBUG TC43 - 1//Exception in thread "pool-1-thread-1" 14:15:44.493 [pool-1-thread-2] DEBUG TC43 - 2//14:15:44.493 [pool-1-thread-2] DEBUG TC43 - 3}public static void test1() throws InterruptedException{ExecutorService service = Executors.newSingleThreadExecutor();service.execute(()->{log.debug("1");int i=1/0;});service.execute(()->{log.debug("2");});service.execute(()->{log.debug("3");});}
}
提交任务
Future<T> Submit
有一个Future类型的返回值
import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;@Slf4j(topic = "TC44")
public class TC44 {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(2);Future<String> future = pool.submit(()->{log.debug("running");Thread.sleep(1000);return "ok";});log.debug("{}",future.get());}
}
List<Future<T>> invokeAll()
执行集合中所有线程,并返回所有返回值到一个集合里。
import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;@Slf4j(topic = "TC45")
public class TC45 {public static void main(String[] args) throws InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(2);List<Future<Object>> futures = pool.invokeAll(Arrays.asList(()->{log.debug("begin...");Thread.sleep(1000);return "1";},()->{log.debug("begin...");Thread.sleep(500);return "2";},()->{log.debug("begin...");Thread.sleep(2000);return "3";}));futures.forEach(f->{try {log.debug("{}",f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}
}
Object invokeAny()
集合中哪个线程结束最早返回哪个线程,其余线程停止执行任务。
0.5S后打印结果,因为第二个callable线程结束最早只等待了0.5S.
import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;@Slf4j(topic = "TC46")
public class TC46 {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(3);Object o = pool.invokeAny(Arrays.asList(()->{log.debug("begin 1....");Thread.sleep(1000);log.debug("end 1....");return "1";},()->{log.debug("begin 2....");Thread.sleep(500);log.debug("end 2....");return "2";},()->{log.debug("begin 3....");Thread.sleep(800);log.debug("end 3....");return "3";}));log.debug("{}",o.toString());}
}
关闭线程池
shutdown()
不会等待正在运行的线程,等它们运行结束,就会自己停止
shutdownNow()
其他终结方法