一些辅助类
减少计数CountDownLatch
设置一个计数器,通过countDown方法进行减1操作,使用await方法等待计数器不大于0,继续执行await方法之后的语句。
-
当一个或多个线程调用
await
方法时,这些线程会阻塞 -
其他线程调用
countDown
方法会将计数器减1 -
当计数器的值变为0,因
await
方法阻塞的线程会被唤醒,继续执行
public class CountDownLatchDemo {// 6位同学陆续离开教室之后班长锁门public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(6);for (int i = 0; i < 6; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName()+" left.");latch.countDown();},String.valueOf(i+1)).start();}// 等待操作,直到计数器变为0,才会执行await之后的语句latch.await();System.out.println("Close the door.");}
}
循环栅栏CyclicBarrier
CyclicBarrier是一个同步辅助类,允许一组线程互相等待,直到达到某个公共屏障点,在涉及一组固定大小的线程的程序中,这些线程必须不时地相互等待。
构造方法的第一个参数是目标障碍数,每次执行一次CyclicBarrier一次障碍数就会+1,如果达到了目标障碍数,才会执行cyclicBarrier.await()
之后的语句
public class CyclicBarrierDemo {public static void main(String[] args) {// 设置固定值以及固定值达到之后的操作CyclicBarrier barrier = new CyclicBarrier(7,()->{System.out.println("Finished!!");});for (int i = 0; i < 5; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName()+" save.");try {// 每次执行到await都会有一个计数// 如果数量达到固定值,就会执行预先指定的操作// 如果数量没有达到,就一直会处于等待状态barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}},String.valueOf(i)).start();}}
}
信号灯Semaphore
主要就是限制同时执行操作的进程数量
public class SemaphoreDemo {public static void main(String[] args) {// 创建Semaphore,设置许可数量,第二个参数表示是否为公平的Semaphore semaphore = new Semaphore(3);// 模拟6两汽车for (int i = 0; i < 6; i++) {new Thread(()->{try {// 抢占车位semaphore.acquire();System.out.println(Thread.currentThread().getName()+" Got It.");// 设置随机停车时间TimeUnit.SECONDS.sleep(new Random().nextInt(5));System.out.println(Thread.currentThread().getName()+" Leave.");} catch (InterruptedException e) {e.printStackTrace();} finally {// 释放许可semaphore.release();}}, String.valueOf(i)).start();}}
}
ReentrantReadWriteLock读写锁
悲观锁
乐观锁
读锁发生死锁
写锁发生死锁
class MyCache {private volatile HashMap<String, Object> map = new HashMap<>();// 创建读写锁对象private ReadWriteLock rwLock = new ReentrantReadWriteLock();public void put(String key, Object value) {// 添加上写锁rwLock.writeLock().lock();try {System.out.println(Thread.currentThread().getName()+" Write "+key);TimeUnit.MICROSECONDS.sleep(300);map.put(key, value);System.out.println(Thread.currentThread().getName()+"Finished Writing");} catch (InterruptedException e) {e.printStackTrace();} finally {rwLock.writeLock().unlock();}}public Object get(String key) {rwLock.readLock().lock();Object result = null;try {System.out.println(Thread.currentThread().getName()+" Read "+key);TimeUnit.MICROSECONDS.sleep(300);result = map.get(key);System.out.println(Thread.currentThread().getName()+" Finished Reading "+key);} catch (InterruptedException e) {e.printStackTrace();} finally {rwLock.readLock().unlock();}return result;}
}
public class ReadWriteLockDemo {public static void main(String[] args) {MyCache cache = new MyCache();// 创建线程放数据for (int i = 0; i < 5; i++) {final int num = i;new Thread(()->{cache.put(num+"",num+"");},String.valueOf(i)).start();}// 创建线程取数据for (int i = 0; i < 5; i++) {final int num = i;new Thread(()->{cache.get(num+"");},String.valueOf(i)).start();}}
}
如果不加锁,就会产生读锁或者写锁没有释放就会有其他类型的操作线程同时获取资源,非常不安全。
读写锁:一个资源可以被多个读线程访问,或者被一个写线程访问,不能同时存在两个类型的线程。
读写锁的特点:
-
多个读锁:当没有线程持有写锁时,多个读锁可以同时持有,这样可以提高读操作的并发性。
-
独占写锁:写锁是独占的,即在任何时候只能有一个线程持有写锁。如果一个线程持有写锁,其他线程的读锁和写锁请求都会被阻塞。
-
锁降级:持有写锁的线程释放锁后可以获取读锁,这是安全的,因为写锁的释放意味着对共享资源的修改已经完成,并且其他线程可以看到最新的状态。
-
锁升级:持有读锁的线程尝试获取写锁是不安全的,因为如果允许这样做,可能会导致写锁长时间得不到满足,从而降低性能或导致死锁。
读写锁的问题:
-
写饥饿:如果读锁频繁被获取,写锁可能会长时间得不到满足,导致写操作饥饿。
-
锁升级问题:如前所述,锁升级可能导致死锁或性能问题。
-
公平性:
ReentrantReadWriteLock
提供了公平和非公平两种模式。在公平模式下,锁的分配会按照请求的顺序进行,这可能会降低性能。非公平模式下,性能可能更好,但可能导致饥饿现象。 -
死锁:如果不正确地使用读写锁,尤其是在尝试升级读锁到写锁时,可能会导致死锁。
public static void main(String[] args) {ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();//锁降级---可以正常进行writeLock.lock();System.out.println("write");readLock.lock();System.out.println("read");writeLock.unlock();readLock.unlock();// 锁升级---死锁readLock.lock();System.out.println("read");writeLock.lock();System.out.println("write");// 无法执行readLock.unlock();writeLock.unlock();
}
BlockingQueue阻塞队列
阻塞队列,通过一个共享队列,可以使得数据由队列的一端输入,从另外一端输出。
-
ArrayBlockingQueue
(常用):基于数组的阻塞队列实现,维护了一个定长的数据以便缓存队列中的数据对象,保存两个整型变量,标识队列的头部和尾部在数组中的位置。由数组结构组成的有界阻塞队列 -
LinkedBlockingQueue
(常用):基于链表的阻塞队列,维护一个由链表构成的数据缓冲队列。大小默认为Integer.MAX_VALUE
-
DelayQueue
:只有当其指定的延迟时间到了,才能从队列中获取到该元素,是一个没有大小限制的队列。使用优先级队列实现的延迟无界队列。
常用方法:
public static void main(String[] args) throws InterruptedException {// 创建阻塞队列BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);// 会抛出异常的方法System.out.println(blockingQueue.add("a"));// trueSystem.out.println(blockingQueue.add("b"));// trueSystem.out.println(blockingQueue.add("c"));// trueSystem.out.println(blockingQueue.element());// a// System.out.println(blockingQueue.add("d"));// IllegalStateException: Queue fullSystem.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());System.out.println(blockingQueue.remove());// System.out.println(blockingQueue.remove());// NoSuchElementException// 会返回特殊值的方法System.out.println(blockingQueue.offer("a"));// trueSystem.out.println(blockingQueue.offer("b"));// trueSystem.out.println(blockingQueue.offer("c"));// trueSystem.out.println(blockingQueue.offer("d"));// falseSystem.out.println(blockingQueue.poll());// aSystem.out.println(blockingQueue.poll());// bSystem.out.println(blockingQueue.poll());// cSystem.out.println(blockingQueue.poll());// null// 阻塞blockingQueue.put("a");blockingQueue.put("a");blockingQueue.put("a");blockingQueue.put("a");// 阻塞System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());System.out.println(blockingQueue.take());//阻塞}
volatile关键字
在Java中,volatile
是一个关键字,用于修饰变量。它主要有以下几个作用:
-
保证可见性:当一个变量被声明为
volatile
时,它保证了对该变量的写操作对所有线程都是立即可见的。也就是说,当一个线程修改了一个volatile
变量时,新值会立即被写入主内存,而其他线程在访问这个变量时,都会从主内存中读取最新的值。 -
禁止指令重排序:
volatile
关键字可以防止编译器和处理器对读写该变量的指令进行重排序优化。在没有volatile
修饰的情况下,编译器和处理器可能会为了优化性能而改变指令的执行顺序,这可能会导致一些线程安全问题。 -
不保证原子性:尽管
volatile
可以保证变量的可见性和防止指令重排序,但它并不保证复合操作的原子性。例如,对于i++
这样的操作,即使i
是volatile
类型的,也不能保证在多线程环境下的线程安全。
volatile
关键字通常用于以下场景:
-
状态标志:用于表示某个状态的变化,比如一个线程完成工作的标志。
-
双重检查锁定(Double-Checked Locking):在创建对象实例时,用于确保实例只被创建一次。
使用volatile
时需要注意的是,它并不能完全替代synchronized
或Lock
等同步机制,因为volatile
不提供原子性保证,所以在需要原子性操作时,仍然需要使用其他同步机制。
ThreadPool线程池
线程池是一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,避免了在处理短时间任务时创建与销毁线程的代价。保证内核的充分利用,还能防止过度调度。
Java的线程池通过Executor框架实现的,使用到了Executor
,Executors
,ExecutorService
,ThreadPoolExecutor
这些类
使用方式
-
Executors.newFixedThreadPool(int)
一池N线程 -
Executors.newSingleThreadExecutor()
任务一个一个执行,一池一线程 -
Executors.newCachedThreadPool()
线程池根据需要创建线程,可扩容
// 线程池三种常见的分类
public static void main(String[] args) {// 一池N线程// ExecutorService pool = Executors.newFixedThreadPool(5);// try {// // 需要处理10个请求// for (int i = 0; i < 10; i++) {// // 执行// pool.execute(()->{// System.out.println(Thread.currentThread().getName());// });// }// } finally {// // 关闭线程池// pool.close();// }// 一池一线程// try (ExecutorService pool1 = Executors.newSingleThreadExecutor()) {// for (int i = 0; i < 10; i++) {// pool1.execute(() ->// System.out.println(Thread.currentThread().getName())// );// }// }// 可扩容线程try (ExecutorService pool2 = Executors.newCachedThreadPool()) {for (int i = 0; i < 20; i++) {pool2.execute(() -> {System.out.println(Thread.currentThread().getName());});}}}
线程池的底层原理
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
-
int corePoolSize
:核心(常驻)线程数量 -
int maximumPoolSize
:最大线程数量 -
long keepAliveTime
:线程存活时间(多开放的线程关闭) -
TimeUnit unit
:线程存活时间单位 -
BlockingQueue<Runnable> workQueue
:常驻线程队列用完了,只能把任务放入到阻塞队列中 -
ThreadFactory threadFactory
:线程工厂,创建线程 -
RejectedExecutionHandler handler
:拒绝策略
工作流程和拒绝策略
线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式,能够更加明确线程池的运行规则
//自定义线程池创建
public class ThreadPoolDemo2 {public static void main(String[] args) {ThreadPoolExecutor executor = new ThreadPoolExecutor(2,5,2L,TimeUnit.SECONDS,new ArrayBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());try {for (int i = 0; i < 10; i++) {executor.execute(()->{System.out.println(Thread.currentThread().getName()+" working");});}} finally {executor.close();}}
}
分支合并框架
Fork/Join可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。
Fork:把一个复杂任务进行分拆
Join:把分拆任务的结果进行合并
CompletableFuture异步回调
public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {// 异步调用 没有返回值CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(()->{System.out.println(Thread.currentThread().getName()+" completableFuture1");});completableFuture1.get();// 异步调用,有返回值CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(()->{System.out.println(Thread.currentThread().getName()+" completableFuture2");int i = 1/0;return 1024;});completableFuture2.whenComplete((a,b)->{System.out.println("---a="+a); // supplyAsync 返回值System.out.println("---b="+b); // supplyAsync 异常}).get();}
}