1:需求描述
支持NVR升级后通道数变更,完成升级后,设备SDK上报通道数量给A平台,A平台将NVR通道数量同步给B平台,B平台自动调用C平台接口,同步通道数量给C平台,C平台重新生成通道序列号,并完成序列号入库,再由C平台将通道序列号同步给A平台,A平台完成序列号入库。
2:需求分析
我所在的小组维护的是B平台;拿到需求后,开始分析:
逻辑很简单:设备SDK在启动后,会将信息上报到A平台,A会同步到B平台,B再上报给C;所以只需要在kafka里加入更新后的通道数就可以
现有接口:A平台在设备重启后会通过Kafka上报设备信息给C、获取NVR的通道信息接口
存在问题:获取NVR通道信息的接口是http请求,可能出现网络异常掉用失败的情况,所以需要设计一个补偿机制
3:补偿逻辑
当走网络掉接口失败后,将失败后的kafka信息存入数据库,用定时任务去扫描失败信息进行kafka再次发送;考虑到以后还会有同类型的升级,补偿机制可以设计成通用的。
4:表设计
固件版本同步失败记录表:
重要字段:status:判断是否同步成功;biz_type: 业务类型,本次升级可以设为1(需要补偿的业务类型);biz_msg:业务信息(需要补偿的信息)
5:代码逻辑
更新版本信息接口:
@Override
public String updateVersionInfo(String macId, String currentVersion, String modelId) {/**更新前置代码**/try {//将更新后的设备信息用kafka同步给C,加入升级后的通道数JSONObject kafkaData = new JSONObject();kafkaData.put("macId", macId);kafkaData.put("version", currentVersion);kafkaData.put("modelId", modelId);kafkaData.put("videoCapability", 2);//升级通道数的是nvr总线设备(1nvr总线设备(总设备)可以理解为总设备是一个大的机盒,下面挂了许多子设备,这次升级的就是设备SDK可以上报通道数量(可以挂子设备的数量)if (cameraDaoMybatis.getDbCameraInfo(macId).getNvrType() == 1) {//通过macID获取nvr的信息(设备id,(state 1:在线),nvrList(这个的size就是通道数));因为获取nvr信息的方法是走网络掉接口,所有可能出现网络异常掉用失败的情况NvrDetailInfo nvrChannels = dcsManager.getNvrChannels(macId);//当调用成功,并且nvr状态为在线if (nvrChannels != null && nvrChannels.getState() == 1) {//在kafka中加入升级后获取到的通道数kafkaData.put("channelNumber", nvrChannels.getNvrChannelList().size());}//当调用失败,将设备id,版本,已经kafka的信息全部存入固件版本同步失败记录表Integer row = syncFailureRecordMybatis.addSyncFailureMsg(macId, currentVersion, kafkaData.toString());LOG.info("addSyncFailureMsg success, rows={} affected, macId={}", row, macId);}/**更新后置代码**/} catch (Exception e) {LOG.error("updateVersionInfo error: ", e);result = JsonUtil.addErrorCode(EErrorCode.ERROR_SYSTEM, result);}return result.toString();
}
6:定时任务补偿
@Component
@Slf4j
public class SyncFailureRecordJob {@Autowiredprivate LittlecConfig littlecConfig;@Autowiredprivate SyncFailureRecordMybatis syncFailureRecordMybatis;@Autowiredprivate DcsManager dcsManager;@Autowiredprivate DeviceVersionProducer deviceVersionProducer;@XxlJob("SyncFailureRecord")public ReturnT<String> syncFailureRecord(String param) {//配置一次升级的条数long limitSize = littlecConfig.getConfigAsLong("camera.SyncFailureRecordMybatis.querySyncFailureRecord.limitSize", 5000L);//记录每次启动定时器,开始扫描的起始idInteger startId = 0;log.info("syncFailureRecord param:limitSize:{} startId:{}", limitSize, startId);boolean isCompleted = false;//记录循环次数int reconTimes = 0;//查t_firmware_version_sync_failure_record表,一天内的数据while (!isCompleted) {reconTimes++;//通过limitSize和startId查出的list集合List<SyncFailureRecordDO> syncFailureRecordDOList = syncFailureRecordMybatis.querySyncFailureRecordList(limitSize, startId);//取出list里的所有mcaidString macIds = syncFailureRecordDOList.stream().map(n -> n.getMacId()).collect(Collectors.joining(","));//日志记录循环次数,与这一次扫描的所有macidlog.info("In querySyncFailureRecordList: recon {} time with syncFailureRecordDOList size {} , collect macIds={}", reconTimes, syncFailureRecordDOList.size(),macIds);//如果扫描出的集合小于定下的升级条数,结束循环(代表已经扫描到了最后的一组数据)if (syncFailureRecordDOList.size() < limitSize) {isCompleted = true;}//如果扫描出的集合大小大于0(代表查出数据)if (syncFailureRecordDOList.size() > 0) {//修改下次扫描的起始id为集合的最后一个idstartId =syncFailureRecordDOList.get(syncFailureRecordDOList.size() - 1).getId();//循环取出list里的对象for (SyncFailureRecordDO dto : syncFailureRecordDOList) {if (dto != null) {//同步需要放入kafuka的所有数据给安防管理平台NvrDetailInfo nvrChannels = dcsManager.getNvrChannels(dto.getMacId());JSONObject bizMsgObject = JSONObject.parseObject(dto.getBizMsg());org.json.JSONObject kafkaData = new org.json.JSONObject();kafkaData.put("macId", bizMsgObject.getString("macId"));kafkaData.put("version", bizMsgObject.getString("version"));kafkaData.put("modelId", bizMsgObject.getString("modelId"));kafkaData.put("videoCapability", 2);//加入升级后的通道数kafkaData.put("channelNumber", nvrChannels.getNvrChannelList().size());//同步方法deviceVersionProducer.sendDeviceVersionMessage(dto.getMacId(), kafkaData.toString());log.info("sendDeviceVersionMessage success, macId={}, channelNumber={}", dto.getMacId(), nvrChannels.getNvrChannelList().size());//修改表中的status,为已经升级Integer row = syncFailureRecordMybatis.updateRecordStatus(dto.getMacId());log.info("updateRecordStatus success,rows={} affected,macId={}",row,dto.getMacId());}}}}log.info("syncFailureRecord success");return ReturnT.SUCCESS;}
}
7:mybatis
List<SyncFailureRecordDO> querySyncFailureRecordList(@Param("limitSize") long limitSize, @Param("startId") Integer startId);Integer updateRecordStatus(@Param("macId") String macId);
8:xml
<select id="querySyncFailureRecordList"resultType="com.cmcc.littlec.camera.dao.entity.SyncFailureRecordDO">SELECT uc.mac_id AS macId, uc.biz_msg AS bizMsgFROM t_firmware_version_sync_failure_record ucWHERE uc.id<![CDATA[ > ]]>#{startId}AND uc.biz_type = 1AND uc.status = 0AND created >= DATE_SUB(NOW(), INTERVAL 1 DAY)
</select><update id="updateRecordStatus">UPDATE t_firmware_version_sync_failure_recordSET modified = NOW(),status = 1WHERE mac_id = #{macId}</update>
9:提交任务
开开心心提交代码给老大code review,老大说:这补偿为什么要我们来做,不合理;几个电话加个会议后决定:
让A平台上报SDK信息的时候带上升级后的通道数,我们只需要加个字段就可以了。。。。
好嘛!两天白干
10:总结
遇到问题能抛出去就抛出去!能麻烦别人就别麻烦自己