多进程与多线程的区别:
每个进程拥有自己的一整套变量,线程共享数据。所以线程通信更高效,更轻量、创建册小开销小。
1. 什么是线程
1.1 一个单独线程中运行一个任务的简单过程:
1.将执行任务的代码,放在实现Runnable接口的类的run方法中。由于runnable接口只有一个方法(run方法),即函数式接口,所以可用一个lambda表达式创建实例:
Runnable task1 = () -> {
任务代码
}
2.构造一个Thread对象,并将Runnable传递进去:
var t = new Thread(task1);
3.启动线程:
t.start();
也可以通过建立Thread类的一个子类来定义线程:
class MyThread extends Thread {public void run() {任务代码}
}
new MyThread().start;
不再推荐这种方法,因为应当把要并行运行的任务与运行机制解耦合。
如果有多个任务,为每个任务分别创建一个单独的线程开销太大。实际上,可以使用线程池。
1.2 银行转账
使用两个线程,一个将钱从账户0转移到账户1,一个从2到3。
package threads;/*** @version 1.30 2004-08-01* @author Cay Horstmann*/
public class ThreadTest
{public static final int DELAY = 10;public static final int STEPS = 100;public static final double MAX_AMOUNT = 1000;public static void main(String[] args){var bank = new Bank(4, 100000);Runnable task1 = () ->{try{for (int i = 0; i < STEPS; i++){double amount = MAX_AMOUNT * Math.random();bank.transfer(0, 1, amount);Thread.sleep((int) (DELAY * Math.random()));}}catch (InterruptedException e){}};Runnable task2 = () ->{try{for (int i = 0; i < STEPS; i++){double amount = MAX_AMOUNT * Math.random();bank.transfer(2, 3, amount);Thread.sleep((int) (DELAY * Math.random()));}}catch (InterruptedException e){}};new Thread(task1).start();new Thread(task2).start();}
}
package threads;import java.util.*;/*** A bank with a number of bank accounts.*/
public class Bank
{private final double[] accounts;/*** Constructs the bank.* @param n the number of accounts* @param initialBalance the initial balance for each account*/public Bank(int n, double initialBalance){accounts = new double[n];Arrays.fill(accounts, initialBalance);}/*** Transfers money from one account to another.* @param from the account to transfer from* @param to the account to transfer to* @param amount the amount to transfer*/public void transfer(int from, int to, double amount){if (accounts[from] < amount) return;System.out.print(Thread.currentThread());accounts[from] -= amount;System.out.printf(" %10.2f from %d to %d", amount, from, to);accounts[to] += amount;System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());}/*** Gets the sum of all account balances.* @return the total balance*/public double getTotalBalance(){double sum = 0;for (double a : accounts)sum += a;return sum;}/*** Gets the number of accounts in the bank.* @return the number of accounts*/public int size(){return accounts.length;}
}
2. 线程状态
线程有如下六种状态:
- New(新建)
- Runnable(可运行)
- Blocked(阻塞)
- Waiting(等待)
- Timed Waiting(计时等待)
- Terminated(终止)
2.1New(新建)
new Thread(r)
线程运行前的基础工作准备
2.2Runnable(可运行)
一旦调用start,线程就处于可运行状态。一个可运行的线程可能正在运行也可能没有运行。
线程的调度细节由操作系统提供的系统服务决定。
抢占式调度系统:
每个可运行线程一个时间片。当时间片用完时,操作系统会强制剥夺该线程的运行权 ,并给另一个线程一个机会来运行。当选择下一个线程时,操作系统会考虑线程的优先级。
桌面系统,服务器系统均为抢占式调度。手机等小型设备可能使用协作式调度,一个线程只有调用yield方法或者被阻塞或等待式才失去控制权。
2.3阻塞(Blocked)或等待(Waiting)
非活动状态
阻塞状态与等待状态没有太大区别
- 当一个线程试图获取一个内部的对象锁(Synchronized标记),而这个锁目前被其他线程占用,该线程会被阻塞。
- 当线程等待另一个线程通知调度器出现某个条件时(如账户大于0元时才能往外转账),这个线程会进入等待状态。(Obeject.wait,Thread.join,juc.Lock-Condition)
- 计时等待(sleep)
当一个线程阻塞或等待时,可以调度另一个线程。
当一个线程被重新激活时,调度器会比较它与当前运行线程的优先级。
2.4终止(Terminated)
终止原因:
- 由于run方法正常退出,线程自然终止。
- 因为一个未捕获的异常终止run方法,意外终止。
3.线程属性
3.1 中断线程
除了已经废弃的stop方法,没有办法强制一个线程终止。不过,interrupt方法可以请求终止一个线程。它会设置线程的中断状态,即一个Boolean标志。Java语言并不要求中断的线程应该终止,中断一个线程只是引起对它的注意。
检查中断状态:
while (!Thread.currentThread().isInterrupted() && more work to do) {do more work;
}
如果对中断状态的线程调用sleep或wait,会抛出InterrruptedException
捕获InterrruptedException异常
Runnable r = () -> {try {...while (more work to do) {do more workThread.sleep(delay);}} catch(InterrruptedException e) {// thread was interrupted during sleep}
}
3.2 守护线程
t.setDaemon(true);
//将一个线程设置为守护线程
为其他线程提供服务。如:计时器线程、清空果实缓存项的线程。
只剩下守护线程时,虚拟机就会退出。没有必要继续运行程序了。
3.3 线程名
默认如:Thread-2
var t = new Thread(runnable);
t.setName("Danny");
3.4 未捕获异常的处理器
p578
3.5 线程优先级
t.setPriority(int newPriority);
Thread.MIN_PRIORITY
Thread.MAX_PRIORITY
线程优先级高度依赖与系统,java线程优先级会映射到主机平台的优先级。各平台不一样。不推荐使用线程优先级。
*4. 同步
多线程需要共享存取相同的数据。为了避免多线程破坏共享数据,必须学习同步存取
4.1竞态条件(race condition)
例子:
随机选择从哪个源账户转到哪个目标账户。
上一个例子中只有两个线程,切分别从0到1转账和从2到3转账,不存在共享数据,所以没有冲突。
package javacore.cha12.unsych;/*** This program shows data corruption when multiple threads access a data structure.* @version 1.32 2018-04-10* @author Cay Horstmann*/
public class UnsynchBankTest
{public static final int NACCOUNTS = 100;public static final double INITIAL_BALANCE = 1000;public static final double MAX_AMOUNT = 1000;public static final int DELAY = 10;public static void main(String[] args){var bank = new Bank(NACCOUNTS, INITIAL_BALANCE);for (int i = 0; i < NACCOUNTS; i++){int fromAccount = i;Runnable r = () -> {try{while (true){int toAccount = (int) (bank.size() * Math.random());double amount = MAX_AMOUNT * Math.random();bank.transfer(fromAccount, toAccount, amount);Thread.sleep((int) (DELAY * Math.random()));}}catch (InterruptedException e){} };var t = new Thread(r);t.start();}}
}
package javacore.cha12.unsych;import java.util.*;/*** A bank with a number of bank accounts.*/
public class Bank
{private final double[] accounts;/*** Constructs the bank.* @param n the number of accounts* @param initialBalance the initial balance for each account*/public Bank(int n, double initialBalance){accounts = new double[n];Arrays.fill(accounts, initialBalance);}/*** Transfers money from one account to another.* @param from the account to transfer from* @param to the account to transfer to* @param amount the amount to transfer*/public void transfer(int from, int to, double amount){if (accounts[from] < amount) return;System.out.print(Thread.currentThread());accounts[from] -= amount;System.out.printf(" %10.2f from %d to %d", amount, from, to);accounts[to] += amount;System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());}/*** Gets the sum of all account balances.* @return the total balance*/public double getTotalBalance(){double sum = 0;for (double a : accounts)sum += a;return sum;}/*** Gets the number of accounts in the bank.* @return the number of accounts*/public int size(){return accounts.length;}
}
为什么破坏了共享数据
accounts[to] += amount;
自增命令不是原子操作,由多条指令组成,执行这些这些指令的线程可能在任何一条指令上被操作系统中断。
有两种机制可以防止并发访问一个代码块:synchronized关键字,ReentrantLock类。
4.3锁对象
以下皆为显示对象锁
myLock.lock();//该可重入锁对象持有计数+1
try {临界区
} finally {myLock.unlock();//持有计数-1,如果为零,其他线程可获得锁对象
}
- 任何时刻只有一个线程进入临界区。一旦一个线程锁定了锁对象,任何其他线程都无法通过lock语句。当其他线程调用lock时,它们会暂停,知道第一个线程释放这个锁对象。而当一个线程成功获得锁,他就会贪婪的执行完临界区的所有代码(没有条件对象阻止的情况下)
- 要把unlock操作包在finally子句中,如果抛出异常必须释放锁,不然其他线程将永远阻塞。如果由于抛出异常而绕过临界区代码,共享对象可能处于被破坏的状态。
- 不能使用try-with-resources语句。1.解锁方法名不是close。2.同步使用同一个锁对象,而try()声明一个新变量。
- ReentrantLock,可重入锁。锁有一个持有计数(hold count)来跟踪lock方法的嵌套调用,lock加一,unlock减一,计数为零时释放锁。由一个锁保护的代码可以调用另一个同样使用这个锁的代码。
- ReentrantLock(boolean fair)。构造一个采用公平策略的锁。公平锁倾向与等待时间最长的线程,严重影响性能(排队不等式)。默认非公平。
4.4条件对象
通常,线程进入临界区后却发现只有满足了某个条件之后它才能执行。可以使用一个条件对象(condition object)来管理那些已经获得了一个锁却不能有效工作的线程。
有足够多的资金才能用于转账。注意不能使用下面的代码:
if (bank.getBalance(from) >= amount)bank.transfer(from, to, amout);
在成功通过if判断后,但在调用transfer之前,当前线程可能被中断。虚假唤醒。在这个线程再次运行时,账户余额可能已经低于提款金额了。
一个锁对象可以由一个或多个条件对象。
通过newCondition获得条件对象,取合适的名字反应它表示的条件。如:
class Bank {private Condition sufficientFunds; //条件对象,表示资金充足...public Bank() {...sufficientFunds = bankLock.newCondition();}
}
如果transfer方法返现资金不足,他会调用:
sufficientFunds.await();
当前线程暂停,并放弃锁。
等待获得锁的线程和调用await方法的线程存在本质上的不同。
一旦一个线程调用了await方法,他就进入了这个条件的等待集(wait set)。当锁可用时,该线程并不会变为可运行状态,直到另一线程在同一条件上调用了signalAll方法。
当另一线程完成转账时,它应该调用:
sufficientFunds.signalAll();
package javacore.cha12.synch;import java.util.*;
import java.util.concurrent.locks.*;/*** A bank with a number of bank accounts that uses locks for serializing access.*/
public class Bank
{private final double[] accounts;private Lock bankLock;private Condition sufficientFunds;/*** Constructs the bank.* @param n the number of accounts* @param initialBalance the initial balance for each account*/public Bank(int n, double initialBalance){accounts = new double[n];Arrays.fill(accounts, initialBalance);bankLock = new ReentrantLock();sufficientFunds = bankLock.newCondition();}/*** Transfers money from one account to another.* @param from the account to transfer from* @param to the account to transfer to* @param amount the amount to transfer*/public void transfer(int from, int to, double amount) throws InterruptedException{bankLock.lock();try{//注意是while循环while (accounts[from] < amount)sufficientFunds.await();System.out.print(Thread.currentThread());accounts[from] -= amount;System.out.printf(" %10.2f from %d to %d", amount, from, to);accounts[to] += amount;System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());//只要状态发生改变就应该使用signalAllsufficientFunds.signalAll();}finally{bankLock.unlock();}}/*** Gets the sum of all account balances.* @return the total balance*/public double getTotalBalance(){bankLock.lock();try{double sum = 0;for (double a : accounts)sum += a;return sum;}finally{bankLock.unlock();}}/*** Gets the number of accounts in the bank.* @return the number of accounts*/public int size(){return accounts.length;}
}
package javacore.cha12.synch;/*** This program shows how multiple threads can safely access a data structure.* @version 1.32 2018-04-10* @author Cay Horstmann*/
public class SynchBankTest
{public static final int NACCOUNTS = 100;public static final double INITIAL_BALANCE = 1000;public static final double MAX_AMOUNT = 1000;public static final int DELAY = 10;public static void main(String[] args){var bank = new Bank(NACCOUNTS, INITIAL_BALANCE);for (int i = 0; i < NACCOUNTS; i++){int fromAccount = i;Runnable r = () -> {try{while (true){int toAccount = (int) (bank.size() * Math.random());double amount = MAX_AMOUNT * Math.random();bank.transfer(fromAccount, toAccount, amount);Thread.sleep((int) (DELAY * Math.random()));}}catch (InterruptedException e){} };var t = new Thread(r);t.start();}}
}
4.5synchronized关键字
内部对象锁
将一个方法声明为synchronized
每个对象都有一个锁,通过调用同步方法(synchronized声明的方法)获得这个锁
ReentrantLock:await,signalAll
syschronized:wait,notifyAll
如果时静态方法被声明为synchronized:调用这个方法时,会锁定该方法对应类的.class对象。
package javacore.cha12.synch2;import java.util.*;/*** A bank with a number of bank accounts that uses synchronization primitives.*/
public class Bank
{private final double[] accounts;/*** Constructs the bank.* @param n the number of accounts* @param initialBalance the initial balance for each account*/public Bank(int n, double initialBalance){accounts = new double[n];Arrays.fill(accounts, initialBalance);}/*** Transfers money from one account to another.* @param from the account to transfer from* @param to the account to transfer to* @param amount the amount to transfer*/public synchronized void transfer(int from, int to, double amount) throws InterruptedException{while (accounts[from] < amount)wait();System.out.print(Thread.currentThread());accounts[from] -= amount;System.out.printf(" %10.2f from %d to %d", amount, from, to);accounts[to] += amount;System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());notifyAll();}/*** Gets the sum of all account balances.* @return the total balance*/public synchronized double getTotalBalance(){double sum = 0;for (double a : accounts)sum += a;return sum;}/*** Gets the number of accounts in the bank.* @return the number of accounts*/public int size(){return accounts.length;}
}
package javacore.cha12.synch2;/*** This program shows how multiple threads can safely access a data structure,* using synchronized methods.* @version 1.32 2018-04-10* @author Cay Horstmann*/
public class SynchBankTest2
{public static final int NACCOUNTS = 100;public static final double INITIAL_BALANCE = 1000;public static final double MAX_AMOUNT = 1000;public static final int DELAY = 10;public static void main(String[] args){var bank = new Bank(NACCOUNTS, INITIAL_BALANCE);for (int i = 0; i < NACCOUNTS; i++){int fromAccount = i;Runnable r = () -> {try{while (true){int toAccount = (int) (bank.size() * Math.random());double amount = MAX_AMOUNT * Math.random();bank.transfer(fromAccount, toAccount, amount);Thread.sleep((int) (DELAY * Math.random()));}}catch (InterruptedException e){}};var t = new Thread(r);t.start();}}
}
synchronized
优点:简单,不易出错
缺点:每个锁只有一个条件
Lock/Condition
灵活
易出错
最好既不使用Lock/Condition也不使用synchronized关键字。
推荐使用juc的某种机制,如阻塞队列(线程安全的集合)。
4.6同步块
p596
4.7监视器概念
p597
4.8volatile字段
对字段读取和赋值的免锁机制。
- 保证可见性:各寄存器与内存的值相同。即各处理器上的线程看到的值与内存值相同。线程的修改对读取这个变量的所有其他线程都可见。
- 禁止指令重排。
- 不保证原子性:不能确保字段中的值取反。无法保证读取、取反和写入不被中断。
private volatile boolean done;
public boolean isDone() return done;//right
public void setDone() done = true;//right
public void flipDone() done = !done; //wrong不保证原子性,有问题
4.9final变量
final var accounts = new HashMap<String, Double>();
防止accounts变量还未被赋值就被其他线程看到。即其他线程不会看到accounts = null。但是如果有多个线程更改和读取这个映射,仍然需要同步保证安全。
4.10原子性
java.util.concurrent.atomic包中有很多原子类,通过高效的机器指令(cas机制必须是原子性的)保证原子性,无锁。
如:AtomicInteger类提供了incrementAndGet和decrementAndGet实现自增自减(本质也是调用unsafe cas)。更复杂的更新使用compareAndSet方法。
public class MyTest {public static void main(String[] args) throws InterruptedException {var number = new AtomicInteger(0);for (int i = 0; i < 10000; i++) {new Thread(() -> {number.incrementAndGet();}).start();}TimeUnit.MILLISECONDS.sleep(1000);System.out.println(number);}
}
乐观锁
- juc原子类的cas机制
- mysql的mvcc,多版本并发控制器
CAS操作方式:即compare and swap 或者 compare and set,涉及到三个操作数,数据所在的内存值,预期值,新值。当需要更新时,判断当前内存值与之前取到的值是否相等,若相等,则用新值更新,若失败则重试,一般情况下是一个自旋操作,即不断的重试。
version方式:一般是在数据表中加上一个数据版本号version字段,表示数据被修改的次数,当数据被修改时,version值会加一。当线程A要更新数据值时,在读取数据的同时也会读取version值,在提交更新时,若刚才读取到的version值为当前数据库中的version值相等时才更新,否则重试更新操作,直到更新成功。
乐观锁的缺点:
- aba问题。如果一个变量V初次读取的时候是A值,并且在准备赋值的时候检查到它仍然是A值,那我们就能说明它的值没有被其他线程修改过了吗?很明显是不能的,因为在这段时间它的值可能被改为其他值,然后又改回A,那CAS操作就会误认为它从来没有被修改过。这个问题被称为CAS操作的 “ABA”问题。JDK 1.5 以后的 AtomicStampedReference 类就提供了此种能力,其中的 compareAndSet 方法就是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
- 如果有多个变量要访问相同的原子值,性能会大幅下降。自旋CAS(也就是不成功就一直循环执行直到成功)如果长时间不成功,会给CPU带来非常大的执行开销。
- 只能保证一个共享变量的原子操作
CAS 只对单个共享变量有效,当操作涉及跨多个共享变量时 CAS 无效。但是从 JDK 1.5开始,提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行 CAS 操作.所以我们可以使用锁或者利用AtomicReference类把多个共享变量合并成一个共享变量来操作。
4.10原子性
对于存在大量竞争,自旋开销大,可以不使用AtomicLong,而使用LongAdder和LongAccumulator类解决。
LongAdder:
包含多个变量(加数),供多个线程更新不同加数,其总值为当前值,线程增加时会自动提供新的加数。
4.11死锁
观察上面同步之后写的代码。转账金额限制在$1000以内,因为平均每个账户$1000。避免死锁。
public class SynchBankTest2
{public static final int NACCOUNTS = 100;public static final double INITIAL_BALANCE = 1000;public static final double MAX_AMOUNT = 1000;public static final int DELAY = 10;public static void main(String[] args){var bank = new Bank(NACCOUNTS, INITIAL_BALANCE);for (int i = 0; i < NACCOUNTS; i++){int fromAccount = i;Runnable r = () -> {try{while (true){int toAccount = (int) (bank.size() * Math.random());double amount = MAX_AMOUNT * Math.random();bank.transfer(fromAccount, toAccount, amount);Thread.sleep((int) (DELAY * Math.random()));}}catch (InterruptedException e){}};var t = new Thread(r);t.start();}}
}
4.12为什么废除stop和suspend方法
避免破坏数据对象
避免死锁
4.13静态初始化
在第一次使用类时执行一个静态初始化器,而且只执行一次。jvm用锁实现的。
4.14线程局部变量
ThreadLocal辅助类为各个线程提供各自的实例,在初次get调用时才实例化。
public static final ThreadLocal<SimpleDateFormat> dateFormat = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd"));
String dateStamp = dateFormat.get().format(new Date());
此后每次get都会返回属于当前线程的那个实例。
java1.7提供的便利类:
int random = ThreadLocalRandom.current().nextInt(upperBound);
线程局部变量有时用于向协作完成某任务的所有方法提供对象,而不必在调用者之间传递这个对象。例如,共享一个数据库连接:同一个线程中的所有方法或类看到共享同一个数据库连接
public static final ThreadLocal<Connection> connection = ThreadLocal.withInitial(() -> null);
//为某一线程初始化数据库连接
connection.set(connect(url, username, password));
//这一线程中的任意类使用数据库连接
var result = connection.get().executeQuery(query);
5. 线程安全的集合
5.1阻塞队列
阻塞队列的实现依赖与锁和条件。
解决生产者消费这问题。
如:银行转账,一个线程将转装指令放入阻塞队列,一个线程从队列中取出指令并完成转账。
put | take |
---|---|
添加一个队尾元素,队满阻塞 | 移除并返回队头元素,队空阻塞 |
add | remove | element |
---|---|---|
抛出异常 |
offer | poll | peek |
---|---|---|
队满返回false | 队空返回null | 队空返回null |
可以选择超时时间(即阻塞多久) | 一样 | 一样 |
因此向阻塞队列中插入null值是非法的。
LinkedBlockingQueue | ArrayBlockingQueue | PriorityBlockingQueue |
---|---|---|
双端队列,可以指定最大容量 | 可以指定公平性 | 优先队列,用堆实现(不是先进先出队列),常用于任务调度 |
下面展示如何使用阻塞队列来控制一组线程。程序在一个目录及其子目录下搜索所有文件,打印包含指定关键字的行。
import java.io.*;
import java.nio.charset.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;/*** @version 1.03 2018-03-17* @author Cay Horstmann*/
public class BlockingQueueTest
{private static final int FILE_QUEUE_SIZE = 10;private static final int SEARCH_THREADS = 100;private static final Path DUMMY = Path.of("");private static BlockingQueue<Path> queue = new ArrayBlockingQueue<>(FILE_QUEUE_SIZE);public static void main(String[] args){try (var in = new Scanner(System.in)) {System.out.print("Enter base directory (e.g. /opt/jdk-11-src): ");String directory = in.nextLine();System.out.print("Enter keyword (e.g. volatile): ");String keyword = in.nextLine();Runnable enumerator = () ->{try{enumerate(Path.of(directory));queue.put(DUMMY);}catch (IOException e){e.printStackTrace();}catch (InterruptedException e){} };new Thread(enumerator).start();for (int i = 1; i <= SEARCH_THREADS; i++){Runnable searcher = () ->{try{boolean done = false;while (!done){Path file = queue.take();if (file == DUMMY){queue.put(file);done = true;}else search(file, keyword);}}catch (IOException e){e.printStackTrace();}catch (InterruptedException e){} };new Thread(searcher).start();}}}/*** Recursively enumerates all files in a given directory and its subdirectories.* See Chapters 1 and 2 of Volume II for the stream and file operations.* @param directory the directory in which to start*/public static void enumerate(Path directory) throws IOException, InterruptedException{try (Stream<Path> children = Files.list(directory)){for (Path child : children.toList()){if (Files.isDirectory(child))enumerate(child);elsequeue.put(child);}}}/*** Searches a file for a given keyword and prints all matching lines.* @param file the file to search* @param keyword the keyword to search for*/public static void search(Path file, String keyword) throws IOException{try (var in = new Scanner(file, StandardCharsets.UTF_8)){int lineNumber = 0;while (in.hasNextLine()){lineNumber++;String line = in.nextLine();if (line.contains(keyword)) System.out.printf("%s:%d:%s%n", file, lineNumber, line);}}}
}
5.2map、set、queue
juc提供了:
var map = new ConcurrentHashMap<String, Integer>();
var map1 = new ConcurrentSkipListMap<String, Integer>();
var set = new ConcurrentSkipListSet<String>();
var queue = new ConcurrentLinkedQueue<String>();
这些集合使用复杂的算法,通过允许并发地访问数据结构的不同部分尽可能减少竞争。hashtable和Collections.synchronizedMap线程安全,但是锁的整张表,效率低下
https://blog.csdn.net/qq_29051413/article/details/107869427
5.3ConcurrentHashMap条目的原子更新
map.compute(word, (k, v) -> v == null ? 1 : v + 1;
// ConcurrentHashMap中不允许由null值。很多方法都使用null值来表示映射中某个给定的键不存在
map.merge(word, 1L, (existingValue, newValue) -> existingValue + newValue);
map.merge(word, 1L, Long::sum);
上述两个方法的函数都是阻塞运行。
下面使用一个并发散列映射统计一个目录树的Java文件中的所有单词:
package javacore.cha12.concurrentHashMap;import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;/*** This program demonstrates concurrent hash maps.* @version 1.0 2018-01-04* @author Cay Horstmann*/
public class CHMDemo
{public static ConcurrentHashMap<String, Long> map = new ConcurrentHashMap<>();/*** Adds all words in the given file to the concurrent hash map. * @param file a file*/public static void process(Path file){try (var in = new Scanner(file)){while (in.hasNext()){String word = in.next();map.merge(word, 1L, Long::sum);}}catch (IOException e){e.printStackTrace();}}/*** Returns all descendants of a given directory--see Chapters 1 and 2 of Volume II* @param rootDir the root directory* @return a set of all descendants of the root directory*/public static Set<Path> descendants(Path rootDir) throws IOException{try (Stream<Path> entries = Files.walk(rootDir)){return entries.collect(Collectors.toSet());}}public static void main(String[] args)throws InterruptedException, ExecutionException, IOException{int processors = Runtime.getRuntime().availableProcessors();ExecutorService executor = Executors.newFixedThreadPool(processors);Path pathToRoot = Path.of(".");for (Path p : descendants(pathToRoot)){if (p.getFileName().toString().endsWith(".java"))executor.execute(() -> process(p));}executor.shutdown();executor.awaitTermination(10, TimeUnit.MINUTES);map.forEach((k, v) -> { if (v >= 10) System.out.println(k + " occurs " + v + " times");});}
}
5.4ConcurrentHashMap的批操作
P617
5.5并发集视图
通过ConcurrentHashMap得到线程安全的集。
类似于MySQL的虚表与基表。
Set<String> words = ConcurrentHashMap.<String>newKeySet();
//所有映射值为Boolean.TRUE
public class MyTest {public static void main(String[] args) throws InterruptedException {ConcurrentHashMap.KeySetView<String, Boolean> set = ConcurrentHashMap.<String>newKeySet();set.add("Danny");set.add("May");System.out.println(set);}
}
public class MyTest {public static void main(String[] args) throws InterruptedException {var map = new ConcurrentHashMap<String, Integer>();map.put("Danny", 1);map.put("May", 2);map.put("David", 3);ConcurrentHashMap.KeySetView<String, Integer> set = map.keySet(100);set.add("java");System.out.println(set);System.out.println(map);}
}
public class MyTest {public static void main(String[] args) throws InterruptedException {var map = new ConcurrentHashMap<String, Integer>();map.put("Danny", 1);ConcurrentHashMap.KeySetView<String, Integer> set = map.keySet();set.add("java");System.out.println(set);System.out.println(map);}
}
5.6CopyOnWriteArrayList/CopyOnWriteArraySet
它们是线程安全的集合,其所有更改器会建立底层数组的一个副本。
写时复制数组,加锁(同时只能有一个写线程),写完毕后将原数组的引用指向新数组。
读时,无锁(无同步开销),但读到的可能时旧数据。
适用于读多写少。
读写分离的思想,读和写不同的容器。
https://blog.csdn.net/u010002184/article/details/90452918
读锁,写锁?
5.7并行数组算法
Arrays类提供了大量并行化操作。
var contents = new String(Files.readAllBytes(Path.of("alice.txt")), StandardCharsets.UTF_8);
String[] words = contents.split("[\\P{L}]+");
Arrays.parallelSort(words);
Arrays.parallelSort(words, Comparator.comparing(String::length));
5.9早期的线程安全集合
Vector,Hashtable。
任何集合类都可以通过使用同步包装器编程线程安全的(同步视图):
List<String> synchList = Collections.synchronizedList(new ArrayList<String>());
Map<Object, Object> synchMap = Collections.synchronizedMap(new HashMap<>());
得到的集合的方法加锁。锁的时整个集合对象。
如果一个线程使用迭代器,同时其他线程可能修改集合,那么需要“客户端”锁定:
synchronized(synchMap){Iterator<K> iter = synchMap.keySet().iterator();while(iter.hasNext())...;
不然会抛出并发修改异常(ConcurrentModificationException)
6. 任务和线程池
为线程池提供一个Runnable,其中会有一个线程调用run方法。run方法退出时,线程不会死亡,而是留在池中准备为下一个请求提供服务。
6.1Callable与Future
Runnable:封装了一个异步运行的任务,是没有参数和返回值的异步方法。
Callable与Runnable类似,但有返回值:
public interface Callable<V> {V call() throws Exception;
}
Callable<Integer>表示一个返回Integer对象的异步计算
Future保存异步计算的结果。把Future交个异步计算任务,就可以干其他事情了,计算出结果时,Future对象的所有者就会得到这个结果。
FutureTask,它实现了Future和Runnable接口:
Callable<Integer> task = ...;
var futureTask = new FutureTask<Integer>(task);
var t = new Thread(futureTask);//既有runnable特性
...
Integer result = futureTask.get();//又有future特性
这是执行Callable的一种方法。
更常见的是把它传递给执行器。
6.2执行器Executors
执行器类(Executors)有许多用来构造线程池的静态工厂方法。
newCachedThreadPool:缓存线程池,任务周期很短时。
newFixedThreadPool:固定大小线程池,并发线程数等于处理器内核数时。
newSingleThreadExecutor:单线程执行器,性能分析。
这三个方法都返回一个实现了ExecutorService接口的ThreadPoolExecutor类的对象。
下面方法向ExecutorService提交一个Runnable或Callable:
Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);//可以调用isDone、cancel或isCancelled,但get返回null
Future<T> submit(Runnable task, T result);//get返回 T result
ScheduledExecutorService接口:
查查资料
6.3控制任务组invoke
invokeAny方法提交一个Callable对象集合中的所有对象,并返回第一个完成的任务结果。对于搜索问题,拿到一个解就够了。
invokeAll方法提交一个Callable对象集合中的所有对象,阻塞,直到所有任务都完成,并返回表示所有任务答案的Future对象列表:
List<Callable<T>> tasks = ...;
List<Future<T>> results = executor.invokeAll(tasks);
for(Future<T> result : results) processFurther(result.get());//阻塞,知道该结果可用
按照结果可用的顺序排放Future列表:ExecutorCompletionService
var service = new ExecutorComletionService<T>(executor);
for (Callable<T> task : tasks) service.submit(task);
for (int i = 0; i < tasks.size(); i++) processFurther(service.take().get());
例子:
第一个计算中:统计目录树中给定单词的出现次数。为每个文件创建一个单独的Callable任务。将任务列表通过invokeAll传递给执行器,最后阻塞加总每个Future结果。
第二个计算:搜索包含指定单词的第一个文件。invokeAny。
一旦有任务返回,invokeAny方法就会终止,所以任务失败时用抛出异常来代替任务返回。
任务成功时,其他任务就要取消,因此要监视中断状态。
package javacore.cha12.executors;import java.io.*;
import java.nio.file.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;/*** This program demonstrates the Callable interface and executors.* @version 1.01 2021-05-30* @author Cay Horstmann*/
public class ExecutorDemo
{/*** Counts occurrences of a given word in a file.* @return the number of times the word occurs in the given word*/public static long occurrences(String word, Path path){try (var in = new Scanner(path)){int count = 0;while (in.hasNext())if (in.next().equals(word)) count++;return count;}catch (IOException ex){return 0;}}/*** Returns all descendants of a given directory--see Chapters 1 and 2 of Volume II.* @param rootDir the root directory* @return a set of all descendants of the root directory*/public static Set<Path> descendants(Path rootDir) throws IOException{try (Stream<Path> entries = Files.walk(rootDir)){return entries.filter(Files::isRegularFile).collect(Collectors.toSet());}}/*** Yields a task that searches for a word in a file.* 第二个计算:搜索包含指定单词的第一个文件。invokeAny。
一旦有任务返回,invokeAny方法就会终止,所以任务失败时用抛出异常来代替任务返回。
任务成功时,其他任务就要取消,因此要监视中断状态。* @param word the word to search* @param path the file in which to search* @return the search task that yields the path upon success*/public static Callable<Path> searchForTask(String word, Path path){return () ->{try (var in = new Scanner(path)){while (in.hasNext()){if (in.next().equals(word)) return path;if (Thread.currentThread().isInterrupted()){System.out.println("Search in " + path + " canceled.");return null;}}throw new NoSuchElementException();}};}public static void main(String[] args)throws InterruptedException, ExecutionException, IOException{try (var in = new Scanner(System.in)){System.out.print("Enter base directory (e.g. /opt/jdk-9-src): ");String start = in.nextLine();System.out.print("Enter keyword (e.g. volatile): ");String word = in.nextLine();Set<Path> files = descendants(Path.of(start));var tasks = new ArrayList<Callable<Long>>();for (Path file : files){Callable<Long> task = () -> occurrences(word, file); tasks.add(task);}ExecutorService executor = Executors.newCachedThreadPool();// use a single thread executor instead to see if multiple threads// speed up the search// ExecutorService executor = Executors.newSingleThreadExecutor();Instant startTime = Instant.now();List<Future<Long>> results = executor.invokeAll(tasks);long total = 0;for (Future<Long> result : results)total += result.get();Instant endTime = Instant.now();System.out.println("Occurrences of " + word + ": " + total);System.out.println("Time elapsed: "+ Duration.between(startTime, endTime).toMillis() + " ms");var searchTasks = new ArrayList<Callable<Path>>();for (Path file : files)searchTasks.add(searchForTask(word, file));Path found = executor.invokeAny(searchTasks);System.out.println(word + " occurs in: " + found);if (executor instanceof ThreadPoolExecutor tpExecutor)// the single thread executor isn'tSystem.out.println("Largest pool size: " + tpExecutor.getLargestPoolSize());executor.shutdown();}}
}
6.4fork-join框架
解决问题:线程池中有空闲线程。
递归计算。
需要提供一个扩展(extends)了RecursiveTask<T>(计算会生成一个T类型的结果)的类
或RecursiveActon(不生成任何结果)。
在覆盖(@Override)compute方法来生成、调用子任务,然后合并结果
package javacore.cha12.forkJoin;import java.util.concurrent.*;
import java.util.function.*;/*** This program demonstrates the fork-join framework.* @version 1.02 2021-06-17* @author Cay Horstmann*/
public class ForkJoinTest
{public static void main(String[] args){final int SIZE = 10000000;var numbers = new double[SIZE];for (int i = 0; i < SIZE; i++) numbers[i] = Math.random();var counter = new Counter(numbers, 0, numbers.length, x -> x > 0.5);var pool = new ForkJoinPool();pool.invoke(counter);System.out.println(counter.join());}
}//RecursiveTask<T>计算会生成一个T类型结果
//RecursiveAction不生成结果
//
class Counter extends RecursiveTask<Integer>
{public static final int THRESHOLD = 1000;private double[] values;private int from;private int to;private DoublePredicate filter;public Counter(double[] values, int from, int to, DoublePredicate filter){this.values = values;this.from = from;this.to = to;this.filter = filter;}protected Integer compute(){if (to - from < THRESHOLD){int count = 0;for (int i = from; i < to; i++){if (filter.test(values[i])) count++;}return count;}else{int mid = from + (to - from) / 2;var first = new Counter(values, from, mid, filter);var second = new Counter(values, mid, to, filter);invokeAll(first, second);return first.join() + second.join();}}
}
work stealing(工作密取):平衡可用线程的工作负载。每个工作线程都有存放任务的一个双端队列(deque)。一个工作线程将子任务压入其双端队列的队头(只有它可以访问队头,所以不用加锁)。当另一个工作线程空闲时,它会从deque队尾steal一个任务。由于大任务都在队尾,所以steal很少见。
fork-join池是针对非阻塞工作负载优化的。
7. 异步
先看看收藏夹
【【Java并发·05】CompletableFuture扩展】 https://www.bilibili.com/video/BV1wZ4y1A7PK/?share_source=copy_web&vd_source=448c4b50b6938e7b8aa0551018e065d3
1CompletableFuture
无等待。
前面的Future都需要调用get()方法阻塞等待结果。
CompletableFuture与FutureTast都实现了Future接口。
CompletableFuture异步得到结果:实现了Future接口。你要注册一个回调(callback),一旦结果可用,就会(在某个线程中)利用该结果调用这个回调。
public class SmallTool {public static void sleepMillis(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}public static void printTimeAndThread(String tag) {String result = new StringJoiner("\t|\t").add(String.valueOf(System.currentTimeMillis())).add(String.valueOf(Thread.currentThread().getId())).add(Thread.currentThread().getName()).add(tag).toString();System.out.println(result);}}
supplyAsync:开启
public class _01_supplyAsync {public static void main(String[] args) {SmallTool.printTimeAndThread("小白进入餐厅");SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("厨师炒菜");SmallTool.sleepMillis(200);SmallTool.printTimeAndThread("厨师打饭");SmallTool.sleepMillis(100);return "番茄炒蛋 + 米饭 做好了";});SmallTool.printTimeAndThread("小白在打王者");SmallTool.printTimeAndThread(String.format("%s ,小白开吃", cf1.join()));}
}
thenCompose:连接
public class _02_thenCompose {public static void main(String[] args) {SmallTool.printTimeAndThread("小白进入餐厅");SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("厨师炒菜");SmallTool.sleepMillis(200);return "番茄炒蛋";}).thenCompose(dish -> CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("服务员打饭");SmallTool.sleepMillis(100);return dish + " + 米饭";}));SmallTool.printTimeAndThread("小白在打王者");SmallTool.printTimeAndThread(String.format("%s 好了,小白开吃", cf1.join()));}/*** 用 applyAsync 也能实现*/private static void applyAsync() {SmallTool.printTimeAndThread("小白进入餐厅");SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("厨师炒菜");SmallTool.sleepMillis(200);CompletableFuture<String> race = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("服务员打饭");SmallTool.sleepMillis(100);return " + 米饭";});return "番茄炒蛋" + race.join();});SmallTool.printTimeAndThread("小白在打王者");SmallTool.printTimeAndThread(String.format("%s 好了,小白开吃", cf1.join()));}
}
thenCombine:结合
public class _03_thenCombine {public static void main(String[] args) {SmallTool.printTimeAndThread("小白进入餐厅");SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("厨师炒菜");SmallTool.sleepMillis(200);return "番茄炒蛋";}).thenCombine(CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("服务员蒸饭");SmallTool.sleepMillis(300);return "米饭";}), (dish, rice) -> {SmallTool.printTimeAndThread("服务员打饭");SmallTool.sleepMillis(100);return String.format("%s + %s 好了", dish, rice);});SmallTool.printTimeAndThread("小白在打王者");SmallTool.printTimeAndThread(String.format("%s ,小白开吃", cf1.join()));}/*** 用 applyAsync 也能实现*/private static void applyAsync() {SmallTool.printTimeAndThread("小白进入餐厅");SmallTool.printTimeAndThread("小白点了 番茄炒蛋 + 一碗米饭");CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("厨师炒菜");SmallTool.sleepMillis(200);return "番茄炒蛋";});CompletableFuture<String> race = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("服务员蒸饭");SmallTool.sleepMillis(300);return "米饭";});SmallTool.printTimeAndThread("小白在打王者");String result = String.format("%s + %s 好了", cf1.join(), race.join());SmallTool.printTimeAndThread("服务员打饭");SmallTool.sleepMillis(100);SmallTool.printTimeAndThread(String.format("%s ,小白开吃", result));}
}
2组合ComletableFuture
即另开一线程。
https://blog.csdn.net/sermonlizhi/article/details/123356877?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522168707917916782425124543%2522%252C%2522scm%2522%253A%252220140713.130102334…%2522%257D&request_id=168707917916782425124543&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2alltop_positive~default-1-123356877-null-null.142v88control_2,239v2insert_chatgpt&utm_term=completablefuture&spm=1018.2226.3001.4187
thenAplly:
public class _01_thenApply {public static void main(String[] args) {SmallTool.printTimeAndThread("小白吃好了");SmallTool.printTimeAndThread("小白 结账、要求开发票");CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("服务员收款 500元");SmallTool.sleepMillis(100);return "500";}).thenApplyAsync(money -> {SmallTool.printTimeAndThread(String.format("服务员开发票 面额 %s元", money));SmallTool.sleepMillis(200);return String.format("%s元发票", money);});SmallTool.printTimeAndThread("小白 接到朋友的电话,想一起打游戏");SmallTool.printTimeAndThread(String.format("小白拿到%s,准备回家", invoice.join()));}private static void one() {SmallTool.printTimeAndThread("小白吃好了");SmallTool.printTimeAndThread("小白 结账、要求开发票");CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("服务员收款 500元");SmallTool.sleepMillis(100);SmallTool.printTimeAndThread("服务员开发票 面额 500元");SmallTool.sleepMillis(200);return "500元发票";});SmallTool.printTimeAndThread("小白 接到朋友的电话,想一起打游戏");SmallTool.printTimeAndThread(String.format("小白拿到%s,准备回家", invoice.join()));}private static void two() {SmallTool.printTimeAndThread("小白吃好了");SmallTool.printTimeAndThread("小白 结账、要求开发票");CompletableFuture<String> invoice = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("服务员收款 500元");SmallTool.sleepMillis(100);CompletableFuture<String> waiter2 = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("服务员开发票 面额 500元");SmallTool.sleepMillis(200);return "500元发票";});return waiter2.join();});SmallTool.printTimeAndThread("小白 接到朋友的电话,想一起打游戏");SmallTool.printTimeAndThread(String.format("小白拿到%s,准备回家", invoice.join()));}
}
applyToEither:谁先成功用谁
public class _02_applyToEither {public static void main(String[] args) {SmallTool.printTimeAndThread("张三走出餐厅,来到公交站");SmallTool.printTimeAndThread("等待 700路 或者 800路 公交到来");CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("700路公交正在赶来");SmallTool.sleepMillis(100);return "700路到了";}).applyToEither(CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("800路公交正在赶来");SmallTool.sleepMillis(200);return "800路到了";}), firstComeBus -> firstComeBus);SmallTool.printTimeAndThread(String.format("%s,小白坐车回家", bus.join()));}
}
exceptionally:
public class _03_exceptionally {public static void main(String[] args) {SmallTool.printTimeAndThread("张三走出餐厅,来到公交站");SmallTool.printTimeAndThread("等待 700路 或者 800路 公交到来");CompletableFuture<String> bus = CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("700路公交正在赶来");SmallTool.sleepMillis(100);return "700路到了";}).applyToEither(CompletableFuture.supplyAsync(() -> {SmallTool.printTimeAndThread("800路公交正在赶来");SmallTool.sleepMillis(200);return "800路到了";}), firstComeBus -> {SmallTool.printTimeAndThread(firstComeBus);if (firstComeBus.startsWith("700")) {throw new RuntimeException("撞树了……");}return firstComeBus;}).exceptionally(e -> {SmallTool.printTimeAndThread(e.getMessage());SmallTool.printTimeAndThread("小白叫出租车");return "出租车 叫到了";});SmallTool.printTimeAndThread(String.format("%s,小白坐车回家", bus.join()));}
}