系列文章目录
文章目录
- 系列文章目录
- 前言
- 一、项目背景
- 二、第七轮对话-补充异步执行代码
- 三、第八轮对话-增加累计完成等字眼
- 四、第九轮对话-线程安全优化
- 五、第十轮对话-增加运行时数据
- 七、参考文章
前言
在这个充满技术创新的时代,AI大模型正成为开发者们的新宠。它们可以帮助我们完成从简单的问答到复杂的编程任务,所以AI编程将会是未来的主流方向,利用AI大模型的能力,本文将介绍从零到一用AI大模型编写一个多线程并发框架。
一、项目背景
经过上两篇文章和AI的对话,我们基本捣鼓出来了多线程并发框架的雏形,接下来是比较重要的监控模块,本文将会通过比较长的篇幅来叙述。
本多线程框架使用示例如下:源码地址
1、引入依赖。
<dependency><groupId>io.github.vipjoey</groupId><artifactId>mmc-juc</artifactId><version>1.0</version>
</dependency>
2、使用示例。
// 创建一个MmcTaskExecutor实例,用于执行单次长耗时任务
// 下面是创建一个计算从1加到100的任务,总共100个任务,采用fork分治算法,阈值为10,总共任务为100 / 10 * 2 = 20个大任务,执行速率约为10/s
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder().taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList())) // 设置任务源.taskProcessor(x -> x.stream().reduce(0, Integer::sum)) // 设置任务处理方法.taskMerger(Integer::sum) // 设置结果处理方法(可选).threshold(10) // 设置任务处理阈值(可选).taskName("mmcTaskExample") // 设置任务名称.rateLimiter(10, 20) // 设置速率限制,容量为10,每秒产生令牌为20,休眠时间为10ms.forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4.build();// 同步执行并打印结果
System.out.println("result: " + mmcTaskExecutor.execute());// 任务执行过程监控
[mmcTaskExample] Tasks submitted. Total tasks: 100
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 20, remaining tasks: 80
[mmcTaskExample] Task started. Completed tasks: 26, remaining tasks: 74
[mmcTaskExample] Task started. Completed tasks: 33, remaining tasks: 67
[mmcTaskExample] Task started. Completed tasks: 40, remaining tasks: 60
[mmcTaskExample] Task started. Completed tasks: 46, remaining tasks: 54
[mmcTaskExample] Task started. Completed tasks: 52, remaining tasks: 48
[mmcTaskExample] Task started. Completed tasks: 58, remaining tasks: 42
[mmcTaskExample] Task started. Completed tasks: 64, remaining tasks: 36
[mmcTaskExample] Task started. Completed tasks: 70, remaining tasks: 30
[mmcTaskExample] Task started. Completed tasks: 76, remaining tasks: 24
[mmcTaskExample] Task started. Completed tasks: 82, remaining tasks: 18
[mmcTaskExample] Task started. Completed tasks: 88, remaining tasks: 12
[mmcTaskExample] Task started. Completed tasks: 94, remaining tasks: 6
[mmcTaskExample] Task started. Completed tasks: 100, remaining tasks: 0
[mmcTaskExample] Tasks completed. Elapsed time: 1107 ms
result: 5050
二、第七轮对话-补充异步执行代码
1、参考限流器的对话,我们知道AI是不会主动帮忙抽象公共接口的,所以这里我们干脆要求AI给我们定义好监听器接口,方便后续扩展。
2、下面是AI给的回复。
- 【为了实现监听器功能,我们首先需要创建一个监听器接口MmcTaskListener,然后在MmcTaskExecutor类中使用它。我们还将提供一个默认实现DefaultMmcTaskListener,并允许用户通过Builder类替换监听器。首先,我们创建一个MmcTaskListener接口。】
public interface MmcTaskListener {void onTaskStarted(int totalTasks, int remainingTasks);void onTaskCompleted(int totalTasks, int remainingTasks, long elapsedTime);
}
- 【接下来,我们创建一个默认的监听器实现DefaultMmcTaskListener。】
public class DefaultMmcTaskListener implements MmcTaskListener {@Overridepublic void onTaskStarted(int totalTasks, int remainingTasks) {System.out.println("Task started. Total tasks: " + totalTasks + ", remaining tasks: " + remainingTasks);}@Overridepublic void onTaskCompleted(int totalTasks, int remainingTasks, long elapsedTime) {System.out.println("Task completed. Total tasks: " + totalTasks + ", remaining tasks: " + remainingTasks + ", elapsed time: " + elapsedTime + " ms");}
}
- 【然后,我们修改MmcTaskExecutor类以使用MmcTaskListener接口。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量private MmcTaskListener taskListener;private MmcTaskExecutor(Builder<T, R> builder) {// ...省略其他初始化代码this.taskListener = builder.taskListener;}// ...省略其他方法public static class Builder<T, R> {// ...省略其他成员变量private MmcTaskListener taskListener = new DefaultMmcTaskListener(); // 默认使用DefaultMmcTaskListener// ...省略其他方法public Builder<T, R> taskListener(MmcTaskListener taskListener) {this.taskListener = taskListener;return this;}// ...省略其他方法}
}
- 【最后,我们需要修改MmcTask类以调用监听器的方法。】
import java.util.List;
import java.util.concurrent.RecursiveTask;public class MmcTask<T, R> extends RecursiveTask<R> {// ...省略其他成员变量private MmcTaskListener taskListener;public MmcTask(List<T> taskSource, MmcTaskProcessor<T, R> taskProcessor, MmcTaskMerger<R> taskMerger, int threshold, int start, int end, RateLimiter rateLimiter, MmcTaskListener taskListener) {// ...省略其他初始化代码this.taskListener = taskListener;}@Overrideprotected R compute() {// ...省略其他代码// 在处理任务之前调用onTaskStarted方法taskListener.onTaskStarted(taskSource.size(), end - start);// ...省略其他代码// 在任务完成后调用onTaskCompleted方法long elapsedTime = System.currentTimeMillis() - startTime;taskListener.onTaskCompleted(taskSource.size(), end - start - 1, elapsedTime);// ...省略其他代码}
}
本轮AI的回复有些欠缺,它并没有GET到我的诉求,可能跟我的提问方式有关。实际上我需要监控的是TaskSource集合被执行完成的进度,AI只是监控了compute方法,也就是监控每个阈值下的小任务的执行,这明显是错的。
三、第八轮对话-增加累计完成等字眼
1、由于上一轮对话,可能是提问方式不对,我重新增加【首次】、【累计】这些字眼,希望AI能理解我的意思。
2、下面是AI大模型给的回复。
- 【为了满足您的要求,我们需要修改MmcTaskListener接口以添加新的事件方法,然后修改DefaultMmcTaskListener以实现这些方法。接下来,我们将在MmcTaskExecutor类和MmcTask类中调用这些方法。首先,我们修改MmcTaskListener接口以添加新的事件方法。】
public interface MmcTaskListener {void onTasksSubmitted(int totalTasks, long startTime);void onTaskStarted(int completedTasks, int remainingTasks);void onTasksCompleted(long elapsedTime, long endTime);
}
- 【接下来,我们修改DefaultMmcTaskListener以实现新的事件方法。】
public class DefaultMmcTaskListener implements MmcTaskListener {@Overridepublic void onTasksSubmitted(int totalTasks, long startTime) {System.out.println("Tasks submitted. Total tasks: " + totalTasks + ", start time: " + startTime);}@Overridepublic void onTaskStarted(int completedTasks, int remainingTasks) {System.out.println("Task started. Completed tasks: " + completedTasks + ", remaining tasks: " + remainingTasks);}@Overridepublic void onTasksCompleted(long elapsedTime, long endTime) {System.out.println("Tasks completed. Elapsed time: " + elapsedTime + " ms, end time: " + endTime);}
}
- 【然后,我们需要修改MmcTaskExecutor类以调用onTasksSubmitted和onTasksCompleted方法。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量和方法private AtomicInteger completedTasks = new AtomicInteger(); // 添加一个原子整数以跟踪已完成的任务数量// 同步执行并返回结果public R execute() {MmcTask<T, R> mmcTask = new MmcTask<>(taskSource, taskProcessor, taskMerger, threshold, 0, taskSource.size(), rateLimiter, taskListener, completedTasks);// 调用onTasksSubmitted方法taskListener.onTasksSubmitted(taskSource.size(), System.currentTimeMillis());R result = forkJoinPool.invoke(mmcTask);// 调用onTasksCompleted方法taskListener.onTasksCompleted(mmcTask.getElapsedTime(), System.currentTimeMillis());return result;}// ...省略其他方法
}
- 【最后,我们需要修改MmcTask类以调用onTaskStarted方法。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;public class MmcTask<T, R> extends RecursiveTask<R> {// ...省略其他成员变量和方法private AtomicInteger completedTasks;public MmcTask(List<T> taskSource, MmcTaskProcessor<T, R> taskProcessor, MmcTaskMerger<R> taskMerger, int threshold, int start, int end, RateLimiter rateLimiter, MmcTaskListener taskListener, AtomicInteger completedTasks) {// ...省略其他初始化代码this.completedTasks = completedTasks;}@Overrideprotected R compute() {// 在处理任务之前调用onTaskStarted方法taskListener.onTaskStarted(completedTasks.get(), end - start);// ...省略其他代码// 在任务完成后更新已完成任务的计数completedTasks.incrementAndGet();// ...省略其他代码}
}
从本次AI大模型的回复来看,存在以下问题:
- 它似乎不理解任务拆分前后计数的区别,实际上后续我尝试了N种问法,它都不理解,它理解的任务累计完成个数就是
completedTasks.incrementAndGet();
而不是我想要的taskRuntime.getCompletedTasks().addAndGet(end - start);
- 它不会主动发现线程安全问题,这个跟我们提问的要求有关,毕竟是生成式AI,所以它不会考虑任务提交时需要重置AtomicInteger。
- 同理,它的剩余任务个数的计算方式也是错误的,它的答案是
end - start
,而不是我想要的taskSource.size() - 累计完成任务个数
。
不过,这些都是小问题,不用太纠结,我们继续提诉求去优化。
四、第九轮对话-线程安全优化
1、从AI大模型给出的代码来看,是没有考虑线程安全问题的,我们需要补充一下诉求:
2、下面是AI的回复。
- 【为了满足您的要求,我们将对MmcTaskExecutor类进行修改以确保线程安全,并在每次提交任务时重置计数器。我们将使用原子整数AtomicInteger来实现线程安全的计数器。首先,我们将修改MmcTaskExecutor类以添加一个原子整数completedTasks以跟踪已完成的任务数量。我们还将在每次执行任务时重置completedTasks。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量和方法private AtomicInteger completedTasks = new AtomicInteger(); // 添加一个原子整数以跟踪已完成的任务数量// 同步执行并返回结果public R execute() {completedTasks.set(0); // 重置已完成任务的计数器MmcTask<T, R> mmcTask = new MmcTask<>(taskSource, taskProcessor, taskMerger, threshold, 0, taskSource.size(), rateLimiter, taskListener, completedTasks);// 调用onTasksSubmitted方法taskListener.onTasksSubmitted(taskSource.size(), System.currentTimeMillis());R result = forkJoinPool.invoke(mmcTask);// 调用onTasksCompleted方法taskListener.onTasksCompleted(mmcTask.getElapsedTime(), System.currentTimeMillis());return result;}// ...省略其他方法
}
- 【接下来,我们将修改MmcTask类以确保线程安全并调用监听器的方法。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;public class MmcTask<T, R> extends RecursiveTask<R> {// ...省略其他成员变量和方法private AtomicInteger completedTasks;public MmcTask(List<T> taskSource, MmcTaskProcessor<T, R> taskProcessor, MmcTaskMerger<R> taskMerger, int threshold, int start, int end, RateLimiter rateLimiter, MmcTaskListener taskListener, AtomicInteger completedTasks) {// ...省略其他初始化代码this.completedTasks = completedTasks;}@Overrideprotected R compute() {// 在处理任务之前调用onTaskStarted方法taskListener.onTaskStarted(completedTasks.get(), end - start);// ...省略其他代码// 在任务完成后更新已完成任务的计数int completed = completedTasks.incrementAndGet();// 调用onTaskStarted方法,以便在每个小任务完成时更新已完成任务的计数taskListener.onTaskStarted(completed, end - start - completed);// ...省略其他代码}
}
这次AI大模型的回复也是不尽人意,如果多个MmcTask任务同时执行的话,计数器是不准确的,我们继续耐心优化一下我们的提示词。
五、第十轮对话-增加运行时数据
1、由于我们设计的初衷,任务可以多次提交的。所以每个任务运行时应该产生运行时数据,我们监控的也是运行时数据,运行时数据需要一个唯一标识,也就是任务名称来与之关联。另外,由于MmcTask的构造函数过长,我在其它对话中让它优化过了,这里就不详细讲述。
2、以下是AI的回复。
- 【为了满足您的要求,我们可以创建一个新的类TaskRuntime,它将包含任务名称、任务源总数、已完成任务数和剩余任务数等信息。然后,我们将修改MmcTask和MmcTaskListener以使用TaskRuntime。首先,我们创建一个新的类TaskRuntime。】
public class TaskRuntime {private String taskName;private int totalTasks;private AtomicInteger completedTasks;private AtomicInteger remainingTasks;public TaskRuntime(String taskName, int totalTasks) {this.taskName = taskName;this.totalTasks = totalTasks;this.completedTasks = new AtomicInteger();this.remainingTasks = new AtomicInteger(totalTasks);}// ...省略getter和setter方法
}
- 【接下来,我们修改MmcTaskListener接口以使用TaskRuntime。】
public interface MmcTaskListener {void onTasksSubmitted(TaskRuntime taskRuntime, long startTime);void onTaskStarted(TaskRuntime taskRuntime);void onTasksCompleted(TaskRuntime taskRuntime, long elapsedTime, long endTime);
}
- 【然后,我们修改MmcTask以使用TaskRuntime。】
import java.util.List;
import java.util.concurrent.RecursiveTask;public class MmcTask<T, R> extends RecursiveTask<R> {// ...省略其他成员变量和方法private TaskRuntime taskRuntime;public MmcTask(Builder<T, R> builder) {// ...省略其他初始化代码this.taskRuntime = new TaskRuntime(builder.taskName, builder.taskSource.size());}@Overrideprotected R compute() {// 在处理任务之前调用onTaskStarted方法taskListener.onTaskStarted(taskRuntime);// ...省略其他代码// 在任务完成后更新已完成任务的计数taskRuntime.getCompletedTasks().incrementAndGet();taskRuntime.getRemainingTasks().decrementAndGet();// ...省略其他代码}
}
- 【最后,我们修改MmcTaskExecutor类以使用TaskRuntime。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;public class MmcTaskExecutor<T, R> {// ...省略其他成员变量和方法// 提交新任务public R submitTask(MmcTask<T, R> mmcTask) {// 调用onTasksSubmitted方法taskListener.onTasksSubmitted(mmcTask.getTaskRuntime(), System.currentTimeMillis());R result = forkJoinPool.invoke(mmcTask);// 调用onTasksCompleted方法taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), mmcTask.getElapsedTime(), System.currentTimeMillis());return result;}// 同步执行并返回结果public R execute() {MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>()// ...省略其他方法调用.build();return submitTask(mmcTask);}
}
这轮AI的回答还是有问题,例如下面:
public MmcTask(Builder<T, R> builder) {// ...省略其他初始化代码this.taskRuntime = new TaskRuntime(builder.taskName, builder.taskSource.size());}
它在MmcTask来初始化运行时数据,这明显是不对的,继续让它修改。下一篇,继续优化监听器 - -
七、参考文章
- 《AI大模型编写多线程并发框架(六十一):从零开始搭建框架》
- 《AI大模型编写多线程并发框架(六十二):限流和并发度优化》
- 《AI大模型编写多线程并发框架(六十三):监听器优化·上》
- 《AI大模型编写多线程并发框架(六十四):监听器优化·下》
- 《AI大模型编写多线程并发框架(六十五):发布和应用》
加我加群(备注csdn)一起交流学习!更多干货下载、项目源码和大厂内推等着你