一、什么是消息队列
消息队列是一种用于在软件系统之间传递消息的技术。它常被用于解耦不同组件或模块之间的通信,减少系统中各个部分之间的直接依赖关系。消息队列可以实现异步通信,发送方将消息发送到队列中,接收方从队列中获取消息并进行处理。这种方式可以提高系统的可靠性、扩展性和可维护性,同时还可以实现消息的持久化、顺序处理、消息重试等功能。
二、代码实现
js代码:
var TaskQueue = /** @class */ (function () {function TaskQueue() {this.waitingQueue = [];this.allQueue = [];this.tempQueue = [];this.finishedQueue = [];this.isRunning = false;}TaskQueue.prototype.add = function (task) {//将任务添加到等待队列中this.waitingQueue.push(task);//排序this.waitingQueue.sort(function (a, b) { return b.priority - a.priority; });//将任务添加到所有任务队列中this.allQueue.push(task);//排序this.allQueue.sort(function (a, b) { return b.priority - a.priority; });};TaskQueue.prototype.start = function () {var _this = this;//改变状态this.isRunning = true;//执行任务this.run();//返回一个Promise对象return new Promise(function (resolve, reject) {resolve(_this.tempQueue);});};TaskQueue.prototype.pause = function () {//改变状态this.isRunning = false;//清空临时的任务队列this.tempQueue = [];};TaskQueue.prototype.resume = function () {//改变状态this.isRunning = true;//拿到之前的任务var res = this.tempQueue;//继续执行任务this.run();return new Promise(function (resolve, reject) {resolve(res);});};TaskQueue.prototype.getAllResults = function () {var _this = this;return new Promise(function (resolve, reject) {resolve(_this.finishedQueue);});};TaskQueue.prototype.run = function () {var _this = this;while (this.waitingQueue.length != 0 && this.isRunning) {//获取当前的任务var currTask = this.waitingQueue.shift();var res = currTask.func();//判断res是不是Promise对象if (res instanceof Promise) {res.then(function (data) {var currRes = {code: 200,msg: '处理成功',data: data};_this.finishedQueue.push(currRes);_this.tempQueue.push(currRes);});}else {var currRes = {code: 400,msg: '任务对象不是Promise',data: res};this.finishedQueue.push(currRes);this.tempQueue.push(currRes);}}};return TaskQueue;
}());
// 测试代码
var queue = new TaskQueue();
queue.add({func: function () { return Promise.resolve('1-Low'); },priority: 10
});
queue.add({func: function () { return Promise.resolve('1-High'); },priority: 5
});
queue.add({func: function () { return Promise.resolve('1-Medium'); },priority: 1
});
queue.start().then(function (res) {console.log('本次处理任务:', res);console.log('全部被处理的任务:', queue.getAllResults());
});
setInterval(function () {queue.pause();queue.add({func: function () { return new Promise(function (resolve, reject) { return setTimeout(function () { return resolve('2-Low'); }, 150); }); },priority: 10});queue.add({func: function () { return new Promise(function (resolve, reject) { return setTimeout(function () { return resolve('2-High'); }, 50); }); },priority: 5});queue.add({func: function () { return new Promise(function (resolve, reject) { return setTimeout(function () { return resolve('2-Medium'); }, 100); }); },priority: 1});queue.resume().then(function (res) {console.log('本次处理任务:', res);console.log('全部被处理的任务:', queue.getAllResults());});
}, 5000);
ts代码:
interface Task {func: Function;priority: number;
}interface Result {code: number;msg: string;data: Promise<any>;
}class TaskQueue {private waitingQueue: Task[];private allQueue: Task[];private tempQueue: Result[];private finishedQueue: Result[];private isRunning: boolean;constructor() {this.waitingQueue = [];this.allQueue = [];this.tempQueue = [];this.finishedQueue = [];this.isRunning = false;}add(task: Task) {//将任务添加到等待队列中this.waitingQueue.push(task)//排序this.waitingQueue.sort((a, b) => b.priority - a.priority)//将任务添加到所有任务队列中this.allQueue.push(task)//排序this.allQueue.sort((a, b) => b.priority - a.priority)}start() {//改变状态this.isRunning = true//执行任务this.run()//返回一个Promise对象return new Promise((resolve, reject) => {resolve(this.tempQueue)})}pause() {//改变状态this.isRunning = false//清空临时的任务队列this.tempQueue = []}resume() {//改变状态this.isRunning = true//拿到之前的任务const res = this.tempQueue//继续执行任务this.run()return new Promise((resolve, reject) => {resolve(res)})}getAllResults() {return new Promise((resolve, reject) => {resolve(this.finishedQueue)})}run() {while (this.waitingQueue.length != 0 && this.isRunning) {//获取当前的任务const currTask = this.waitingQueue.shift() as Taskconst res = currTask.func()//判断res是不是Promise对象if (res instanceof Promise) {res.then((data) => {const currRes = {code: 200,msg: '处理成功',data: data}this.finishedQueue.push(currRes)this.tempQueue.push(currRes)})} else {const currRes = {code: 400,msg: '任务对象不是Promise',data: res}this.finishedQueue.push(currRes)this.tempQueue.push(currRes)}}}
}// 测试代码
const queue = new TaskQueue();queue.add({func: () => Promise.resolve('1-Low'),priority: 10
});
queue.add({func: () => Promise.resolve('1-High'),priority: 5
});
queue.add({func: () => Promise.resolve('1-Medium'),priority: 1
});queue.start().then((res) => {console.log('本次处理任务:', res);console.log('全部被处理的任务:', queue.getAllResults());
});setInterval(() => {queue.pause();queue.add({func: () => new Promise((resolve, reject) => setTimeout(() => resolve('2-Low'), 150)),priority: 10})queue.add({func: () => new Promise((resolve, reject) => setTimeout(() => resolve('2-High'), 50)),priority: 5})queue.add({func: () => new Promise((resolve, reject) => setTimeout(() => resolve('2-Medium'), 100)),priority: 1})queue.resume().then((res) => {console.log('本次处理任务:', res);console.log('全部被处理的任务:', queue.getAllResults());})
},5000)
结果: