基于电商场景的高并发RocketMQ实战-促销活动推送千万级用户解决方案【多线程+分片推送实现高性能推送】

🌈🌈🌈🌈🌈🌈🌈🌈
欢迎关注公众号(通过文章导读关注),发送【资料】可领取 深入理解 Redis 系列文章结合电商场景讲解 Redis 使用场景中间件系列笔记编程高频电子书
【11来了】文章导读地址:点击查看文章导读!
🍁🍁🍁🍁🍁🍁🍁🍁
首先介绍一下发布促销活动的整体业务流程:

  1. 运维人员操作页面发布促销活动

  2. 判断促销活动是否和以往活动发布重复

  3. 先将促销活动落库

  4. 发布【促销活动创建】事件

  5. 消费者监听到【促销活动创建】事件,开始对所有用户推送促销活动

    由于用户量很大,这里使用 多线程 + 分片推送 来大幅提升推送速度

整个流程中的主要技术难点就在于:多线程 + 分片推送

整体的流程图如下:

在这里插入图片描述

接下来,开始根据流程图中的各个功能来介绍代码如何实现:

通过 Redis 判断发布的活动是否重复

通过将已经发布的促销活动先保存在 Redis 中,避免短时间内将同一促销活动重复发布

Redis 中存储的 key 的设计:promotion_Concurrency + [促销活动名称] + [促销活动创建人] + [促销活动开始事件] + [促销活动结束时间]

如果一个促销活动已经发布,那么就将这个促销活动按照这个 key 存储进入 Redis 中,value 的话,设置为 UUID 即可

过期时间 的设置:这里将过期时间设置为 30 分钟

通过 MQ 发送【促销活动创建】事件

这里发布促销活动创建事件的时候,消息中存放的数据使用了一个事件类进行存储,这个事件类中只有一个属性,就是促销活动的实体类:

/*** 促销活动创建事件,用于 MQ 传输*/
@Data
public class SalesPromotionCreatedEvent {// 促销活动实体类private SalesPromotionDO salesPromotion;
}

那么通过消费者监听到【促销活动创建】事件之后,就会进行 用户推送 的动作

如何实现用户分片 + 多线程推送

首先来了解一下为什么要对用户进行分片:在电商场景中用户的数量是相当庞大的,中小型电商系统的用户数量都可以达到千万级,那么如果给每一个用户都生成一条消息进行 MQ 推送,这个推送的时间相当漫长,必须优化消息推送的速度,因此将多个用户 合并成一个分片 来进行推送,这样消耗的时间可能还有些久,就再将多个分片 合并成一条消息,之后再将合并后的消息通过 多线程 推送到 MQ 中,整个优化流程如下:

在这里插入图片描述

接下来说一下分片中具体的实现:

首先对用户分片的话,需要知道用户的总数,并且设置好每个分片的大小,才可以将用户分成一个个的分片

获取用户总数的话,假设用户表中 id 是自增的,那么直接从用户表中拿到最大的 用户 id 作为用户总数即可,用户总数不需要非常准确,某个分片多几个少几个影响不大,将每个分片大小设置为 1000,也就是一个分片存放 1000 个用户 id

那么分片操作就是创建一个 Map<Long, Long> userBuckets = LinkedHashMap<Long, Long>(),将每一个分片的用户起使 id 和结束 id 放入即可

之后再将多个用户分片给合并为一条消息,这里合并的时候保证一条消息不超过 1MB(RocketMQ 官方推荐),首先将需要推送的一个个分片给生成一个 JSON 串,表示一个个的推送任务,将所有推送任务放入到 List 集合中,接下来去遍历 List 集合进行多个分片的合并操作,List 集合中存储的是一个个分片任务的 String 串,只需要拿到 String 串的长度,比如说长度为 200,那么这个 String 串占用的空间为 200B,累加不超过 1MB,就将不超过 1MB 的分片合并为一条消息,代码如下:

@Slf4j
@Component
public class SalesPromotionCreatedEventListener implements MessageListenerConcurrently {@DubboReference(version = "1.0.0")private AccountApi accountApi;@Resourceprivate DefaultProducer defaultProducer;@Autowired@Qualifier("sharedSendMsgThreadPool")private SafeThreadPool sharedSendMsgThreadPool;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {try {for(MessageExt messageExt : list) {// 这个代码就可以拿到一个刚刚创建成功的促销活动String message = new String(messageExt.getBody());SalesPromotionCreatedEvent salesPromotionCreatedEvent =JSON.parseObject(message, SalesPromotionCreatedEvent.class);// 将消息中的数据解析成促销活动实体类SalesPromotionDO salesPromotion = salesPromotionCreatedEvent.getSalesPromotion();// 为了这个促销活动,针对全体用户发起push// bucket,就是一个用户分片,这里定义用户分片大小final int userBucketSize = 1000;// 拿到全体用户数量,两种做法,第一种是去找会员服务进行 count,第二种是获取 max(userid),自增主键JsonResult<Long> queryMaxUserIdResult = accountApi.queryMaxUserId();if (!queryMaxUserIdResult.getSuccess()) {throw new BaseBizException(queryMaxUserIdResult.getErrorCode(), queryMaxUserIdResult.getErrorMessage());}Long maxUserId = queryMaxUserIdResult.getData();// 上万条 key-value 对,每个 key-value 对就是一个 startUserId->endUserId,推送任务分片Map<Long, Long> userBuckets = new LinkedHashMap<>(); // // 数据库自增主键是从1开始的long startUserId = 1L; // 这里对所有用户进行分片,将每个分片的 <startUserId, endUserId> 都放入到 userBuckets 中Boolean doSharding = true;while (doSharding) {if (startUserId > maxUserId) {doSharding = false;break;}userBuckets.put(startUserId, startUserId + userBucketSize);startUserId += userBucketSize;}// 提前创建一个推送的消息实例,在循环中直接设置 startUserId 和 endUserId,避免每次循环都去创建一个新对象PlatformPromotionUserBucketMessage promotionPushTask = PlatformPromotionUserBucketMessage.builder().promotionId(salesPromotion.getId()).promotionType(salesPromotion.getType()).mainMessage(salesPromotion.getName()).message("您已获得活动资格,打开APP进入活动页面").informType(salesPromotion.getInformType()).build();// 将需要推送的消息全部放到这个 List 集合中List<String> promotionPushTasks = new ArrayList<>();for (Map.Entry<Long, Long> userBucket : userBuckets.entrySet()) {promotionPushTask.setStartUserId(userBucket.getKey());promotionPushTask.setEndUserId(userBucket.getValue());String promotionPushTaskJSON = JsonUtil.object2Json(promotionPushTask);promotionPushTasks.add(promotionPushTaskJSON);}log.info("本次推送消息用户桶数量, {}",promotionPushTasks.size());// 将上边 List 集合中的推送消息进行合并,这里 ListSplitter 的代码会在下边贴出来ListSplitter splitter = new ListSplitter(promotionPushTasks, MESSAGE_BATCH_SIZE);while(splitter.hasNext()){List<String> sendBatch = splitter.next();log.info("本次批次消息数量,{}",sendBatch.size());// 将多个分片合并为一条消息,提交到线程池中进行消息的推送sharedSendMsgThreadPool.execute(() -> {defaultProducer.sendMessages(RocketMqConstant.PLATFORM_PROMOTION_SEND_USER_BUCKET_TOPIC, sendBatch, "平台优惠活动用户桶消息");});}}} catch(Exception e) {log.error("consume error, 促销活动创建事件处理异常", e);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

其中实现将分片合并的代码 ListSplitter 如下:

public class ListSplitter implements Iterator<List<String>> {// 设置每一个batch最多不超过800k,因为rocketmq官方推荐,不建议长度超过1MB,// 而封装一个rocketmq的message,包括了messagebody,topic,addr等数据,所以我们这边儿设置的小一点儿private int sizeLimit = 800 * 1024;private final List<String> messages;private int currIndex;private int batchSize = 100;public ListSplitter(List<String> messages, Integer batchSize) {this.messages = messages;this.batchSize = batchSize;}public ListSplitter(List<String> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}// 每次从list中取一部分@Overridepublic List<String> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {String message = messages.get(nextIndex);// 获取每条消息的长度int tmpSize = message.length();// 如果当前这个分片就已经超过一条消息的大小了,就将这个分片单独作为一条消息发送if (tmpSize > sizeLimit) {if (nextIndex - currIndex == 0) {nextIndex++;}break;}if (tmpSize + totalSize > sizeLimit || (nextIndex - currIndex) == batchSize ) {break;} else {totalSize += tmpSize;}}List<String> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}@Overridepublic void remove() {throw new UnsupportedOperationException("Not allowed to remove");}
}

线程池中的参数如何设置?

上边使用了线程池进行并发推送消息,那么线程池的参数如何设置了呢?

这里主要说一下对于核心线程数量的设置,直接设置为 0,因为这个线程池主要是对促销活动的消息进行推送,这个推送任务并不是一直都有的,有间断性的特点,因此不需要线程常驻在线程池中,空闲的时候,将所有线程都回收即可

在这个线程池中,通过信号量来控制最多向线程池中提交的任务,如果超过最大提交数量的限制,会在信号量处阻塞,不会再提交到线程池中:

public class SafeThreadPool {private final Semaphore semaphore;private final ThreadPoolExecutor threadPoolExecutor;// 创建线程池的时候,指定最大提交到线程池中任务的数量public SafeThreadPool(String name, int permits) {// 如果超过了 100 个任务同时要运行,会通过 semaphore 信号量阻塞semaphore = new Semaphore(permits);/*** 为什么要这么做,corePoolSize 是 0 ?* 消息推送这块,并不是一直要推送的,促销活动、发优惠券,正常情况下是不会推送* 发送消息的线程池,corePoolSize是0,空闲把线程都回收掉就挺好的*/threadPoolExecutor = new ThreadPoolExecutor(0,permits * 2,60,TimeUnit.SECONDS,new SynchronousQueue<>(),NamedDaemonThreadFactory.getInstance(name));}public void execute(Runnable task) {/*** 超过了 100 个 batch 要并发推送,就会在这里阻塞住* 在比如说 100 个线程都在繁忙的时候,就不可能说有再超过 100 个 batch 要同时提交过来* 极端情况下,最多也就是 100 个 batch 可以拿到信号量*/semaphore.acquireUninterruptibly();threadPoolExecutor.submit(() -> {try {task.run();} finally {semaphore.release();}});}
}// 自定义的线程工厂,创建的线程都作为守护线程存在
public class NamedDaemonThreadFactory implements ThreadFactory {private final String name;private final AtomicInteger counter = new AtomicInteger(0);private NamedDaemonThreadFactory(String name) {this.name = name;}public static NamedDaemonThreadFactory getInstance(String name) {Objects.requireNonNull(name, "必须要传一个线程名字的前缀");return new NamedDaemonThreadFactory(name);}@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, name + "-" + counter.incrementAndGet());thread.setDaemon(true);return thread;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/229161.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

如何使用Docker将.Net6项目部署到Linux服务器(三)

目录 四 安装nginx 4.1 官网下载nginx 4.2 下载解压安装nginx 4.3 进行configure 4.4 执行make 4.5 查看nginx是否安装成功 4.6 nginx的一些常用命令 4.6.1 启动nginx 4.6.2 通过命令查看nginx是否启动成功 4.6.3 关闭Nginx 4.6.5 重启Nginx 4.6.6 杀掉所有Nginx进…

浏览器---善用的一些调试技巧

https://www.cnblogs.com/dasusu/p/17932742.html

c# 捕获全部线程的异常 试验

1.概要 捕获全部线程的异常 试验&#xff0c;最终结果task的异常没有找到捕获方法 2.代码 2.1.试验1 2.1.1 试验结果 2.2 代码 2.2.1主程序代码 using NLog; using System; using System.Threading; using System.Windows.Forms;namespace 异常监控 {static class Program…

C++day4作业

定义一个Person类&#xff0c;私有成员int age&#xff0c;string &name&#xff0c;定义一个Stu类&#xff0c;包含私有成员double *score&#xff0c;写出两个类的构造函数、析构函数、拷贝构造和拷贝赋值函数&#xff0c;完成对Person的运算符重载(算术运算符、条件运算…

IRQ Handler 的使用——以USART串口接收中断分别在标准库与HAL库版本下的举例

前言&#xff1a; 1.中断系统及EXTI外部中断知识点见我的博文&#xff1a; 9.中断系统、EXTI外部中断_eirq-CSDN博客文章浏览阅读301次&#xff0c;点赞7次&#xff0c;收藏6次。EXTI&#xff08;Extern Interrupt&#xff09;外部中断EXTI可以监测指定GPIO口的电平信号&…

Redis(认识NoSQL,认识redis,安装redis,redis桌面客户端,redis常见命令,redis的Java客户端)

文章目录 Redis快速入门1.初识Redis1.1.认识NoSQL1.1.1.结构化与非结构化1.1.2.关联和非关联1.1.3.查询方式1.1.4.事务1.1.5.总结 1.2.认识Redis1.3.安装Redis1.3.1.依赖库1.3.2.上传安装包并解压1.3.3.启动1.3.4.默认启动1.3.5.指定配置启动1.3.6.开机自启 1.4.Redis桌面客户端…

病情聊天机器人,利用Neo4j图数据库和Elasticsearch全文搜索引擎相结合

项目设计目的&#xff1a; 本项目旨在开发一个病情聊天机器人&#xff0c;利用Neo4j图数据库和Elasticsearch全文搜索引擎相结合&#xff0c;实现对病情相关数据的存储、查询和自动回答。通过与用户的交互&#xff0c;机器人可以根据用户提供的症状描述&#xff0c;给出初步的可…

Linux---进程控制

一、进程创建 fork函数 在Linux中fork函数是非常重要的函数&#xff0c;它从已存在进程中创建一个新进程&#xff0c;原进程为父进程 fork函数的功能&#xff1a; 分配新的内存和内核数据结构给子进程将父进程部分数据结构内容拷贝至子进程添加子进程到系统的进程列表中fork返…

听GPT 讲Rust源代码--library/portable-simd

File: rust/library/portable-simd/crates/core_simd/examples/spectral_norm.rs spectral_norm.rs是一个示例程序&#xff0c;它展示了如何使用Portable SIMD库中的SIMD&#xff08;Single Instruction Multiple Data&#xff09;功能来实现频谱规范化算法。该示例程序是Rust源…

如何使用SeaFile搭建本地私有云盘并结合cpolar实现远程访问

文章目录 1. 前言2. SeaFile云盘设置2.1 SeaFile的安装环境设置2.2 SeaFile下载安装2.3 SeaFile的配置 3. cpolar内网穿透3.1 Cpolar下载安装3.2 Cpolar的注册3.3 Cpolar云端设置3.4 Cpolar本地设置 4.公网访问测试5.结语 1. 前言 现在我们身边的只能设备越来越多&#xff0c;…

从零开始:使用 BIND 构建和管理您的 DNS 服务器

1 前言 在这篇文章中&#xff0c;我将详细介绍如何使用 BIND&#xff08;Berkeley Internet Name Domain&#xff09;软件包中的 named 程序来配置和管理一个基本的 DNS 服务器。 从安装 BIND 开始&#xff0c;到设置 DNS 区域文件&#xff0c;再到运行和测试您的服务器&#x…

typescript 中 infer 用法

infer 介绍 infer 一般在 extends 子语句中,infer 会引入一个待推断的类型变量 (如 infer R) R可以是任意单词字母 这个推断的类型变量可以在有条件类型的 true 分支中被引用 允许出现多个同类型变量的 infer。 基本示例 type ParamType<T> T extends (arg: infer…

kubeadm创建k8s集群

kubeadm来快速的搭建一个k8s集群&#xff1a; 二进制搭建适合大集群&#xff0c;50台以上。 kubeadm更适合中下企业的业务集群。 部署框架 master192.168.10.10dockerkubelet kubeadm kubectl flannelnode1192.168.10.20dockerkubelet kubeadm kubectl flannelnode2192.168.1…

新火种AI|福布斯Top50,估值高达50亿,这家AI法律公司令人震惊

2023年3月&#xff0c;OpeAI以雷霆之势推出了ChatGPT&#xff0c;为AI产业带来了颠覆性的进展&#xff0c;让所有人为之震惊。其中有一项对于ChatGPT的测试还引起了了不小的轰动&#xff0c;当时美国伊利诺伊理工大学芝加哥肯特法学院称&#xff0c;GPT-4通过了美国律师资格考试…

嵌入式视频播放器(mplayer)

1.文件准备&#xff1a; MPlayer-1.0rc2.tar.bz2 libmad-0.15.1b.tar.gz 直接Git到本地 git clone https://gitee.com/zxz_FINE/mplayer_tarball.git 2.文件夹准备&#xff1a; src存放解压后的源码文件&#xff0c;target_Mplayer存放编译安装的目标文件 mkdir src targe…

将本地工作空间robot_ws上传到gitee仓库

git config --global user.name "geniusChinaHN" git config --global user.email "12705243geniuschinahnuser.noreply.gitee.com" cd ~/robot_ws #git init#创建原始仓库时候用 git add . git commit -m "上传文件内容描述" #git remote add r…

​iOS实时查看App运行日志

目录 一、设备连接 二、使用克魔助手查看日志 三、过滤我们自己App的日志 &#x1f4dd; 摘要&#xff1a; 本文介绍了如何在iOS iPhone设备上实时查看输出在console控制台的日志。通过克魔助手工具&#xff0c;我们可以连接手机并方便地筛选我们自己App的日志。 &#x1f4…

009:vue结合el-table实现表格行拖拽排序(基于sortablejs)

文章目录 1. 实现效果2. 安装 sortablejs 插件3. 完整组件代码4. 注意点 1. 实现效果 2. 安装 sortablejs 插件 sortablejs 更多用法 cnpm i --save sortablejs3. 完整组件代码 <template><div class"home"><div class"body"><el-ta…

pytorch深度学习笔记(共计169页,基于本人听完B站小土堆PyTorch深度学习快速入门教程所写)

一、笔记视频 pytorch深度学习&#xff08;共计169页&#xff0c;基于本人听完B站小土堆PyTorch深度学习快速入门教程所写&#xff09; 二、获取方式 方式一&#xff1a; 点击下面的链接 pytorch深度学习笔记 如果链接无法打开 直接复制下方链接即可 https://mall.bilibili.c…

appium入门基础

介绍 appium支持在不同平台的UI自动化&#xff0c;如web,移动端,桌面端等。还支持使用java&#xff0c;python&#xff0c;js等语言编写自动化代码。主要用于自动化测试脚本&#xff0c;省去重复的手动操作。 Appium官网 安装 首先必须环境有Node.js用于安装Appium。 总体来…