[自研开源] 数据集成之分批传输 v0.7

开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673

介绍

本篇基于 数据集成之任务流程 介绍任务分批传输的使用场景和配置操作。

使用场景

mydata使用API方式集成数据,当一次请求或响应 传输数据量较多时 可能无法完成、或容易对服务端造成影响,因此需要分为多次处理;

例如 常见的分页查询、导入大量数据时分批处理、集成对接时的全量同步等;

分批传输数据

业务系统与mydata集成时,在提供数据消费数据这两个方向上分别实现分批传输;

提供数据

由mydata调用应用的API获取数据,通过配置分批参数 实现一次任务内多次调用API获取完整数据,有以下两种基本的配置模式:

  • 配置了 固定参数size=10、递增参数current从1开始每次递增1、每次间隔1秒的任务;

在这里插入图片描述

  • 配置了 递增参数start从1开始每次递增100、递增参数end从100开始每次递增100、每次间隔1秒的任务;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;

  • lastProduceData记录上一次数据,用于和本次对比数据,若重复 则结束,避免死循环(理论上很少有2次完全一样的数据);

  • 若分批有异常,则复用任务3次出错 自动结束并发送邮件通知的功能;

  • 执行完一次后,自动计算递增参数值;

// 提供数据
case MdConstant.DATA_PRODUCER:// 分批模式 记录上一次数据,用于对比两次数据,若重复 则结束,避免死循环List<Map> lastProduceData = null;do {// 若启用分批,则将分批参数加入请求参数中if (taskInfo.isBatch()) {Map<String, Object> batchParam = jobBatchService.parseToMap(taskInfo);Map<String, Object> reqParams = MapUtil.union(taskInfo.getReqParams(), batchParam);taskInfo.setReqParams(reqParams);}// 调用api 获取jsonString json = ApiUtil.read(taskInfo);// 将json按字段映射 解析为业务数据jobDataService.parseData(taskInfo, json);// 若没有返回数据,则结束处理if (CollUtil.isEmpty(taskInfo.getProduceDataList())) {break;}// 对比上一次数据if (lastProduceData != null) {if (CollUtil.isEqualList(lastProduceData, taskInfo.getProduceDataList())) {// 异常任务失败,邮件通知用户检查任务throw new RuntimeException("分批获取数据异常,最后两次获取的数据相同!");}}lastProduceData = taskInfo.getProduceDataList();// 根据条件过滤数据jobDataFilterService.doFilter(taskInfo);// 保存业务数据jobDataService.saveTaskData(taskInfo);// 更新环境变量jobVarService.saveVarValue(taskInfo, json);// 递增分批参数jobBatchService.incBatchParam(taskInfo);// 若启用分批,则等待间隔if (taskInfo.isBatch()) {ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);}} while (taskInfo.isBatch());break;

消费数据

由mydata通过API向应用发送数据,通过配置分批参数 限制每次向API发送的数据量,从而减少数据查询量和请求处理时间;

如下图,配置了分批数量为1000的任务,分批参数为选填,mydata将按1000为限制查询符合条件的数据,通过API请求发送给应用;

在这里插入图片描述

执行过程如下代码,要点有:

  • 通过do-while结构 兼容单次和分批;
  • 自动管理分页参数,执行分页查询数据,发送给API;
  • 直到分页查询没有数据 自动结束;
// 消费数据
case MdConstant.DATA_CONSUMER:String dataCode = taskInfo.getDataCode();if (StrUtil.isEmpty(dataCode)) {break;}List<BizDataFilter> filters = taskInfo.getDataFilters();if (CollUtil.isNotEmpty(filters)) {// 解析过滤条件值中的 自定义字符串parseFilterValue(filters);// 排除值为null的条件filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());}int round = 0;Long skip = null;Integer limit = taskInfo.isBatch() ? taskInfo.getBatchSize() : null;do {if (taskInfo.isBatch()) {skip = (long) round * taskInfo.getBatchSize();}// 根据过滤条件 查询数据List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters, skip, limit);if (CollUtil.isEmpty(dataList)) {break;}taskInfo.setConsumeDataList(dataList);// 根据字段映射转换为api参数jobDataService.convertData(taskInfo);// 调用api传输数据ApiUtil.write(taskInfo);round++;// 若启用分批,则等待间隔if (taskInfo.isBatch()) {ThreadUtil.sleep(taskInfo.getBatchInterval(), TimeUnit.SECONDS);}}while (taskInfo.isBatch());break;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/287480.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hana数据库 No columns were bound prior to calling SQLFetch or SQLFetchScroll

在php调用hana数据库的一个sql时报错了&#xff0c;查表结构的sql&#xff1a; select * from sys.table_columns where table_name VBAP SQLSTATE[SL009]: <<Unknown error>>: 0 [unixODBC][Driver Manager]No columns were bound prior to calling SQLFetch …

55、Qt/事件机制相关学习20240326

一、代码实现设置闹钟&#xff0c;到时间后语音提醒用户。示意图如下&#xff1a; 代码&#xff1a; #include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget), speecher(new QTextToSpeech(t…

canvas跟随鼠标画有透明度的矩形边框

提示&#xff1a;canvas跟随鼠标画有透明度的矩形边框 文章目录 前言一、跟随鼠标画有透明度的矩形边框总结 前言 一、跟随鼠标画有透明度的矩形边框 test.html <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8">&…

【笔记】OpenHarmony设备开发:搭建开发环境(Ubuntu 20.04,VirtualBox 7.0.14)

参考&#xff1a;搭建开发环境&#xff08;HarmonyOS Device&#xff09; Note&#xff1a;Windows系统虚拟机中Ubuntu系统安装完成后&#xff0c;根据指导完成Ubuntu20.04基础环境配置&#xff08;HarmonyOS Connect 开发工具系列课&#xff09; 系统要求 Windows系统要求&…

Python包管理工具 pip 及其常用命令和参数用法

目录 PIP 主要功能 安装包 升级包 卸载包 列出包 检查依赖 pip的配置和环境 主要用法 1&#xff1a;版本 2&#xff1a;安装 Python 库 3&#xff1a;升级库 4&#xff1a;卸载库 5&#xff1a;搜索库 6&#xff1a;查看已安装库详细信息 7&#xff1a;只下载库…

MT6762_联发科MTK6762安卓核心板规格参数

MTK6762核心板是一款集成了蓝牙、fm、wlan和gps模块的高度集成基带平台&#xff0c;为LTE/LTE-A和C2K智能手机应用程序提供支持。该安卓核心板集成了ARM Cortex-A53处理器&#xff0c;工作频率可达2.0GHz&#xff0c;并且还集成了功能强大的多标准视频编解码器。除此之外&#…

OriginBot智能机器人开源套件

详情可参见&#xff1a;OriginBot智能机器人开源套件——支持ROS2/TogetherROS&#xff0c;算力强劲&#xff0c;配套古月居定制课程 (guyuehome.com) OriginBot智能机器人开源套件 最新消息&#xff1a;OriginBot V2.1.0版本正式发布&#xff0c;新增车牌识别&#xff0c;点击…

【Java程序设计】【C00376】基于(JavaWeb)Springboot的社区帮扶对象管理系统(有论文)

【C00376】基于&#xff08;JavaWeb&#xff09;Springboot的社区帮扶对象管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;已经做了六年的毕业设计程序开发&am…

jupyter notebook导出含中文的pdf(LaTex安装和Pandoc、MiKTex安装)

用jupyter notebook导出pdf时&#xff0c;因为报错信息&#xff0c;需要用到Tex nbconvert failed: xelatex not found on PATH, if you have not installed xelatex you may need to do so. Find further instructions at https://nbconvert.readthedocs.io/en/latest/install…

基于SpringBoot和Leaflet的行政区划地图掩膜效果实战

目录 前言 一、掩膜小知识 1、GIS掩膜的实现原理 2、图层掩膜流程 二、使用插件 1、leaflet-mask介绍 2、核心代码解释 三、完整实例实现 1、后台逻辑实现 2、省级行政区划查询实现 3、行政区划定位及掩膜实现 4、成果展示 总结 前言 在之前的博客提过按空间矢量…

Ansys Speos | Light Expert Group探测器组使用技巧

附件下载 联系工作人员获取附件 概述 相机挡板的设计需要在光路的不同位置同步多个照度图&#xff0c;以尽量减少杂散光。2023R2 Speos提供了一种新的探测器&#xff0c;用于高阶杂散光分析&#xff0c;可以同时对多个探测器进行光线追迹。Light Expert工具可以即时过滤3D视…

SQLite数据库成为内存中数据库(三)

返回&#xff1a;SQLite—系列文章目录 上一篇&#xff1a;SQLite使用的临时文件&#xff08;二&#xff09; 下一篇&#xff1a;SQLite中的原子提交&#xff08;四) ​​ SQLite数据库通常存储在单个普通磁盘中文件。但是&#xff0c;在某些情况下&#xff0c;数据库可能…

业务服务:xss攻击

文章目录 前言一、使用注解预防1. 添加依赖2. 自定义注解3. 自定义校验逻辑4. 使用 二、使用过滤器1. 添加配置2. 创建配置类3. 创建过滤器4. 创建过滤器类5. 使用 前言 xss攻击时安全领域中非常常见的一种方法&#xff0c;保证我们的系统安全是非常重要的 xss攻击简单来说就…

代码随想录 图论-并查集

代码随想录 (programmercarl.com) 寻找图中是否存在路径这道题中的类可看做并查集的标准类 目录 1971.寻找图中是否存在路径 684.冗余连接 685.冗余连接II 1971.寻找图中是否存在路径 1971. 寻找图中是否存在路径 已解答 简单 相关标签 相关企业 有一个具有 n 个顶…

使用ai智能写作场景之gpt整理资料,如何ai智能写作整理资料

Ai智能写作助手&#xff1a;Ai智能整理资料小助手 Ai智能整理资料小助手可试用3天&#xff01; 通俗的解释一下怎么用ChatGPT来进行资料整理&#xff1a; 搜寻并获取指定数量的特定领域文章&#xff1a; 想像你在和我说话一样&#xff0c;告诉我你想要多少篇关于某个话题的文…

Flask python :logging日志功能使用

logging日志的使用 一、了解flask日志1.1、Loggers记录器1.2、Handlers 处理器1.3、Formatters 格式化器 二、使用日志2.1、官网上的一个简单的示例2.2、基本配置2.3、具体使用示例2.4、运行 三、写在最后 一、了解flask日志 日志是一种非常重要的工具&#xff0c;可以帮助开发…

【双指针】Leetcode 查找总价格为目标值的两个商品

题目解析 LCR 179. 查找总价格为目标值的两个商品 本题很友好&#xff0c;只需要返回任意一个 算法讲解 这道题很显然就是使用对撞双指针&#xff0c;一个从左边&#xff0c;一个从右边&#xff0c;两边进行和target比较来移动 代码编写 class Solution { public:vector<…

谷粒商城——缓存——SpringCache

1. 配置使用 首先需要导入相关的依赖&#xff1a; <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-cache</artifactId></dependency> 随后在配置文件中进行配置&#xff1a; spring:cache:t…

C语言文件操作(详细)

⽬录 一. 为什么使⽤⽂件&#xff1f; 二. 什么是⽂件&#xff1f; 三. ⼆进制⽂件和⽂本⽂件&#xff1f; 四. ⽂件的打开和关闭 五. ⽂件的顺序读写 六. ⽂件的随机读写 七. ⽂件读取结束的判定 八. ⽂件缓冲区 一. 为什么使⽤⽂件&#xff1f; 如果没有⽂件&#…

STM32技术打造:智能考勤打卡系统 | 刷卡式上下班签到自动化解决方案

文章目录 一、简易刷卡式打卡考勤系统&#xff08;一&#xff09;功能简介原理图设计程序设计 哔哩哔哩&#xff1a; https://www.bilibili.com/video/BV1NZ421Y79W/?spm_id_from333.999.0.0&vd_sourcee5082ef80535e952b2a4301746491be0 一、简易刷卡式打卡考勤系统 &…