当前位置: 首页 > news >正文

Node.JS多线程PromisePool之promise-pool库实现

什么是Promise Pool

Map-like, concurrent promise processing for Node.js.

Promise-Pool是一个用于管理并发请求的JavaScript库,它可以限制同时进行的请求数量,以避免过多的请求导致服务器压力过大。使用Promise-Pool可以方便地实现对多个异步操作的并发控制。

Promise Pool “承诺池” 包允许您批量运行许多承诺。

承诺池确保并发处理任务的最大数量。

承诺池中的每个任务都是其他任务,这意味着一旦一个任务完成,池就开始处理下一个任务。

此处理可确保了为您的任务进行最佳的批处理。

 

Promise Pool - NPMJS

@supercharge/promise-pool - npm (npmjs.com)icon-default.png?t=N7T8https://www.npmjs.com/package/@supercharge/promise-pool

Promise Pool - Document

Promise Poolicon-default.png?t=N7T8https://superchargejs.com/docs/3.x/promise-pool

 

怎么使用PromisePool

Install 安装

so easy , just install it

npm i @supercharge/promise-pool

Usage用例

Using the promise pool is pretty straightforward. The package exposes a class and you can create a promise pool instance using the fluent interface.

使用promise pool承诺池非常简单。该包公开了一个类,您可以使用流畅的接口创建一个承诺池实例。

Here’s an example using a concurrency of 2:

import { PromisePool } from '@supercharge/promise-pool'const users = [{ name: 'Marcus' },{ name: 'Norman' },{ name: 'Christian' }
]const { results, errors } = await PromisePool.withConcurrency(2).for(users).process(async (userData, index, pool) => {const user = await User.createIfNotExisting(userData)return user})

The promise pool uses a default concurrency of 10

默认是十个线程,请按照自己的实际情况(业务+架构)处理

 

在以下示例中,我们创建了一个包含5个worker的线程池。然后,我们向线程池添加了10个任务。线程池会并发执行这些任务,但最多只能有5个任务同时运行。当一个任务完成时,线程池会自动分配下一个任务给空闲的worker。

const PromisePool = require('promise-pool');// 创建一个包含5个worker的线程池
const pool = new PromisePool(5, (task) => {return new Promise((resolve, reject) => {// 模拟一个耗时操作setTimeout(() => {console.log('Task completed:', task);resolve();}, 1000);});
});// 添加任务到线程池
for (let i = 0; i < 10; i++) {pool.addTask(i).then(() => {console.log('Task finished:', i);}).catch((err) => {console.error('Error:', err);});
}//zhengkai.blog.csdn.net

Manually Stop the Pool 手工停止

You can stop the processing of a promise pool using the pool instance provided to the .process() and .handleError() methods. Here’s an example how you can stop an active promise pool from within the .process() method:

await PromisePool.for(users).process(async (user, index, pool) => {if (condition) {return pool.stop()}// processes the `user` data})

You may also stop the pool from within the .handleError() method in case you need to:

import { PromisePool } from '@supercharge/promise-pool'await PromisePool.for(users).handleError(async (error, user, pool) => {if (error instanceof SomethingBadHappenedError) {return pool.stop()}// handle the given `error`}).process(async (user, index, pool) => {// processes the `user` data})

Bring Your Own Error Handling

The promise pool allows for custom error handling. You can take over the error handling by implementing an error handler using the .handleError(handler).

If you provide an error handler, the promise pool doesn’t collect any errors. You must then collect errors yourself.

Providing a custom error handler allows you to exit the promise pool early by throwing inside the error handler function. Throwing errors is in line with Node.js error handling using async/await.

承诺池允许自定义错误处理。

您可以通过使用.手柄错误(处理程序)实现错误处理程序来接管错误处理。

如果您提供了一个错误处理程序,则承诺池不会收集任何错误。

然后,您必须自己收集错误。

提供了一个自定义的错误处理程序,允许您通过抛出错误处理程序函数来提前退出承诺池。

抛出错误与Node.js错误处理使用异步/等待相一致。

import { PromisePool } from '@supercharge/promise-pool'try {const errors = []const { results } = await PromisePool.for(users).withConcurrency(4).handleError(async (error, user) => {if (error instanceof ValidationError) {errors.push(error) // you must collect errors yourselfreturn}if (error instanceof ThrottleError) { // Execute error handling on specific errorsawait retryUser(user)return}throw error // Uncaught errors will immediately stop PromisePool}).process(async data => {// the harder you work for something,// the greater you’ll feel when you achieve it})await handleCollected(errors) // this may throwreturn { results }
} catch (error) {await handleThrown(error)
}

Callback for Started and Finished Tasks 开始和结束任务的回调

You can use the onTaskStarted and onTaskFinished methods to hook into the processing of tasks. The provided callback for each method will be called when a task started/finished processing:

您可以使用任务启动和任务完成的方法来连接到任务的处理中。

当任务启动/完成处理时,将调用为每个方法提供的回调:

import { PromisePool } from '@supercharge/promise-pool'await PromisePool.for(users).onTaskStarted((item, pool) => {console.log(`Progress: ${pool.processedPercentage()}%`)console.log(`Active tasks: ${pool.processedItems().length}`)console.log(`Active tasks: ${pool.activeTasksCount()}`)console.log(`Finished tasks: ${pool.processedItems().length}`)console.log(`Finished tasks: ${pool.processedCount()}`)}).onTaskFinished((item, pool) => {// update a progress bar or something else :)}).process(async (user, index, pool) => {// processes the `user` data})
You can also chain multiple onTaskStarted and onTaskFinished handling (in case you want to separate some functionality):import { PromisePool } from '@supercharge/promise-pool'await PromisePool.for(users).onTaskStarted(() => {}).onTaskStarted(() => {}).onTaskFinished(() => {}).onTaskFinished(() => {}).process(async (user, index, pool) => {// processes the `user` data})

Task Timeouts 超时设置

有时,配置一个任务必须完成处理的超时时间是很有用的。

一个超时的任务被标记为失败。

您可以使用与任务超时(<毫秒>)方法来配置任务的超时:

Sometimes it’s useful to configure a timeout in which a task must finish processing. A task that times out is marked as failed. You may use the withTaskTimeout(<milliseconds>) method to configure a task’s timeout:

import { PromisePool } from '@supercharge/promise-pool'await PromisePool.for(users).withTaskTimeout(2000) // milliseconds.process(async (user, index, pool) => {// processes the `user` data})

Notice: a configured timeout is configured for each task, not for the whole pool. The example configures a 2-second timeout for each task in the pool.

注意:为每个任务配置了一个已配置的超时,而不是为整个池。

该示例为池中的每个任务配置一个2秒的超时。

Correspond Source Items and Their Results 正确响应每个请求

有时,您希望处理后的结果与源项保持一致。

结果项在结果数组中的位置应该与其相关的源项相同。

使用使用对应结果方法来应用此行为:

Sometimes you want the processed results to align with your source items. The resulting items should have the same position in the results array as their related source items. Use the useCorrespondingResults method to apply this behavior:

import { setTimeout } from 'node:timers/promises'
import { PromisePool } from '@supercharge/promise-pool'const { results } = await PromisePool.for([1, 2, 3]).withConcurrency(5).useCorrespondingResults().process(async (number, index) => {const value = number * 2return await setTimeout(10 - index, value)})/*** source array: [1, 2, 3]* result array: [2, 4 ,6]* --> result values match the position of their source items*/

For example, you may have three items you want to process. Using corresponding results ensures that the processed result for the first item from the source array is located at the first position in the result array (=index 0). The result for the second item from the source array is placed at the second position in the result array, and so on …

例如,您可能有三个要处理的项目。

使用相应的结果可以确保从源数组中得到的第一个项的处理结果位于结果数组中的第一个位置(=索引0)。

来自源数组的第二个项的结果被放置在结果数组中的第二个位置,以此类推。

Return Values When Using Corresponding Results 在使用相应的结果时,请返回相应的值

The results array returned by the promise pool after processing has a mixed return type. Each returned item is one of this type:

  • the actual value type: for results that successfully finished processing
  • Symbol('notRun'): for tasks that didn’t run
  • Symbol('failed'): for tasks that failed processing

The PromisePool exposes both symbols and you may access them using

  • Symbol('notRun'): exposed as PromisePool.notRun
  • Symbol('failed'): exposed as PromisePool.failed

处理后由承诺池返回的结果数组具有混合返回类型。

每个返回的项目都是以下类型之一:

实际值类型:对于成功完成处理的结果

符号(“notRun”):用于未运行的任务

符号(“failed”):用于处理失败的任务

承诺池公开了这两个符号,您可以使用

符号(“notRun”):公开为PromisePool.notRun

符号(“failed”):公开为PromisePool.failed

您可以对所有未运行或失败的任务重复处理:

You may repeat processing for all tasks that didn’t run or failed:

import { PromisePool } from '@supercharge/promise-pool'const { results, errors } = await PromisePool.for([1, 2, 3]).withConcurrency(5).useCorrespondingResults().process(async (number) => {// …})const itemsNotRun = results.filter(result => {return result === PromisePool.notRun
})const failedItems = results.filter(result => {return result === PromisePool.failed
})

When using corresponding results, you need to go through the errors array yourself. The default error handling (collect errors) stays the same and you can follow the described error handling section above.

当使用相应的结果时,您需要自己检查错误数组。

默认的错误处理(收集错误)保持不变,您可以按照上面描述的错误处理部分进行操作。

相关文章:

Node.JS多线程PromisePool之promise-pool库实现

什么是Promise Pool Map-like, concurrent promise processing for Node.js. Promise-Pool是一个用于管理并发请求的JavaScript库&#xff0c;它可以限制同时进行的请求数量&#xff0c;以避免过多的请求导致服务器压力过大。使用Promise-Pool可以方便地实现对多个异步操作的并…...

【C++】红黑树讲解及实现

前言&#xff1a; AVL树与红黑树相似&#xff0c;都是一种平衡二叉搜索树&#xff0c;但是AVL树的平衡要求太严格&#xff0c;如果要对AVL树做一些结构修改的操作性能会非常低下&#xff0c;比如&#xff1a;插入时要维护其绝对平衡&#xff0c;旋转的次数比较多&#xff0c;更…...

security如何不拦截websocket

只要添加一个关键配置就行 //忽略websocket拦截Overridepublic void configure(WebSecurity webSecurity){webSecurity.ignoring().antMatchers("/**");} 全部代码我放着了 package com.oddfar.campus.framework.config;import com.oddfar.campus.framework.secur…...

Unity类银河恶魔城学习记录12-3 p125 Limit Inventory Slots源代码

Alex教程每一P的教程原代码加上我自己的理解初步理解写的注释&#xff0c;可供学习Alex教程的人参考 此代码仅为较上一P有所改变的代码 【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili Inventory.cs using Newtonsoft.Json.Linq; using System.Collections; us…...

【智能排班系统】雪花算法生成分布式ID

文章目录 雪花算法介绍起源与命名基本原理与结构优势与特点应用场景 代码实现代码结构自定义机器标识RandomWorkIdChooseLocalRedisWorkIdChooselua脚本 实体类SnowflakeIdInfoWorkCenterInfo 雪花算法类配置类雪花算法工具类 说明 雪花算法介绍 在复杂而庞大的分布式系统中&a…...

sass中的导入与部分导入

文章目录 sass中的导入与部分导入1. import&#xff1a;传统的导入方式2. use&#xff1a;现代化的模块化导入 sass中的导入与部分导入 在大型前端项目中&#xff0c;CSS代码量往往十分庞大&#xff0c;为了保持其可读性、可维护性以及便于团队协作&#xff0c;模块化开发成为…...

工业组态 物联网组态 组态编辑器 web组态 组态插件 编辑器

体验地址&#xff1a;by组态[web组态插件] BY组态是一款非常优秀的纯前端的【web组态插件工具】&#xff0c;可无缝嵌入到vue项目&#xff0c;react项目等&#xff0c;由于是原生js开发&#xff0c;对于前端的集成没有框架的限制。同时由于BY组态只是一个插件&#xff0c;不能独…...

git可视化工具

Gitkraken GitKraken 是一款专门用于管理和协作Git仓库的图形化界面工具。它拥有友好直观的界面&#xff0c;使得Git的操作变得更加简单易用&#xff0c;尤其适合那些不熟悉Git命令行的开发者。GitKraken提供了丰富的功能&#xff0c;如代码审查、分支管理、仓库克隆、提交、推…...

基于单片机电子密码锁系统设计

**单片机设计介绍&#xff0c;基于单片机电子密码锁系统设计 文章目录 一 概要二、功能设计设计思路 三、 软件设计原理图 五、 程序六、 文章目录 一 概要 基于单片机电子密码锁系统设计概要主要包括以下几个方面&#xff1a; 一、系统概述 基于单片机电子密码锁系统是一个…...

点云从入门到精通技术详解100篇-基于点云与图像纹理的 道路识别(续)

目录 3.1.2 图像滤波去噪 3.2 道路纹理特征提取 3.3 基于超像素分割的图像特征表达...

《机器学习在量化投资中的应用研究》目录

机器学习在量化投资中的应用研究 获取链接&#xff1a;机器学习在量化投资中的应用研究_汤凌冰著_北京&#xff1a;电子工业出版社 更多技术书籍&#xff1a;技术书籍分享&#xff0c;前端、后端、大数据、AI、人工智能... 内容简介 《机器学习在量化投资中的应用研究…...

Spring拓展点之SmartLifecycle如何感知容器启动和关闭

Spring为我们提供了拓展点感知容器的启动与关闭&#xff0c;从而使我们可以在容器启动或者关闭之时进行定制的操作。Spring提供了Lifecycle上层接口&#xff0c;这个接口只有两个方法start和stop两个方法&#xff0c;但是这个接口并不是直接提供给开发者做拓展点&#xff0c;而…...

深入理解Java匿名内部类(day21)

在Java编程中&#xff0c;匿名内部类是一种非常有用的特性&#xff0c;它允许我们定义和实例化一个类的子类或实现一个接口&#xff0c;而无需给出子类的名称。这种特性使得代码更加简洁、紧凑&#xff0c;尤其适用于一些只使用一次的临时对象。本文将深入探讨Java匿名内部类的…...

《状态模式(极简c++)》

本文章属于专栏- 概述 - 《设计模式&#xff08;极简c版&#xff09;》-CSDN博客 模式说明&#xff1a; 方案&#xff1a;状态模式是一种行为设计模式&#xff0c;用于在对象的内部状态发生改变时改变其行为。它包括三个关键角色&#xff1a;上下文&#xff08;Context&#x…...

Day4-Hive直播行业基础笔试题

Hive笔试题实战 短视频 题目一&#xff1a;计算各个视频的平均完播率 有用户-视频互动表tb_user_video_log&#xff1a; id uid video_id start_time end_time if_follow if_like if_retweet comment_id 1 101 2001 2021-10-01 10:00:00 2021-10-01 10:00:30 …...

mybatis批量新增数据

数据量大的时候如果在循环中执行单条新增操作&#xff0c;是非常慢的。那么如何在mybatis中实现批量新增数据呢&#xff1f; 方法 insert 标签的 foreach 属性可以用于批量插入数据。您可以使用 foreach 属性遍历一个集合&#xff0c;并为集合中的每个元素生成一条插入语句。…...

webrtcP2P通话流程

文章目录 webrtcP2P通话流程webrtc多对多 mesh方案webrtc多对多 mcu方案webrtc多对多 sfu方案webrtc案例测试getUserMediagetUserMedia基础示例-打开摄像头getUserMedia canvas - 截图 打开共享屏幕 webrtcP2P通话流程 在这里&#xff0c;stun服务器包括stun服务和turn转发服…...

游戏引擎中的物理系统

一、物理对象与形状 1.1 对象 Actor 一般来说&#xff0c;游戏中的对象&#xff08;Actor&#xff09;分为以下四类&#xff1a; 静态对象 Static Actor动态对象 Dynamic Actor ---- 可能受到力/扭矩/冲量的影响检测器 TriggerKinematic Actor 运动学对象 ---- 忽略物理法则…...

【C++ STL有序关联容器】map 映射

文章目录 【 1. 基本原理 】【 2. map 的创建 】2.1 调用默认构造函数&#xff0c;创建一个空的 map2.2 map 被构造的同时初始化2.3 通过一个 queue 初始化另一个 queue2.4 取已建 map 中指定区域内的键值对&#xff0c;初始化新的 map2.5 指定排序规则 【 2. map 元素的操作 】…...

【ZZULIOJ】1041: 数列求和2(Java)

目录 题目描述 输入 输出 样例输入 Copy 样例输出 Copy code 题目描述 输入一个整数n&#xff0c;输出数列1-1/31/5-……前n项的和。 输入 输入只有一个整数n。 输出 结果保留2为小数,单独占一行。 样例输入 Copy 3 样例输出 Copy 0.87 code import java.util…...

C++【适配器模式】

简单介绍 适配器模式是一种结构型设计模式 | 它能使接口不兼容的对象能够相互合作。&#xff08;是适配各种不同接口的一个中间件&#xff09; 基础理解 举个例子&#xff1a;当你引用了一个第三方数据分析库&#xff0c;但这个库的接口只能兼容JSON 格式的数据。但你需要它…...

go | 上传文件分析 | http协议分析 | 使用openssl 实现 https 协议 server.key、server.pem

是这样的&#xff0c;现在分析抓包数据 test.go package mainimport ("fmt""log""github.com/gin-gonic/gin" )func main() {r : gin.Default()// Upload single filer.MaxMultipartMemory 8 << 20r.POST("/upload", func(c *g…...

Chatgpt掘金之旅—有爱AI商业实战篇|专业博客|(六)

演示站点&#xff1a; https://ai.uaai.cn 对话模块 官方论坛&#xff1a; www.jingyuai.com 京娱AI 一、AI技术创业博客领域有哪些机会&#xff1f; 人工智能&#xff08;AI&#xff09;技术作为当今科技创新的前沿领域&#xff0c;为创业者提供了广阔的机会和挑战。随着AI技…...

单例模式 JAVA

单例模式 什么是单例模式&#xff1f; 1、单例类只能有一个实例。2、单例类必须自己创建自己的唯一实例。3、单例类必须给所有其他对象提供这一实例。 应用&#xff1a;数据库的连接类&#xff0c;这样就可以确保只创建一次。节省资源。 单例模式代码&#xff1a;涉及懒加载…...

C++从入门到精通——初步认识面向对象及类的引入

初步认识面向对象及类的引入 前言一、面向过程和面向对象初步认识C语言C 二、类的引入C的类名代表什么示例 C与C语言的struct的比较成员函数访问权限继承默认构造函数默认成员初始化结构体大小 总结 前言 面向过程注重任务的流程和控制&#xff0c;适合简单任务和流程固定的场…...

GitHub入门与实践

ISBN: 978-7-115-39409-5 作者&#xff1a;【日】大塚弘记 译者&#xff1a;支鹏浩、刘斌 页数&#xff1a;255页 阅读时间&#xff1a;2023-08-05 推荐指数&#xff1a;★★★★★ 好久之前读完的了&#xff0c;一直没有写笔记。 这本入门Git的书籍还是非常推荐的&#xff0c;…...

centos 安装 stable-diffusion 详细流程

一、安装git 新版 先安装git 工具来更新git源码 &#xff0c; 载下源码后卸载git 版本(centos 默认1.8版本&#xff0c;说是安装会引起失败) 安装git 命令&#xff0c;可使用 git --version查看版本 sudo yum install git -y 卸载git命令 sudo yum remove git 正式源码安装…...

CSS编写登录框样式

/* 重置浏览器默认样式 */ * { margin: 0; padding: 0; box-sizing: border-box; } /* 设置登录框的基本样式 */ .login-box { width: 100%; max-width: 400px; margin: 50px auto; background-color: #f4f4f4; padding: 20px; box-shad…...

Python|OpenCV-获取鼠标点击位置的坐标,并绘制图像(13)

前言 本文是该专栏的第14篇,后面将持续分享OpenCV计算机视觉的干货知识,记得关注。 本文主要来详细说明,基于OpenCV来获取鼠标点击位置的坐标,并按坐标的位置进行自动绘制图像。具体怎么实现,笔者在正文中将结合实际代码案例进行详细说明。 具体细节部分以及完整代码的实…...

设计模式(14):命令模式

介绍 将一个请求封装为一个对象&#xff0c;从而使我们可用不同的请求对象客户进行参数化&#xff1b;对请求排队或者记录请求日志&#xff0c;以及支持可撤销的操作。也称之为&#xff1a;动作Action模式&#xff0c;事务transaction模式。 命令模式角色 抽象命令类(Comman…...

springboot快速搭建网站/页面关键词优化

以植物大战僵尸为例&#xff1a; 用到的工具&#xff1a;spy2.7 用于读取窗口的文件句柄。Cheat Engine6.5这个工具用于读取内存地址 1 import win32process 2 import win32con3 import win32api4 import ctypes 5 import win32gui 6 import time7 8 PROCESS_ALL_ACCESS(0x000F…...

网站建设 上海网站建设/设计网站排行

问题描述 利用 XShell 登录远程 Unix 服务器&#xff0c;启动后台进程&#xff0c;如下所示。 $ command & 当关闭 XShell 后&#xff0c;后台进程就会一起终止。 问题原因 在查看 Bash 的使用手册后&#xff0c;发现如下一段描述&#xff1a; The shell exits by de…...

wordpress有哪些好模版/seo优化前景

一、最小化安装 1、进入系统之后&#xff0c;要配置network网络。 首先ping www.baidu.com (Ctrlz 推出正在执行的命令) 如果ping不通&#xff0c;则修改&#xff1a; vi /etc/sysconfig/network-scripts/ifcfg-ens33 ONBOOTyes 修改之后重启network&#xff1a; servi…...

做网站去哪里找/win7优化极致性能

Apple Pay交通卡功能在国内又下一城。继天津之后&#xff0c;Apple Pay现在正式实现对长沙潇湘卡的支持。这意味着&#xff0c;用户可以使用Apple Pay在长沙乘坐公交、地铁和磁悬浮。手持iPhone 或Apple Watch的用户可以按照以下的方式进行开通&#xff1a;iPhone用户直接打开钱…...

专门做调查的网站/引流推广怎么做

这几天没事&#xff0c;在网上看了一下NET6的开源框架&#xff0c;有几个比较完整切好用的框架&#xff0c;分享一下 1、Admin.NET通用管理平台 这个框架用了几次&#xff0c;还不错&#xff0c;写法也感觉满好的 gitee地址&#xff1a;https://gitee.com/zuohuaijun/Admin.NET…...

南京鼓楼做网站公司/推广信息怎么写

MM:详解Reservation (预留) (2012-03-25 23:45:37) 转载▼ 标签&#xff1a; 预留 杂谈 分类&#xff1a; SAPMM 预留的概念 预订是向仓库提出的一个请求&#xff0c;要求仓库为今后某个日期的发货和为某个目的将物料保持在就绪状态。可以由多个部门为多个帐户分配对象&a…...