模拟了查询耗时操作,并使用FutureTask和CompletableFuture分别获取计算结果,统计执行时长
package org.alllearn.futurtask;import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.FutureTask;
import java.util.stream.Collectors;public class CompletableFutureTest {public static void main(String[] args) {CompletableFutureTest test = new CompletableFutureTest();List<User> users = test.getUsers();//流计算的简单使用Stopwatch started = Stopwatch.createStarted();double average = users.stream().mapToInt(User::getAge).average().getAsDouble();System.out.println(average + " " + started.elapsed());//模拟通过id集合去其他表批量查询数据//单线程任务分割,数据库承受不了大量id的in查询List<List<User>> partition1 = Lists.partition(users, users.size() / 200);started.reset();started.start();double average1 = partition1.stream().mapToDouble(test::select).average().getAsDouble();System.out.println(average1 + " " + started.elapsed());//多线程任务两种方案FutureTask、CompletableFutureint cpu = Runtime.getRuntime().availableProcessors();//计算密集型cpu+1,io密集型cpu*2ForkJoinPool forkJoinPool = new ForkJoinPool(cpu + 1);List<List<User>> partition2 = Lists.partition(users, users.size() / 200);//FutureTaskstarted.reset();started.start();double average2 = partition2.stream().map(u -> {FutureTask<Double> futureTask = new FutureTask<>(() -> test.select(u));forkJoinPool.submit(futureTask);return futureTask;}).collect(Collectors.toList()).stream().mapToDouble(f -> {try {return f.get();} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}}).average().getAsDouble();System.out.println(average2 + " " + started.elapsed());//CompletableFuturestarted.reset();started.start();double average3 = partition2.stream().map(u -> CompletableFuture.supplyAsync(() -> test.select(u), forkJoinPool)).collect(Collectors.toList()).stream().mapToDouble(CompletableFuture::join).average().getAsDouble();forkJoinPool.shutdown();System.out.println(average3 + " " + started.elapsed());}//模拟查询耗时public Double select(List<User> user) {try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return 1D;//return Math.random();}//模拟获取用户public List<User> getUsers() {List<User> users = new ArrayList<>();for (int i = 0; i < 20000; i++) {users.add(new User(i, "user" + i, 1));}return users;}@Getter@Setter@AllArgsConstructorstatic class User {private int id;private String name;private int age;}}
运行结果