Node.JS多线程PromisePool之promise-pool库实现

什么是Promise Pool

Map-like, concurrent promise processing for Node.js.

Promise-Pool是一个用于管理并发请求的JavaScript库,它可以限制同时进行的请求数量,以避免过多的请求导致服务器压力过大。使用Promise-Pool可以方便地实现对多个异步操作的并发控制。

Promise Pool “承诺池” 包允许您批量运行许多承诺。

承诺池确保并发处理任务的最大数量。

承诺池中的每个任务都是其他任务,这意味着一旦一个任务完成,池就开始处理下一个任务。

此处理可确保了为您的任务进行最佳的批处理。

 

Promise Pool - NPMJS

@supercharge/promise-pool - npm (npmjs.com)icon-default.png?t=N7T8https://www.npmjs.com/package/@supercharge/promise-pool

Promise Pool - Document

Promise Poolicon-default.png?t=N7T8https://superchargejs.com/docs/3.x/promise-pool

 

怎么使用PromisePool

Install 安装

so easy , just install it

npm i @supercharge/promise-pool

Usage用例

Using the promise pool is pretty straightforward. The package exposes a class and you can create a promise pool instance using the fluent interface.

使用promise pool承诺池非常简单。该包公开了一个类,您可以使用流畅的接口创建一个承诺池实例。

Here’s an example using a concurrency of 2:

import { PromisePool } from '@supercharge/promise-pool'const users = [{ name: 'Marcus' },{ name: 'Norman' },{ name: 'Christian' }
]const { results, errors } = await PromisePool.withConcurrency(2).for(users).process(async (userData, index, pool) => {const user = await User.createIfNotExisting(userData)return user})

The promise pool uses a default concurrency of 10

默认是十个线程,请按照自己的实际情况(业务+架构)处理

 

在以下示例中,我们创建了一个包含5个worker的线程池。然后,我们向线程池添加了10个任务。线程池会并发执行这些任务,但最多只能有5个任务同时运行。当一个任务完成时,线程池会自动分配下一个任务给空闲的worker。

const PromisePool = require('promise-pool');// 创建一个包含5个worker的线程池
const pool = new PromisePool(5, (task) => {return new Promise((resolve, reject) => {// 模拟一个耗时操作setTimeout(() => {console.log('Task completed:', task);resolve();}, 1000);});
});// 添加任务到线程池
for (let i = 0; i < 10; i++) {pool.addTask(i).then(() => {console.log('Task finished:', i);}).catch((err) => {console.error('Error:', err);});
}//zhengkai.blog.csdn.net

Manually Stop the Pool 手工停止

You can stop the processing of a promise pool using the pool instance provided to the .process() and .handleError() methods. Here’s an example how you can stop an active promise pool from within the .process() method:

await PromisePool.for(users).process(async (user, index, pool) => {if (condition) {return pool.stop()}// processes the `user` data})

You may also stop the pool from within the .handleError() method in case you need to:

import { PromisePool } from '@supercharge/promise-pool'await PromisePool.for(users).handleError(async (error, user, pool) => {if (error instanceof SomethingBadHappenedError) {return pool.stop()}// handle the given `error`}).process(async (user, index, pool) => {// processes the `user` data})

Bring Your Own Error Handling

The promise pool allows for custom error handling. You can take over the error handling by implementing an error handler using the .handleError(handler).

If you provide an error handler, the promise pool doesn’t collect any errors. You must then collect errors yourself.

Providing a custom error handler allows you to exit the promise pool early by throwing inside the error handler function. Throwing errors is in line with Node.js error handling using async/await.

承诺池允许自定义错误处理。

您可以通过使用.手柄错误(处理程序)实现错误处理程序来接管错误处理。

如果您提供了一个错误处理程序,则承诺池不会收集任何错误。

然后,您必须自己收集错误。

提供了一个自定义的错误处理程序,允许您通过抛出错误处理程序函数来提前退出承诺池。

抛出错误与Node.js错误处理使用异步/等待相一致。

import { PromisePool } from '@supercharge/promise-pool'try {const errors = []const { results } = await PromisePool.for(users).withConcurrency(4).handleError(async (error, user) => {if (error instanceof ValidationError) {errors.push(error) // you must collect errors yourselfreturn}if (error instanceof ThrottleError) { // Execute error handling on specific errorsawait retryUser(user)return}throw error // Uncaught errors will immediately stop PromisePool}).process(async data => {// the harder you work for something,// the greater you’ll feel when you achieve it})await handleCollected(errors) // this may throwreturn { results }
} catch (error) {await handleThrown(error)
}

Callback for Started and Finished Tasks 开始和结束任务的回调

You can use the onTaskStarted and onTaskFinished methods to hook into the processing of tasks. The provided callback for each method will be called when a task started/finished processing:

您可以使用任务启动和任务完成的方法来连接到任务的处理中。

当任务启动/完成处理时,将调用为每个方法提供的回调:

import { PromisePool } from '@supercharge/promise-pool'await PromisePool.for(users).onTaskStarted((item, pool) => {console.log(`Progress: ${pool.processedPercentage()}%`)console.log(`Active tasks: ${pool.processedItems().length}`)console.log(`Active tasks: ${pool.activeTasksCount()}`)console.log(`Finished tasks: ${pool.processedItems().length}`)console.log(`Finished tasks: ${pool.processedCount()}`)}).onTaskFinished((item, pool) => {// update a progress bar or something else :)}).process(async (user, index, pool) => {// processes the `user` data})
You can also chain multiple onTaskStarted and onTaskFinished handling (in case you want to separate some functionality):import { PromisePool } from '@supercharge/promise-pool'await PromisePool.for(users).onTaskStarted(() => {}).onTaskStarted(() => {}).onTaskFinished(() => {}).onTaskFinished(() => {}).process(async (user, index, pool) => {// processes the `user` data})

Task Timeouts 超时设置

有时,配置一个任务必须完成处理的超时时间是很有用的。

一个超时的任务被标记为失败。

您可以使用与任务超时(<毫秒>)方法来配置任务的超时:

Sometimes it’s useful to configure a timeout in which a task must finish processing. A task that times out is marked as failed. You may use the withTaskTimeout(<milliseconds>) method to configure a task’s timeout:

import { PromisePool } from '@supercharge/promise-pool'await PromisePool.for(users).withTaskTimeout(2000) // milliseconds.process(async (user, index, pool) => {// processes the `user` data})

Notice: a configured timeout is configured for each task, not for the whole pool. The example configures a 2-second timeout for each task in the pool.

注意:为每个任务配置了一个已配置的超时,而不是为整个池。

该示例为池中的每个任务配置一个2秒的超时。

Correspond Source Items and Their Results 正确响应每个请求

有时,您希望处理后的结果与源项保持一致。

结果项在结果数组中的位置应该与其相关的源项相同。

使用使用对应结果方法来应用此行为:

Sometimes you want the processed results to align with your source items. The resulting items should have the same position in the results array as their related source items. Use the useCorrespondingResults method to apply this behavior:

import { setTimeout } from 'node:timers/promises'
import { PromisePool } from '@supercharge/promise-pool'const { results } = await PromisePool.for([1, 2, 3]).withConcurrency(5).useCorrespondingResults().process(async (number, index) => {const value = number * 2return await setTimeout(10 - index, value)})/*** source array: [1, 2, 3]* result array: [2, 4 ,6]* --> result values match the position of their source items*/

For example, you may have three items you want to process. Using corresponding results ensures that the processed result for the first item from the source array is located at the first position in the result array (=index 0). The result for the second item from the source array is placed at the second position in the result array, and so on …

例如,您可能有三个要处理的项目。

使用相应的结果可以确保从源数组中得到的第一个项的处理结果位于结果数组中的第一个位置(=索引0)。

来自源数组的第二个项的结果被放置在结果数组中的第二个位置,以此类推。

Return Values When Using Corresponding Results 在使用相应的结果时,请返回相应的值

The results array returned by the promise pool after processing has a mixed return type. Each returned item is one of this type:

  • the actual value type: for results that successfully finished processing
  • Symbol('notRun'): for tasks that didn’t run
  • Symbol('failed'): for tasks that failed processing

The PromisePool exposes both symbols and you may access them using

  • Symbol('notRun'): exposed as PromisePool.notRun
  • Symbol('failed'): exposed as PromisePool.failed

处理后由承诺池返回的结果数组具有混合返回类型。

每个返回的项目都是以下类型之一:

实际值类型:对于成功完成处理的结果

符号(“notRun”):用于未运行的任务

符号(“failed”):用于处理失败的任务

承诺池公开了这两个符号,您可以使用

符号(“notRun”):公开为PromisePool.notRun

符号(“failed”):公开为PromisePool.failed

您可以对所有未运行或失败的任务重复处理:

You may repeat processing for all tasks that didn’t run or failed:

import { PromisePool } from '@supercharge/promise-pool'const { results, errors } = await PromisePool.for([1, 2, 3]).withConcurrency(5).useCorrespondingResults().process(async (number) => {// …})const itemsNotRun = results.filter(result => {return result === PromisePool.notRun
})const failedItems = results.filter(result => {return result === PromisePool.failed
})

When using corresponding results, you need to go through the errors array yourself. The default error handling (collect errors) stays the same and you can follow the described error handling section above.

当使用相应的结果时,您需要自己检查错误数组。

默认的错误处理(收集错误)保持不变,您可以按照上面描述的错误处理部分进行操作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/302377.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HarmonyOS 开发-使用SideBarContainer侧边栏淡入淡出动效实现案例

介绍 在2in1或平板上&#xff0c;群聊侧边栏是一种较为常用的功能&#xff0c;虽然HarmonyOS已经具备了基本的动效&#xff0c;但是部分情况下开发者可能有定制侧边栏动效的需求&#xff0c;本例主要介绍了如何基于显式动画实现侧边栏的淡入淡出动效。 效果图预览 使用说明&a…

C#中值类型与引用类型的存储

目录 值对象与引用对象的存储 引用对象的成员存储 值对象与引用对象的存储 数据项的类型定义了存储数据需要的内存大小及组成该类型的数据成员。类型还决定了对象在内存中的存储位置——栈或堆。 C#中类型分为两种&#xff1a;值类型和引用类型&#xff0c;这两种类型的对象…

天机学堂踩坑笔记

相关资源链接&#xff1a; Md笔记&#xff1a;蓝奏云地址 在线笔记&#xff1a;飞书笔记地址 相关视频教程及配套课件&#xff1a; 链接&#xff1a;百度云地址 提取码&#xff1a;hmz1 1. Day01 初识项目 1.1 OpenEuler 22.03LTS yum换源失败 适用于OpenEuler版本为22.03LT…

1.Hexo安装和环境搭建引导

Hexo是一个依赖于一个名为nodejs的程序 因此安装它的方式在Mac和Windows上实际上是一样的 为了在电脑上安装Hexo 需要做两件事 nodejs&#xff0c;基本上是hexo依赖运行的JavaScript框架 Node.js — Run JavaScript Everywheregit&#xff0c;是一个程序&#xff0c;用来管理电…

BurpSuite保姆级教程

Burp Suite下载,破解,代理web,代理模拟器 (一)为Burp Sutie下载运行执行脚本环境(Java) 1.Java官网下载地址&#xff1a;https://www.oracle.com/java/technologies/ 下载Java SE 17.0.8(LTS) 备注&#xff1a;1.2023版Burp Suite 完美的运行脚本的环境是Java17 2.Java8不支持…

数据仓库实践

什么是数据仓库&#xff1f; 数据仓库是一个用于存储大量数据并支持数据分析与报告的系统。它通常用于集成来自不同来源的数据&#xff0c;提供一个统一的视图&#xff0c;以便进行更深入的分析和决策。 数据仓库的主要优势&#xff1f; 决策支持&#xff1a;为企业决策提供可靠…

2024年,AIGC如何渗透我的生活?

本篇博文列举本人最常用的 6 款app中 AIGC 发挥的功能及作用。 Cursor 作为一名科研工作者&#xff0c;平时最常用的软件就是代码编写工具。Cursor内置的Chat功能&#xff0c;可以辅助完成代码编辑&#xff0c;随时随地实现ChatGPT私有化。 Grammarly 可用于Word和Overleaf等…

vue快速入门(三)差值表达式

注释很详细&#xff0c;直接上代码 上一篇 新增内容 插值表达式基本用法插值表达式常用公式 源码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-wid…

Jmeter针对多种响应断言的判断

有时候response返回的结果并非一种&#xff0c;有多种&#xff0c;需要对这几种进行判断的时候需要使用Bean Shell。 &#xff08;1&#xff09;首先获取响应数据 String response prev.getResponseDataAsString(); ResponseCode 响应状态码 responseHeaders 响应头信息 res…

c++的学习之路:17、stack、queue与priority_queue

摘要 本文主要是介绍一下stack、queue、priority_queue的使用以及模拟实现&#xff0c;文章末附上代码以及思维导图。 目录 摘要 一、stack的介绍和使用 1、stack的介绍 2、stack的使用 3、stack的模拟实现 二、queue的介绍和使用 1、queue的介绍 2、queue的使用 3、…

每日一题:矩阵置零

给定一个 m x n 的矩阵&#xff0c;如果一个元素为 0 &#xff0c;则将其所在行和列的所有元素都设为 0 。请使用 原地 算法。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,1,1],[1,0,1],[1,1,1]] 输出&#xff1a;[[1,0,1],[0,0,0],[1,0,1]]使用两个标记变量。 class Sol…

2024HW --->反序列化漏洞!

对于反序列化&#xff0c;这个漏洞也是常用的&#xff0c;不过涉及到的方面非常非常广&#xff0c;比其他漏洞也难很多 于是本篇文章就分成PHP和JAVA的反序列化来讲讲 1.反序列化 想要理解反序列化&#xff0c;首先就要理解序列化 序列化&#xff1a;把对象转换为字节序列的过…

模拟memcpy和memmove

memcpy是内存复制函数&#xff0c;原型如下 void *memmove(void *dest, const void *src, size_t count) 从src地址复制count个字节到dest 模拟实现 void *memcpy(void *dest, const void *src, size_t count) {if (dest NULL || src NULL)return NULL;void *ans dest;f…

操作系统知识

根据希赛相关视频课程汇总整理而成&#xff0c;个人笔记&#xff0c;仅供参考。 操作系统概述 *进程管理 进程&#xff1a;程序在一个数据集合上运行的过程&#xff0c;它是系统进行资源分配和调度的一个独立单位。由程序块、进程控制块&#xff08;PCB&#xff09;和数据块三…

【Unity灶台】食品加工系统模型搭建

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

第九届蓝桥杯大赛个人赛省赛(软件类)真题C 语言 A 组-航班时间

#include<iostream> using namespace std;int getTime(){int h1, h2, m1, m2, s1, s2, d 0;//d一定初始化为0&#xff0c;以正确处理不跨天的情况 scanf("%d:%d:%d %d:%d:%d (%d)", &h1, &m1, &s1, &h2, &m2, &s2, &d);return d …

贪心算法|135.分发糖果

力扣题目链接 class Solution { public:int candy(vector<int>& ratings) {vector<int> candyVec(ratings.size(), 1);// 从前向后for (int i 1; i < ratings.size(); i) {if (ratings[i] > ratings[i - 1]) candyVec[i] candyVec[i - 1] 1;}// 从后…

Redis中的复制功能(三)

复制 服务器运行ID 除了复制偏移量和复制积压缓冲区之外&#xff0c;实现部分重同步还需要用到服务器运行ID(run ID): 1.每隔Redis服务器&#xff0c;不论主服务器还是从服务&#xff0c;都会有自己的运行ID2.运行ID在服务器启动时自动生成&#xff0c;由40个随机的十六进制…

算法刷题Day28 | 93.复原IP地址、78.子集、90.子集II

目录 0 引言1 复原IP地址1.1 我的解题 2 子集2.1 我的解题 3 子集II3.1 我的解题 &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;算法专栏&#x1f4a5; 标题&#xff1a;算法刷题Day28 | 93.复原IP地址、78.子集、90.子集II❣️ 寄语&#xff1a…

SD-WAN为出海电商提供了什么支持

出海电商行业的持续发展与壮大&#xff0c;使得网络连接的稳定性和效率成为其成功的关键因素。SD-WAN&#xff08;软件定义广域网&#xff09;作为一种先进的网络解决方案&#xff0c;为出海电商提供了诸多优势和支持。 首先&#xff0c;SD-WAN通过智能路由技术&#xff0c;能够…