使用双异步后,如何保证数据一致性?

在这里插入图片描述

目录

    • 一、前情提要
    • 二、通过Future获取异步返回值
      • 1、FutureTask 是基于 AbstractQueuedSynchronizer实现的
      • 2、FutureTask执行流程
      • 3、get()方法执行流程
    • 三、FutureTask源码具体分析
      • 1、FutureTask源码
        • 2、将异步方法的返回值改为```Future<Integer>```,将返回值放到```new AsyncResult<>();```中;
      • 3、通过```Future<Integer>.get()```获取返回值:
      • 4、这里也可以通过新线程+Future获取Future返回值
      • 在BUG中磨砺,在优化中成长

大家好,我是哪吒。

一、前情提要

在上一篇文章中,我们通过双异步的方式导入了10万行的Excel,有个小伙伴在评论区问我,如何保证插入后数据的一致性呢?

很简单,通过对比Excel文件行数和入库数量是否相等即可。

那么,如何获取异步线程的返回值呢?

在这里插入图片描述

二、通过Future获取异步返回值

我们可以通过给异步方法添加Future返回值的方式获取结果。

FutureTask 除了实现 Future 接口外,还实现了 Runnable 接口。因此,FutureTask 可以交给 Executor 执行,也可以由调用线程直接执行FutureTask.run()。

1、FutureTask 是基于 AbstractQueuedSynchronizer实现的

AbstractQueuedSynchronizer简称AQS,它是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及 维护被阻塞线程的队列。
基于 AQS 实现的同步器包括: ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。

基于 AQS实现的同步器包含两种操作:

  1. acquire,阻塞调用线程,直到AQS的状态允许这个线程继续执行,在FutureTask中,get()就是这个方法;
  2. release,改变AQS的状态,使state变为非阻塞状态,在FutureTask中,可以通过run()和cancel()实现。

2、FutureTask执行流程

在这里插入图片描述

  1. 执行@Async异步方法;
  2. 建立新线程async-executor-X,执行Runnable的run()方法,(FutureTask实现RunnableFuture,RunnableFuture实现Runnable);
  3. 判断状态state;
    • 如果未新建或者不处于AQS,直接返回;
    • 否则进入COMPLETING状态,执行异步线程代码;
  4. 如果执行cancel()方法改变AQS的状态时,会唤醒AQS等待队列中的第一个线程线程async-executor-1;
  5. 线程async-executor-1被唤醒后
    • 将自己从AQS队列中移除;
    • 然后唤醒next线程async-executor-2;
    • 改变线程async-executor-1的state;
    • 等待get()线程取值。
  6. next等待线程被唤醒后,循环线程async-executor-1的步骤
    • 被唤醒
    • 从AQS队列中移除
    • 唤醒next线程
    • 改变异步线程状态
  7. 新建线程async-executor-N,监听异步方法的state
    • 如果处于EXCEPTIONAL以上状态,抛出异常;
    • 如果处于COMPLETING状态,加入AQS队列等待;
    • 如果处于NORMAL状态,返回结果;

3、get()方法执行流程

get()方法通过判断状态state观测异步线程是否已结束,如果结束直接将结果返回,否则会将等待节点扔进等待队列自旋,阻塞住线程。

自旋直至异步线程执行完毕,获取另一边的线程计算出结果或取消后,将等待队列里的所有节点依次唤醒并移除队列。

在这里插入图片描述

  1. 如果state小于等于COMPLETING,表示任务还在执行中;
    • 如果线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常;
    • 如果state大于COMPLETING;
      • 如果已有等待节点WaitNode,将线程置空;
      • 返回当前状态;
    • 如果任务正在执行,让出时间片;
    • 如果还未构造等待节点,则new一个新的等待节点;
    • 如果未入队列,CAS尝试入队;
    • 如果有超时时间参数;
      • 计算超时时间;
      • 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state;
      • 阻塞队列nanos毫秒。
    • 否则阻塞队列;
  2. 如果state大于COMPLETING;
    • 如果执行完毕,返回结果;
    • 如果大于等于取消状态,则抛出异常。

很多小朋友对读源码,嗤之以鼻,工作3年、5年,还是没认真读过任何源码,觉得读了也没啥用,或者读了也看不懂~

其实,只要把源码的执行流程通过画图的形式呈现出来,你就会幡然醒悟,原来是这样的~

简而言之:

1. 如果异步线程还没执行完,则进入CAS自旋;
2. 其它线程获取结果或取消后,重新唤醒CAS队列中等待的线程;
3. 再通过get()判断状态state;
4. 直至返回结果或(取消、超时、异常)为止。

三、FutureTask源码具体分析

1、FutureTask源码

通过定义整形状态值,判断state大小,这个思想很有意思,值得学习。

public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();
}
public class FutureTask<V> implements RunnableFuture<V> {// 最初始的状态是new 新建状态private volatile int state;private static final int NEW          = 0; // 新建状态private static final int COMPLETING   = 1; // 完成中private static final int NORMAL       = 2; // 正常执行完private static final int EXCEPTIONAL  = 3; // 异常private static final int CANCELLED    = 4; // 取消private static final int INTERRUPTING = 5; // 正在中断private static final int INTERRUPTED  = 6; // 已中断public V get() throws InterruptedException, ExecutionException {int s = state;// 任务还在执行中if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);}private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {// 线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;// 任务已执行完毕或取消if (s > COMPLETING) {// 如果已有等待节点WaitNode,将线程置空if (q != null)q.thread = null;return s;}// 任务正在执行,让出时间片else if (s == COMPLETING) // cannot time out yetThread.yield();// 还未构造等待节点,则new一个新的等待节点else if (q == null)q = new WaitNode();// 未入队列,CAS尝试入队else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);// 如果有超时时间参数else if (timed) {// 计算超时时间nanos = deadline - System.nanoTime();// 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态stateif (nanos <= 0L) {removeWaiter(q);return state;}// 阻塞队列nanos毫秒LockSupport.parkNanos(this, nanos);}else// 阻塞队列LockSupport.park(this);}}private V report(int s) throws ExecutionException {// 获取outcome中记录的返回结果Object x = outcome;// 如果执行完毕,返回结果if (s == NORMAL)return (V)x;// 如果大于等于取消状态,则抛出异常if (s >= CANCELLED)throw new CancellationException();throw new ExecutionException((Throwable)x);}
}
2、将异步方法的返回值改为Future<Integer>,将返回值放到new AsyncResult<>();中;
@Async("async-executor")
public void readXls(String filePath, String filename) {try {// 此代码为简化关键性代码List<Future<Integer>> futureList = new ArrayList<>();for (int time = 0; time < times; time++) {Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();futureList.add(sumFuture);}}catch (Exception e){logger.error("readXlsCacheAsync---插入数据异常:",e);}
}
@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {try {// 此代码为简化关键性代码return new AsyncResult<>(sum);}catch (Exception e){return new AsyncResult<>(0);}
}

3、通过Future<Integer>.get()获取返回值:

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) throws Exception{int[] futureSumArr = new int[futureList.size()];for (int i = 0;i<futureList.size();i++) {try {Future<Integer> future = futureList.get(i);while (true) {if (future.isDone() && !future.isCancelled()) {Integer futureSum = future.get();logger.info("获取Future返回值成功"+"----Future:" + future+ ",Result:" + futureSum);futureSumArr[i] += futureSum;break;} else {logger.info("Future正在执行---获取Future返回值中---等待3秒");Thread.sleep(3000);}}} catch (Exception e) {logger.error("获取Future返回值异常: ", e);}}boolean insertFlag = getInsertSum(futureSumArr, excelRow);logger.info("获取所有异步线程Future的返回值成功,Excel插入结果="+insertFlag);return insertFlag;
}

4、这里也可以通过新线程+Future获取Future返回值

不过感觉多此一举了,就当练习Future异步取返回值了~

public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) throws Exception {ExecutorService service = Executors.newSingleThreadExecutor();final boolean[] insertFlag = {false};service.execute(new Runnable() {public void run() {try {insertFlag[0] = getFutureResult(futureList, excelRow);} catch (Exception e) {logger.error("新线程+Future获取Future返回值异常: ", e);insertFlag[0] = false;}}});service.shutdown();return new AsyncResult<>(insertFlag[0]);
}

获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。

但Future会造成主线程的阻塞,这个就很不友好了,有没有更优解呢?


在BUG中磨砺,在优化中成长

使用双异步后,从 191s 优化到 2s

增加索引 + 异步 + 不落地后,从 12h 优化到 15 min

使用懒加载 + 零拷贝后,程序的秒开率提升至99.99%

性能优化2.0,新增缓存后,程序的秒开率不升反降


🏆文章收录于:100天精通Java从入门到就业

全网最细Java零基础手把手入门教程,系列课程包括:Java基础、Java8新特性、Java集合、高并发、性能优化等,适合零基础和进阶提升的同学。

🏆哪吒多年工作总结:Java学习路线总结,搬砖工逆袭Java架构师

华为OD机试 2023B卷题库疯狂收录中,刷题点这里

刷的越多,抽中的概率越大,每一题都有详细的答题思路、详细的代码注释、样例测试,发现新题目,随时更新,全天CSDN在线答疑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/243622.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Emotion】 自动驾驶最近面试总结与反思

outline 写在前面面试问题回顾和答案展望 写在前面 最近由于公司部门即将撤销&#xff0c;开始了新一轮准备。 发现现在整体行情不太乐观&#xff0c;很看过去的尤其是量产的经验 同时本次面试我coding环节答得不好&#xff0c;&#xff08;其实也是半年前大家问的比较简单…

新版AndroidStudio dependencyResolutionManagement出错

在新版AndroidStudio中想像使用4.2版本或者4.3版本的AndroidStudio来构造项目&#xff1f;那下面这些坑我们就需要来避免了&#xff0c;否则会出各种各样的问题。 一.我们先来看看新旧两个版本的不同。 1.jdk版本的不同 新版默认是jdk17 旧版默认是jdk8 所以在新版AndroidSt…

javaSSMmysql书籍借阅管理系统04770-计算机毕业设计项目选题推荐(附源码)

摘 要 随着科学技术的告诉发展&#xff0c;我们已经步入数字化、网络化的时代。图书馆是学校的文献信息中心&#xff0c;是为全校教学和科学研究服务的学术性机构&#xff0c;是学校信息化的重要基地。图书馆的工作是学校和科学研究工作的重要组成部分&#xff0c;是全校师生学…

《WebKit 技术内幕》学习之十(1): 插件与JavaScript扩展

虽然目前的浏览器的功能很强 &#xff0c;但仍然有其局限性。早期的浏览器能力十分有限&#xff0c;Web前端开发者希望能够通过一定的机制来扩展浏览器的能力。早期的方法就是插件机制&#xff0c;现在流行次啊用混合编程&#xff08;Hybird Programming&#xff09;模式。插件…

决策树的基本构建流程

决策树的基本构建流程 决策树的本质是挖掘有效的分类规则&#xff0c;然后以树的形式呈现。 这里有两个重点&#xff1a; 有效的分类规则&#xff1b;树的形式。 有效的分类规则&#xff1a;叶子节点纯度越高越好&#xff0c;就像我们分红豆和黄豆一样&#xff0c;我们当然…

表单的总数据为什么可以写成一个空对象,不用具体的写表单中绑定的值,vue3

<el-form :model"form" label-width"120px"><el-form-item label"Activity name"><el-input v-model"form.name" /></el-form-item> </el-form> const form ref({})from为空对象 在v-model里写form…

Python 猎户星空Orion-14B,截止到目前为止,各评测指标均名列前茅,综合指标最强;Orion-14B表现强大,LLMs大模型

1.简介 Orion-14B-Base是一个具有140亿参数的多语种大模型&#xff0c;该模型在一个包含2.5万亿token的多样化数据集上进行了训练&#xff0c;涵盖了中文、英语、日语、韩语等多种语言。在多语言环境下的一系列任务中展现出卓越的性能。在主流的公开基准评测中&#xff0c;Orio…

Tensorflow2.0笔记 - tensor的合并和分割

主要记录concat,stack,unstack和split相关操作的作用 import tensorflow as tf import numpy as nptf.__version__#concat对某个维度进行连接 #假设下面的tensor0和tensor1分别表示4个班级35名同学的8门成绩和两个班级35个同学8门成绩 tensor0 tf.ones([4,35,8]) tensor1 tf…

centos安装:node.js、npm及pm2

前言 Node.js发布于2009年5月&#xff0c;由Ryan Dahl开发&#xff0c;是一个基于Chrome V8引擎的JavaScript运行环境&#xff0c;使用了一个事件驱动、非阻塞式I/O模型&#xff0c;让JavaScript 运行在服务端的开发平台&#xff0c;它让JavaScript成为与PHP、Python、Perl、Ru…

20.云原生之GitLab CICD实战

云原生专栏大纲 文章目录 GitLab RunnerGitLab Runner 介绍Gitlab Runner工作流程 Gitlab集成Gitlab RunnerGitLab Runner 版本选择Gitlab Runner部署docker-compose方式安装kubesphere中可视化方式安装helm方式安装 配置gitlab-runner配置gitlab-ci.ymlgitlab-ci.yml 介绍编写…

SpringCloud Alibaba 深入源码 - Nacos 分级存储模型、支撑百万服务注册压力、解决并发读写问题(CopyOnWrite)

目录 一、SpringCloudAlibaba 源码分析 1.1、SpringCloud & SpringCloudAlibaba 常用组件 1.2、Nacos的服务注册表结构是怎样的&#xff1f; 1.2.1、Nacos的分级存储模型&#xff08;理论层&#xff09; 1.2.2、Nacos 源码启动&#xff08;准备工作&#xff09; 1.2.…

Linux编辑器---vim

目录 1、vim的基本概念 2正常/普通/命令模式(Normal mode) 2、1命令模式下一些命令&#xff08;不用进入插入模式&#xff09; 3插入模式(Insert mode) 4末行/底行模式(last line mode) 4、1底行模式下的一些命令 5、普通用户无法进行sudo提权的解决方案 6、vim配置问题 6、1配…

超优秀的三维模型轻量化、格式转换、可视化部署平台!

1、基于 HTML5 和 WebGL 技术&#xff0c;可在主流浏览器上进行快速浏览和调试&#xff0c;支持PC端和移动端 2、自主研发 AMRT 展示框架和9大核心技术&#xff0c;支持3D模型全网多端流畅展示与交互 3、提供格式转换、减面展UV、烘焙等多项单模型和倾斜摄影模型轻量化服务 4、…

uniapp 链接跳转(内部跳转和外部跳转)

使用uniapp的超链接跳转在微信小程序中会出现复制链接在外面在跳转如图 这样的客户体验感不好 我们需要可以直接跳转查看 思路&#xff1a;webview 1.先在自己uniapp项目pages.json建一个内部页面webview.vue 在page.json里面指向我们跳转的这个内部路径(这个创建页面会自动…

Unity中URP下的SimpleLit的 BlinnPhong高光反射计算

文章目录 前言一、回顾Blinn-Phong光照模型1、Blinn-Phong模型&#xff1a; 二、URP下的SimpleLit的 BlinnPhong1、输入参数2、程序体计算 前言 在上篇文章中&#xff0c;我们分析了 URP下的SimpleLit的 Lambert漫反射计算。 Unity中URP下的SimpleLit的 Lambert漫反射计算 我…

别再因为React、Vue吵了,真的毫无新意!

最近尤大的一个推文引起了不小热议&#xff0c;大概经过是&#xff1a; 有人在推上夸React文档写的好&#xff0c;把可能的坑点都列出来尤看到后批评道&#xff1a;框架应该自己处理这些坑点&#xff0c;而不是把他们暴露给用户 尤大在推上的发言一直比较耿直&#xff0c;这次…

2024-01-22(MongoDB)

1.Mongodb使用的业务场景&#xff1a; 传统的关系型数据库/mysql在“三高”需求以及应对web2.0的网站需求面前&#xff0c;有点力不从心&#xff0c;什么是“三高”需求&#xff1a; a. 对数据库高并发的读写需求 b. 对海量数据的高效率存储和访问需求 c. 对数据库的高可扩…

Redis应用(1)缓存(1.2)------Redis三种缓存问题

一、 缓存穿透&#xff1a; 1、定义&#xff1a; 缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在&#xff0c;这样缓存永远不会生效&#xff0c;这些请求都会打到数据库。所谓穿透&#xff0c;就是直接透过了redis&#xff0c;直接透到数据库 2、原因&#xff1a;…

Spring Boot3整合Druid(监控功能)

目录 1.前置条件 2.导依赖 错误依赖&#xff1a; 正确依赖&#xff1a; 3.配置 1.前置条件 已经初始化好一个spring boot项目且版本为3X&#xff0c;项目可正常启动。 作者版本为3.2.2最新版 2.导依赖 错误依赖&#xff1a; 这个依赖对于spring boot 3的支持不够&#…

【学习】FPN特征金字塔

论文&#xff1a;Feature Pyramid Networks for Object Detection &#xff08;CVPR 2016) 参考blog&#xff1a;https://blog.csdn.net/weixin_55073640/article/details/122627966 参考视频讲解&#xff1a;添加链接描述 卷积网络中&#xff0c;深层网络容易响应语义特征&am…