前言
在没有使用ETL工具且不考虑多数据源的情况下,我们需要从别的系统获取数据时,一般会选择分页接口查询并存储。本文算是我对类似场景代码的提炼,旨在总结相关套路,提升自我对数据库和模块的设计能力。
ETL(英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、转换(transform)、加载(load)至目的端的过程。(数据仓库结构)通俗的说法就是从数据源抽取数据出来,进行清洗加工转换,然后加载到定义好的数据仓库模型中去)
常规接口对接
接口对接推荐使用Feign,常规写法如下
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;/*** @Author WangZY* @Date 2021/8/10 10:51* @Description 获取订单数据**/
@Component
@FeignClient(name = "${spring.application.name}GateWayApi", url = "${gateway.url}")
public interface GateWayFeign {@PostMapping(value = "/ds-data-service/xxx", headers = {"API-TOKEN=xxxx"})OrderLineResDTO getOrderLineToSmbgj(@RequestBody OrderLineQueryDTO query);
}
生成对应的查询参数及结果类
@NoArgsConstructor
@Data
public class OrderLineResDTO {@JsonProperty("data")private List<DataDTO> data;@JsonProperty("success")private Boolean success;@JsonProperty("affectedRow")private String affectedRow;@JsonProperty("errorCode")private Integer errorCode;@JsonProperty("errorInfo")......
@NoArgsConstructor
@Data
public class OrderLineQueryDTO {......
对应依赖及配置文件
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><version>xxxx</version>
</dependency>
# 外部链接
gateway.url=http://service-gw.xxx.com.cn/api
# 日志打印
logging.level.com.xxx.forecastdata.api=debug
需求分类
全量翻新类–订单全流程定时项目已采用此方案并上线稳定运行
详细信息
数据定时任务通过接口接收或者翻新数据库全量数据,逻辑删除以前的历史数据,保留固定次数的历史数据
操作步骤
前提条件
- 项目使用Mybatis-Plus,如果不是的话请自行替换对应SQL操作语句。该方案默认已提供MP的service层
- 对接系统提供分页查询接口
- 分布式调度(定时任务)采用elastic-job框架,请自行替换对应组件
定时任务
import com.alibaba.fastjson.JSON;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;@Component
@Slf4j
public class OrderLineSchedule implements SimpleJob {//引入接口类以及对应实体ServiceImpl类,或者将批量插入方法放到service层private final GateWayFeign gateWayFeign;private final SellOrderListServiceImpl sellOrderListService;public OrderLineSchedule(GateWayFeign gateWayFeign, SellOrderListServiceImpl sellOrderListService) {this.gateWayFeign = gateWayFeign;this.sellOrderListService = sellOrderListService;}//创建日期格式转换器,这里按需使用即可,提供了两种,一种对应普通java.util.Date,一种对应JDK8及以上提供的java.time.LocalDateTimeprivate final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");private final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");@Overridepublic void execute(ShardingContext shardingContext) {log.info("开始定时查询数仓->销售订单表数据");executeRun();log.info("查询数仓->销售订单表数据结束");}//这里单独把操作类拿出来,是为了方便在Controller层通过接口调用,留一个手动操作的口子public void executeRun() {//updateOverDaily获取当前批次queue,解释详见后续service类int queue = sellOrderListService.updateOverDaily();try {cycleExecute(1, queue);//每次定时删除多余的历史数据log.info("数仓->销售订单表数据,删除批次号为[{}]", queue);CompletableFuture.runAsync(sellOrderListService::deletePre);} catch (Exception e) {//这里不使用事务而是采用手动回滚的原因是,由于定时任务一般时间较长,必然导致大事务问题,因此采用手动方式去规避问题log.error("查询数仓->销售订单表数据报错,回滚至上一版数据", e);sellOrderListService.updateRe(queue);}}/*** 该方法使用递归,分页获取接口参数,结束条件是接口没有数据,注意递归次数不宜过多,过多会导致栈溢出 * @param pageNum 接口分页参数--没有可删除* @param queue 当前批次*/ public void cycleExecute(int pageNum, int queue) {//调用接口,这里分页查询 OrderLineQueryDTO query = new OrderLineQueryDTO();query.setPageNo(pageNum);query.setPageSize(1000);query.setInFields(new OrderLineQueryDTO.InFieldsDTO("19900101 00:00:00", "20230101 00:00:00"));log.info("开始调用数仓->销售订单表接口,查询参数={}", JSON.toJSONString(query));OrderLineResDTO res = gateWayFeign.getOrderLineToSmbgj(query);log.info("结束调用数仓->销售订单表接口,返回结果={}", res);//判断接口是否返回值,如果没有返回值,结束递归 if (CollectionUtils.isNotEmpty(res.getData())) {List<SellOrderList> sellOrderLists = new ArrayList<>(1024);for (OrderLineResDTO.DataDTO datum : res.getData()) {//数据转换,谨慎使用BeanUtils.copy()SellOrderList orderAdd = new SellOrderList();orderAdd.setCustomerName(datum.getPartyName());orderAdd.setItemCode(datum.getItemCode());orderAdd.setItemDesc(datum.getItemDesc());orderAdd.setFirstIntegratorSystem(datum.getFirstIntegratorSystem());orderAdd.setFinalCustomerName(datum.getFinalCustomerName());orderAdd.setOrderNum(datum.getOrderNumber());orderAdd.setOrderBelong(datum.getOrderBelongTypeName());orderAdd.setOrderType(datum.getOrderTypeNew());orderAdd.setLineNumber(datum.getOrderNumber());orderAdd.setContract(datum.getCustPoNumber());Date format = null;try {format = sdf.parse(datum.getCreationDate());} catch (ParseException e) {log.error("无法解析时间" + datum.getCreationDate());}orderAdd.setOrderCreateDate(format);orderAdd.setProductDescription(datum.getProductDescription());orderAdd.setOrderQty(datum.getOrderQty());orderAdd.setSoldToCountries(datum.getCountryName());orderAdd.setDataUpdateDate(datum.getLastUpdateDate());orderAdd.setCreateTime(new Date());orderAdd.setIsDelete(0);//批次判断,如果-1说明第一次,给默认值0,如果不是,填入当前批次即可orderAdd.setQueue(queue == -1 ? 0 : queue);sellOrderLists.add(orderAdd);}//推荐自己写批量插入SQL,MP提供的速度不够快,默认批次1000条sellOrderListService.getBaseMapper().batchSchdule(sellOrderLists);//开启递归,分页+1cycleExecute(pageNum + 1, queue, startTime);}}
}
service层
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;/*** <p>* 数仓->销售订单表 服务实现类* </p>** @author WangZY* @since 2022-06-01*/
@Service
@Slf4j
public class SellOrderListServiceImpl extends ServiceImpl<SellOrderListMapper, SellOrderList> implements ISellOrderListService {/*** @author WangZY* @date 2022/6/1 16:09* @description 逻辑删除上一版数据**/@Overridepublic int updateOverDaily() {List<SellOrderList> list = lambdaQuery().eq(SellOrderList::getIsDelete, 0).select(SellOrderList::getQueue).list();if (CollectionUtils.isEmpty(list)) {//没有说明第一次全量,返回标识量-1return -1;} else {//逻辑删除上一版数据,并生成下一批次号即+1Integer queue = list.get(0).getQueue();SellOrderList delivery = new SellOrderList();delivery.setIsDelete(1);lambdaUpdate().eq(SellOrderList::getIsDelete, 0).update(delivery);return queue + 1;}}/*** @author WangZY* @date 2022/6/1 16:09* @description 删除历史数据**/@Overridepublic void deletePre() {QueryWrapper<SellOrderList> qw = new QueryWrapper<>();qw.select("distinct queue");List<SellOrderList> queueNumberList = list(qw);List<Integer> queueDistinct = queueNumberList.parallelStream().map(SellOrderList::getQueue).sorted(Comparator.comparingInt(o -> o)).collect(Collectors.toList());//删除30个批次之前的数据,这里可以调整保留多少版数据if (queueDistinct.size() > 30) {for (int i = 0; i < queueDistinct.size() - 30; i++) {remove(new QueryWrapper<SellOrderList>().eq("queue", queueDistinct.get(i)));}}}/*** @author WangZY* @date 2022/6/1 16:09* @description 恢复指定版本数据,并删除该版本的下一版数据**/@Overridepublic void updateRe(int queue) {SellOrderList deliveryPre = new SellOrderList();deliveryPre.setUpdateTime(new Date());deliveryPre.setIsDelete(1);lambdaUpdate().eq(SellOrderList::getQueue, queue + 1).update(deliveryPre);SellOrderList deliveryAfter = new SellOrderList();deliveryAfter.setUpdateTime(new Date());deliveryAfter.setIsDelete(0);lambdaUpdate().eq(SellOrderList::getQueue, queue).update(deliveryAfter);}
}
数据库需要字段
每日更新类–物料基础信息及SMB海外预测项目已上线并稳定运行
详细信息
数据每日通过接口接收增量数据,增量数据通过数据中的唯一值进行新增和更新的判断
操作步骤
前提条件
同全量翻新类
定时任务
import com.alibaba.fastjson.JSON;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;/*** @author WangZY* @date 2022/6/2 11:51* @description SMB国际获取订单行定时任务**/
@Component
@Slf4j
public class OrderLineSchedule implements SimpleJob {//引入接口类以及对应实体ServiceImpl类,或者将批量插入方法放到service层private final GateWayFeign gateWayFeign;private final SellOrderListServiceImpl sellOrderListService;public OrderLineSchedule(GateWayFeign gateWayFeign, SellOrderListServiceImpl sellOrderListService) {this.gateWayFeign = gateWayFeign;this.sellOrderListService = sellOrderListService;}private final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");private final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");@Overridepublic void execute(ShardingContext shardingContext) {log.info("开始定时查询数仓->SMB国际获取订单行数据");executeRun();log.info("查询数仓->SMB国际获取订单行数据结束");}public void executeRun() {//查询出当前数据库里所有数据的ID,以及唯一条件,切勿全字段容易触发OOMList<SellOrderList> list = sellOrderListService.lambdaQuery().eq(SellOrderList::getIsDelete, 0).select(SellOrderList::getId, SellOrderList::getLineId).list();if (CollectionUtils.isEmpty(list)) {//空的就全量新增即可cycleExecute(1, new HashMap<>(8), "20000101 00:00:00", "20500101 00:00:00");} else {//不为空,说明是增量更新,这里创建MAP,key为唯一条件,value为实体类,其实这个实体最主要的就是要个ID,方便批量更新Map<Integer, SellOrderList> judgeMap = list.stream().filter(e -> e.getLineId() != null).collect(Collectors.toMap(SellOrderList::getLineId, Function.identity()));//每日更新,这里取昨天时间,顺便冗余一小时LocalDateTime time = LocalDateTime.now().minusDays(1).minusHours(1);cycleExecute(1, judgeMap, time.format(dtf), LocalDateTime.now().format(dtf));}}public void cycleExecute(int pageNum, Map<Integer, SellOrderList> judgeMap, String startTime, String endTime) {//正常分页调接口即可OrderLineQueryDTO query = new OrderLineQueryDTO();query.setPageNo(pageNum);query.setPageSize(1000);query.setInFields(new OrderLineQueryDTO.InFieldsDTO(startTime, endTime));log.info("开始调用数仓->SMB国际获取订单行接口,查询参数={}", JSON.toJSONString(query));OrderLineResDTO res = gateWayFeign.getOrderLineToSmbgj(query);log.info("结束调用数仓->SMB国际获取订单行接口,返回结果={}", JSON.toJSONString(res));if (CollectionUtils.isNotEmpty(res.getData())) {if (judgeMap.isEmpty()) {//map为空,说明全量新增,直接批量新增即可List<SellOrderList> sellOrderLists = new ArrayList<>(1024);for (OrderLineResDTO.DataDTO datum : res.getData()) {SellOrderList orderAdd = new SellOrderList();orderAdd.setLineId(datum.getLineId());paddingParam(datum, orderAdd);orderAdd.setCreateTime(new Date());orderAdd.setIsDelete(0);sellOrderLists.add(orderAdd);}sellOrderListService.getBaseMapper().batchSchdule(sellOrderLists);cycleExecute(pageNum + 1, judgeMap, startTime, endTime);} else { List<SellOrderList> sellOrderListAdd = new ArrayList<>(1024);List<SellOrderList> sellOrderListUpdate = new ArrayList<>(1024);for (OrderLineResDTO.DataDTO datum : res.getData()) {SellOrderList existOrder = judgeMap.get(datum.getLineId());if (existOrder == null) {//map中没有数据,直接新增SellOrderList orderAdd = new SellOrderList();orderAdd.setLineId(datum.getLineId());paddingParam(datum, orderAdd);orderAdd.setCreateTime(new Date());orderAdd.setIsDelete(0);sellOrderListAdd.add(orderAdd);} else {//map中有数据,说明该实体类以及初始化了,需要更新,这里把主键放入该修改的实体类中SellOrderList orderUpdate = new SellOrderList();orderUpdate.setId(existOrder.getId());paddingParam(datum, orderUpdate);orderUpdate.setUpdateTime(new Date());sellOrderListUpdate.add(orderUpdate);}}if (CollectionUtils.isNotEmpty(sellOrderListAdd)) {sellOrderListService.getBaseMapper().batchSchdule(sellOrderListAdd);}if (CollectionUtils.isNotEmpty(sellOrderListUpdate)) {sellOrderListService.updateBatchById(sellOrderListUpdate);}cycleExecute(pageNum + 1, judgeMap, startTime, endTime);}}}private void paddingParam(OrderLineResDTO.DataDTO datum, SellOrderList orderAdd) {orderAdd.setCustomerName(datum.getPartyName());orderAdd.setItemCode(datum.getItemCode());orderAdd.setItemDesc(datum.getItemDesc());orderAdd.setFirstIntegratorSystem(datum.getFirstIntegratorSystem());orderAdd.setFinalCustomerName(datum.getFinalCustomerName());orderAdd.setOrderNum(datum.getOrderNumber());orderAdd.setOrderBelong(datum.getOrderBelongTypeName());orderAdd.setOrderType(datum.getOrderTypeNew());orderAdd.setContract(datum.getCustPoNumber());Date format = null;try {format = sdf.parse(datum.getCreationDate());} catch (ParseException e) {log.error("无法解析时间" + datum.getCreationDate());}orderAdd.setOrderCreateDate(format);orderAdd.setProductDescription(datum.getProductDescription());orderAdd.setOrderQty(datum.getOrderQty());orderAdd.setSoldToCountries(datum.getCountryName());orderAdd.setDataUpdateDate(datum.getLastUpdateDate());}
}
优化要点
代码部分其实已经相当完善了,来说点代码之外的东西,例如大家喜闻乐见的优化。
减少大对象
大批量的数据必然会带来大对象,大对象的堆积则必然会导致OOM–java heap space即堆内存溢出。减少数据的传输必然是重中之重,可通过如下方向优化
- 接口的数据能精简的精简
- SQL的查询参数尽量减少并且使用尽量少字段的结果类去接收(即使字段没有值,但是序列化的时候,依然会有key)
减少循环
- 大多数同事可能用惯了stream流,所以会嫌弃使用for。但其实得根据情况,因为stream流本身就是循环的语法糖,多个stream流不方便合并的时候,用一个for循环就好了。
- 增量的时候我们需要对比从数据库里拿到的原数据,这个时候就不要循环里面套循环去contains。建议提前一次循环做个map,以唯一值为key,对象为value,会快很多,唯一不足的是用空间换时间,要注意OOM问题。
多线程优化
熟练使用CompletableFuture.allOf以及parallelStream流会大大提升效率。该部分留待后续文章性能优化-如何爽玩多线程来开发分析,坑很多,知识点也很多。
批量插入
MP为了通用性终究是相对保守了点,可尝试修改批量插入,甚至是加上多线程事务。多线程事务实际上通常采用2PC的思想实现,这部分也留待后续文章分析。
写在最后
设计方案系列来自我对场景代码的总结,日常工作中会有很多这样的场景,我就想着要把这些套路代码留存下来,方便下次使用。本次套路包含了数据库设计、代码案例、优化思想三块,基本上都点到了,希望对读者的工作有所帮助。优化这块算是通用的思考吧,说白了性能优化就是围绕硬件和软件进行开源节流,堆硬件和提升软件效率。
最近一直在写一些多线程代码,尝试去总结套路,下一篇会是ComplatableFutrue的实战,多线程操作以及简易的2PC实现,干货很多,敬请期待。