设备多久(60/50/40min)未上报,类似场景发送通知实现方案

场景描述

设备比较多,几十万甚至上百万,设备在时不时会上报消息。
用户可以设置设备60分钟、50分钟、40分钟、30分钟未上报数据,发送通知给用户,消息要及时可靠。

基本思路

思路:

由于设备在一直上报,如果直接存储到数据库对数据库的压力比较大,考虑使用缓存,每次上报都更新到缓存中; 由于是多久未上报发通知,考虑使用定时任务查找超过60/50/40/30min的设备;定时任务遍历时要尽可能少的查询设备缓存,因为绝大多数设备是不需要进行通知的,最好是只遍历需要发送通知的设备缓存,可以考虑使用类似于时间窗口机制,将设备缓存按时间进行分割,建立两个缓存,缓存1设备数据指向缓存2(主要用于实现设备数据在缓存2不用时间窗口转换),缓存2数据,用于定时任务数据扫描;考虑到消息通知的及时性,考虑使用延迟定时任务,来及时发送消息通知。由于设备比较大,考虑对缓存1按hash算法分割开来,来提升性能。

思路转化方案:

  1. 涉及的Redis缓存
  • 缓存1(hash),用于找到缓存2

大Key:device:one:0,小Key:pk:8620241008283980,Value:device:two:202410091900, 即 {"pk:8620241008283980":"device:two:202410091900"}

  • 缓存2(hash), 通过缓存2达到过滤数据的目的

大Key:device:two:202410091900,小Key:pk:8620241008283980,Value:1728473450149, 即 {"pk:8620241008283980":"1728473450149"}

  1. PK:DK按照hash算法,分成100份,设备上报时,存储到缓存1中
  2. 按照1分钟为跨度,设备上报时,将当前设备数据存储到缓存2中
  3. 设备上报时,判断该设备是否有延迟定时任务,如果存在删除该延迟定时任务,判断该设备是否存在缓存1与缓存2,如果存在先删除,再添加。(其过程实现了数据在缓存2不同集合的转化)
  4. 定时任务:根据当前时间,扫描对应60/50/40min前的缓存2数据,并添加到延迟定时任务(考虑到消息要及时发送)中
  5. 延迟定时任执行:删除缓存1该设备数据,删除缓存2该设备数据,下发通知

基本流程

方案示意图:
在这里插入图片描述
设备上报处理流程:
在这里插入图片描述
定时任务处理流程:
在这里插入图片描述

业务流程实现

设备上报处理逻辑

场景1: 缓存1中存在,缓存2中也存在,延迟定时任务中也存在

删除该设备延迟定时任务数据,删除缓存2数据,删除缓存1数据,新增缓存2,新增缓存1

场景2: 缓存1中存在,缓存2中也存在,延迟定时任务中不存在

删除缓存2数据,删除缓存1数据,新增缓存2,新增缓存1

场景3: 缓存1不存在,缓存2中不存在,延迟定时任务中不存在

新增缓存2,新增缓存1

相关代码:

package com.angel.ocean.service;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.angel.ocean.redis.RedisCacheKey;
import com.angel.ocean.util.FutureTaskUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;@Slf4j
@Service
public class DataHandlerService {private static final String COMMA = ":";@Resourceprivate RedissonClient redissonClient;@Resourceprivate ScheduleTaskService scheduleTaskService;public void setCache(String productKey, String deviceKey, long ts, int expiredNoticeTime) {String childKey = productKey + COMMA + deviceKey;String oneKey = RedisCacheKey.getCacheOneHashKey(productKey, deviceKey);RMap<String, String> oneHash = redissonClient.getMap(oneKey);String oldTwoKey = oneHash.get(childKey);if(StrUtil.isNotEmpty(oldTwoKey)) {if(FutureTaskUtil.futureTasks.containsKey(childKey)) {log.info("移除通知延迟任务,{}", childKey);scheduleTaskService.stopTask(childKey);}RMap<String, String> oldTwoHash = redissonClient.getMap(oldTwoKey);log.info("该设备缓存已存在,先删除历史缓存,再更新,{}", childKey);// 删除缓存2oldTwoHash.remove(childKey);// 删除缓存1oneHash.remove(childKey);}String twoKey = RedisCacheKey.getCacheTwoHashKey(ts);RMap<String, String> twoHash = redissonClient.getMap(twoKey);long expiredTime = ts + expiredNoticeTime * 60 * 1000L;twoHash.put(childKey, Long.toString(expiredTime));oneHash.put(childKey, twoKey);}
}

缓存工具类:

package com.angel.ocean.redis;import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;/***  缓存键*/
public class RedisCacheKey {public static final String COMMA = ":";private static final int n = 100;private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");/*** 获取缓存1 Key,依据pk和dk* @param productKey* @param deviceKey* @return*/public static String getCacheOneHashKey(String productKey, String deviceKey) {String data = productKey + COMMA + deviceKey;return "device:one:" + Math.abs(data.hashCode()) % n;}/*** 获取缓存2 Key,依据时间戳* @param ts* @return*/public static String getCacheTwoHashKey(long ts) {// 将时间戳转换为 InstantInstant instant = Instant.ofEpochMilli(ts);ZoneId zoneId = ZoneId.systemDefault();// 转换为 ZonedDateTimeZonedDateTime zdt = instant.atZone(zoneId);// 格式化 ZonedDateTimeString formattedDateTime = zdt.format(formatter);// 构建并返回缓存键return "device:two:" + formattedDateTime;}public static void main(String[] args) {System.out.println(getCacheTwoHashKey(System.currentTimeMillis()));}
}

定时任务逻辑(每分钟执行一次)

  • 依据当前时间和多久未上报(60/50/40min),获取对应的缓存2数据
  • 遍历该缓存2集合
  • 判断该设备的通知时间,是否小于当前时间加上1分钟,如果小于就加入到延迟定时任务中
  • 延迟定时任务执行时,删除该设备的缓存2数据,删除该设备的缓存1数据

相关代码:

package com.angel.ocean.task;import com.angel.ocean.service.DataHandlerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;@Slf4j
@Component
public class ScheduledTasks {@Resourceprivate DataHandlerService dataHandlerService;// 每1分钟执行一次// 遍历缓存2,放入延迟定时任务中@Scheduled(cron = "0 0/1 * * * ?")public void dataHandler() {log.info("dataHandler....");// 60分钟未上报通知dataHandlerService.delayTaskHandler(60);// 50分钟未上报通知dataHandlerService.delayTaskHandler(50);// 40分钟未上报通知dataHandlerService.delayTaskHandler(40);}
}
package com.angel.ocean.service;import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.angel.ocean.redis.RedisCacheKey;
import com.angel.ocean.util.FutureTaskUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;@Slf4j
@Service
public class DataHandlerService {private static final String COMMA = ":";@Resourceprivate RedissonClient redissonClient;@Resourceprivate ScheduleTaskService scheduleTaskService;// 将数据放入延迟定时任务public void delayTaskHandler(int delayTime) {long start = System.currentTimeMillis();log.info("delayTaskHandler() start..., time:{}", System.currentTimeMillis());long now = System.currentTimeMillis();long ts = now - delayTime * 60 * 1000L;String twoKey = RedisCacheKey.getCacheTwoHashKey(ts);RMap<String, String> hashMap = redissonClient.getMap(twoKey);if(CollUtil.isEmpty(hashMap)) {return;}Map<String, String> allEntries = hashMap.readAllMap();allEntries.forEach((key, value) -> {long tsLimit = now + 60000;log.info("tsLimit={}, ts={}", tsLimit, value);if(Long.parseLong(value) < tsLimit) {Runnable task = () -> {noticeHandler(key, twoKey);};if(Long.parseLong(value) <= System.currentTimeMillis()) {scheduleTaskService.singleTask(task);} else {scheduleTaskService.delayTask(key, task, Long.parseLong(value) - System.currentTimeMillis());}}});long end = System.currentTimeMillis();log.info("delayTaskHandler() end..., 耗时:{}毫秒", (end - start));}// 模拟通知逻辑private void noticeHandler(String childKey, String twoKey) {log.info("发送通知,设备:{}, ts={}",  childKey, System.currentTimeMillis());String[] arr = childKey.split(RedisCacheKey.COMMA);String oneKey = RedisCacheKey.getCacheOneHashKey(arr[0], arr[1]);RMap<String, String> oneHash = redissonClient.getMap(oneKey);String currentTwoKey = oneHash.get(childKey);// 由于并发问题,会存在延迟定时任务(twoKey)的与缓存1中存储的值(currentTwoKey)不一致,因此,需要校验两个值是否相同。if(StrUtil.isNotEmpty(currentTwoKey) && currentTwoKey.equals(twoKey)) {// TODO 相同的话执行通知逻辑,删除缓存1// 删除缓存1oneHash.remove(childKey);}// 删除缓存2,无论twoKey与currentTwoKey相不相同都删除RMap<String, String> twoHash = redissonClient.getMap(twoKey);twoHash.remove(childKey);}
}

延迟定时任务实现

Springboot定时任务,线程池配置

package com.angel.ocean.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;@Configuration
public class SchedulerConfig {@Beanpublic ThreadPoolTaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(20); // 设置线程池大小scheduler.setThreadNamePrefix("Thread-task-"); // 设置线程名称前缀scheduler.setDaemon(true); // 设置为守护线程// 你可以继续设置其他属性...return scheduler;}
}

定时任务工具类

package com.angel.ocean.util;import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;@Slf4j
public class FutureTaskUtil {private FutureTaskUtil() {}// FutureTask集合public static ConcurrentMap<String, ScheduledFuture<?>> futureTasks = new ConcurrentHashMap<String, ScheduledFuture<?>>();/*** 判断是否包含 futureTask* @param taskId* @return*/public static boolean isContains(String taskId) {boolean result = false;if(futureTasks.containsKey(taskId)) {result = true;}return result;}/*** 添加 futureTask* @param taskId* @param futureTask*/public static void addFutureTask(String taskId, ScheduledFuture<?> futureTask) {if(futureTasks.containsKey(taskId)) {log.error("FutureTaskUtil.addFutureTask(), key: {}已存在", taskId);return;}futureTasks.put(taskId, futureTask);}/*** 获取 futureTask* @param taskId* @return*/public static ScheduledFuture<?> getFutureTask(String taskId) {ScheduledFuture<?> futureTask = null;if(futureTasks.containsKey(taskId)) {log.info("FutureTaskUtil.getFutureTask(), taskId: {}", taskId);futureTask = futureTasks.get(taskId);}return futureTask;}/*** 移除 futureTask* @param taskId*/public static void removeFutureTask(String taskId) {if(futureTasks.containsKey(taskId)) {log.info("FutureTaskUtil.removeFutureTask(), taskId: {}", taskId);futureTasks.remove(taskId);}}
}

需要关注的问题

  • 并发问题如何处理?

由于并发问题,会造成缓存1和缓存2的数据不一致,延迟任务执行时校验缓存1中存储的缓存2的Key于延迟定时任务的缓存Key是否一致,一致的话才下发通知。

  • 服务重启,造成延迟定时任务数据丢失,如何补发通知?

由于延迟定时任务存在于内存中,服务重新启动,会导致其数据丢失,可以考虑从缓存2再拿一次数据,做个数据补偿。

package com.angel.ocean.runner;import com.angel.ocean.service.DataHandlerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;@Slf4j
@Component
public class StartupRunner implements CommandLineRunner {@Resourceprivate DataHandlerService dataHandlerService;@Overridepublic void run(String... args) throws Exception {// 只处理近2个小时的数据int i = 120;while (i > 50) {dataHandlerService.delayTaskHandler(i);i = i - 1;}}
}

模拟验证

package com.angel.ocean.test;import cn.hutool.core.util.RandomUtil;
import com.angel.ocean.domain.DeviceCacheInfo;
import java.util.ArrayList;
import java.util.List;public class DeviceDataUtil {private static int deviceNumber = 500000;private static List<String> dks = new ArrayList<>(deviceNumber + 5);private static boolean initFlag = false;private static void init() {int number = 1;while (number <= deviceNumber) {String formattedNumber = String.format("%06d", number);String dk = "8620241008" + formattedNumber;dks.add(dk);number++;}initFlag = true;}public static void setDeviceNumber(int number) {DeviceDataUtil.deviceNumber = number;}public static DeviceCacheInfo deviceReport() {if(!initFlag) {init();}DeviceCacheInfo deviceCacheInfo = new DeviceCacheInfo();deviceCacheInfo.setProductKey("pk");deviceCacheInfo.setTs(System.currentTimeMillis());deviceCacheInfo.setExpiredNoticeTime(60);String dk = dks.get(RandomUtil.randomInt(1, deviceNumber));deviceCacheInfo.setDeviceKey(dk);return deviceCacheInfo;}
}
package com.angel.ocean;import com.angel.ocean.domain.DeviceCacheInfo;
import com.angel.ocean.service.DataHandlerService;
import com.angel.ocean.test.DeviceDataUtil;
import com.angel.ocean.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;@Slf4j
@SpringBootTest
class ApplicationTests {@Resourceprivate DataHandlerService dataHandlerService;@Testvoid contextLoads() {for(int i = 0; i < 500000; i++) {DeviceCacheInfo deviceCacheInfo = DeviceDataUtil.deviceReport();Runnable task = () -> {dataHandlerService.setCache(deviceCacheInfo.getProductKey(), deviceCacheInfo.getDeviceKey(), deviceCacheInfo.getTs(), deviceCacheInfo.getExpiredNoticeTime());};ThreadPoolUtil.pools.submit(task);try {Thread.sleep(2);} catch (InterruptedException e) {throw new RuntimeException(e);}}}
}
package com.angel.ocean.util;import cn.hutool.core.thread.ThreadFactoryBuilder;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolUtil {private ThreadPoolUtil() {}public static final ThreadPoolExecutor pools = new ThreadPoolExecutor(16, 50, 60, TimeUnit.SECONDS,new LinkedBlockingDeque<>(10000),new ThreadFactoryBuilder().setNamePrefix("MyThread-").build(),new ThreadPoolExecutor.CallerRunsPolicy());
}

缓存截图:
在这里插入图片描述
缓存1:
在这里插入图片描述缓存2:
在这里插入图片描述运行日志截图:
在这里插入图片描述执行延迟定时任务日志截图:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/443608.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Airtest脚本的重构与优化:提升测试效率和可读性

在自动化测试的工作里&#xff0c;编写高效且易于维护的测试脚本是一项挑战&#xff0c;尤其是在应对复杂的测试场景时。Airtest作为一款常用的自动化测试工具&#xff0c;它提供了丰富的API和灵活的脚本编写方式&#xff0c;帮助测试人员高效地开展UI自动化测试。然而&#xf…

Linux的环境与历史

目录 引言 1. Linux 背景介绍 2. 开源 3. 官网 4. 企业应用现状 5. 发行版本 6.见见猪跑 引言 在这个信息化时代&#xff0c;掌握一门操作系统技能显得尤为重要。Linux作为一款开源、稳定且功能强大的操作系统&#xff0c;不仅在服务器领域占据主导地位&#xff0c;也逐渐…

哈希表结构

哈希表结构&#xff1a;数组链表 案例一&#xff1a;HashSet集合的常见使用方法 package com.collection;import java.util.HashSet; import java.util.Iterator;/*** HashSet集合的使用* 存储结构&#xff1a;哈希表(数组链表红黑树)*/ public class Demo07 {public static v…

性能测试学习6:jmeter安装与基本配置/元件/线程组介绍

一.JDK安装 官网&#xff1a;https://www.oracle.com/ 二.Jmeter安装 官网&#xff1a;http://jmeter.apache.org/download_jmeter.cgi 下载zip包&#xff0c;zip后缀那个才是Windows系统的jmeter 三.Jmeter工作目录介绍 四.Jmeter功能 1&#xff09;修改默认配置-汉化 2&am…

SapGUI For Windows捕获技术

一、SapGUI For Windows捕获技术 文章目录 一、SapGUI For Windows捕获技术SAP GUI:SAP NetWeaver Business Client:SAP Fiori:二.Sap的自动化配置SAP客户端配置三.Sap GUI自动化脚本四.Sap GUI自动化开发SAP GUI: SAP图形用户界面,是最常用的SAP前端界面。它是一个桌面应…

React(一) 认识React、熟悉类组件、JSX书写规范、嵌入变量表达式、绑定属性

文章目录 一、初始React1. React的基本认识2. Hello案例2.1 三个依赖2.2 渲染页面2.3 hello案例完整代码 二、类组件1. 封装类组件2. 组件里的数据3. 组件里的函数 (重点)4. 案例练习(1) 展示电影列表 三、JSX语法1. 认识JSX2. JSX书写规范及注释3. JSX嵌入变量作为子元素4. JS…

leetcode58:最后一个单词的长度

给你一个字符串 s&#xff0c;由若干单词组成&#xff0c;单词前后用一些空格字符隔开。返回字符串中 最后一个 单词的长度。 单词 是指仅由字母组成、不包含任何空格字符的最大 子字符串 。 示例 1&#xff1a; 输入&#xff1a;s "Hello World" 输出&#xff…

Linux SSH服务

Linux SSH&#xff08;Secure Shell&#xff09;服务是一种安全的远程登录协议&#xff0c;用于在Linux操作系统上远程登录和执行命令。它提供了加密的通信通道&#xff0c;可以在不安全的网络环境中安全地进行远程访问。 SSH服务在Linux系统中通常使用OpenSSH软件包来实现。它…

【Java SE 题库】输出一个数的二进制的奇数位和偶数位

&#x1f525;博客主页&#x1f525;&#xff1a;【 坊钰_CSDN博客 】 欢迎各位点赞&#x1f44d;评论✍收藏⭐ 目录 1. 题目 2. 分析 3. 代码实现 3.1 运行结果 4. 小结 1. 题目 输入一个数&#xff0c;请分别打印这个数二进制的奇数位和偶数位 例&#xff1a;15 …

Element-快速入门

什么是 Element 在现代前端开发中&#xff0c;组件化的思想日益盛行&#xff0c;Element组件库作为一款流行的UI组件库&#xff0c;特别适用于基于Vue.js的项目&#xff0c;它为开发者提供了丰富的组件和良好的开发体验。 想要使用Element的组件库&#xff0c;我们需要完成下面…

yolov8-pose的TensorRT动态库部署(C++)

文章目录 参考代码概要硬件动态库代码文件结构头文件yolov8-pose.hyolov8-pose.cppCMakeLists.txt调用例子main.cppCMakeLists.txt获取engine模型测试结果参考代码 https://github.com/triple-Mu/YOLOv8-TensorRT 概要 为了方便使用,基于上述开源代码,将其封装成动态库,方…

GAMES104:16 游戏引擎的玩法系统:基础AI-学习笔记

文章目录 一&#xff0c;寻路/导航系统Navigation1.1 Walkable Area1.1.1 Waypoint Network1.1.2 Grid1.1.3 Navigation Mesh1.1.4 Sparse Voxel Octree 1.2 Path Finding1.2.1 Dijkstra Algorithm迪杰斯特拉算法1.2.2 A Star&#xff08;A*算法&#xff09; 1.3 Path Smoothin…

在不支持WSL2的Windows环境下安装Redis并添加环境变量的方法

如果系统版本支持 WSL 2 可跳过本教程。使用官网提供的教程即可 官网教程 查看是否支持 WSL 2 如果不支持或者觉得麻烦可以按照下面的方式安装 下载 点击打开下载地址 下载 zip 文件即可 安装 将下载的 zip 文件解压到自己想要解压的地方即可。&#xff08;注意&#x…

Python 工具库每日推荐【Pillow】

文章目录 引言Python图像处理库的重要性今日推荐:Pillow工具库主要功能:使用场景:安装与配置快速上手示例代码代码解释实际应用案例案例:创建图像拼贴案例分析高级特性图像增强图像水印扩展阅读与资源优缺点分析优点:缺点:总结【 已更新完 TypeScript 设计模式 专栏,感兴…

深入理解HTTP Cookie

&#x1f351;个人主页&#xff1a;Jupiter. &#x1f680; 所属专栏&#xff1a;Linux从入门到进阶 欢迎大家点赞收藏评论&#x1f60a; 目录 HTTP Cookie定义工作原理分类安全性用途 认识 cookie基本格式实验测试 cookie 当我们登录了B站过后&#xff0c;为什么下次访问B站就…

JavaScript 变量的简单学习

目录 1. 变量 1.1 变量是什么 1.2 变量基本使用 1.2.1 声明变量 1.2.2 变量赋值 1.2.3 变量更新 1.2.4 声明多个变量 1.3 变量案例 1.3.1 弹出姓名 1.3.2 交换变量的值 1.4 变量的本质 1.5 变量命名规则 1.6 var VS let 1. 变量 1.1 变量是什么 1.2 变量基本使用 …

Lazada菲律宾本土店选品怎么操作?EasyBoss ERP选品功能来帮你!

由于Lazada本土店在流量、履约速度、类目限制以及回款速度方面的优势&#xff0c;越来越多的Lazada卖家都在考虑转型做本土店&#xff0c;但本土化落地并不是一件容易的事&#xff0c;很多卖家在选品阶段就踩大坑了。 因此&#xff0c;为了选品不踩坑&#xff0c;很多卖家都会…

天海一体,遨游双卫星智能终端扬帆5G智慧海洋

海洋面积占地球表面的70%以上&#xff0c;世界贸易的90%左右由国际海运行业承运。但是&#xff0c;信号覆盖不均、通信延迟高、定位精度不足等问题&#xff0c;严重制约了海洋作业的效率与安全。智慧海洋&#xff0c;通信先行&#xff0c;AORO M5-5G双卫星智能终端应时代需求而…

螺蛳壳里做道场:老破机搭建的私人数据中心---Centos下Docker学习06(Docker网络连接)

如果要搭建基于docker的私人DC&#xff0c;除了虚拟机网络连接外&#xff0c;就得掌握docker的网络连接。磨刀不误砍柴工&#xff0c;或者说工欲善其事必先利其器&#xff0c;我们先学学典型的docker的网络连接方式。Docker的网络连接有四种&#xff1a;bridge、none、containe…

【10086网上营业厅-注册/登录安全分析报告】

前言 由于网站注册入口容易被黑客攻击&#xff0c;存在如下安全问题&#xff1a; 暴力破解密码&#xff0c;造成用户信息泄露短信盗刷的安全问题&#xff0c;影响业务及导致用户投诉带来经济损失&#xff0c;尤其是后付费客户&#xff0c;风险巨大&#xff0c;造成亏损无底洞…