python线程池提交任务
1. 线程池参数设置
- CPU数量:
N
- 线程池的核心线程数量
IO密集型的话,一般设置为2 * N + 1
;
CPU密集型的话,一般设置为N + 1
或者 使用进程池。 - 线程池的最大任务队列长度
(线程池的核心线程数 / 单个任务的执行时间)* 2
如果线程池有10个核心线程,单个任务的执行时间为0.1s,那么最大任务队列长度设置为200。
from concurrent.futures import ThreadPoolExecutor
thread_pool = ThreadPoolExecutor(max_workers=10)
2. submit
方式提交
submit
这种提交方式是一条一条地提交任务:
1. 可以提交不同的任务函数;
2. 线程池的线程在执行任务时出现异常,程序不会停止,而且也看不到对应的报错信息;
3. 得到的结果是乱序的。
import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef run_task(delay):print(f"------------> start to execute task {delay} <------------")time.sleep(delay)print(f"------------> task {delay} execute over !!! <------------")return delay + 10000task_params = [1, 4, 2, 5, 3, 6] * 10
threadpool_max_worker = 10 # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)############################### 方式1. 虽然是异步提交任务,但是却是同步执行任务。
for p in task_params:future = thread_pool.submit(run_task, p)print(future.result()) # 直接阻塞当前线程,直到任务完成并返回结果,即变成同步############################### 方式2. 异步提交任务,而且异步执行任务,乱序执行,结果乱序。
future_list = []
for p in task_params:future = thread_pool.submit(run_task, p)future_list.append(future)for res in as_completed(future_list): # 等待子线程执行完毕,先完成的会先打印出来结果,结果是无序的print(f"get last result is {res.result()}")
3. map
方式提交
map
这种提交方式可以分批次提交任务:
- 每个批次提价的任务函数都相同;
- 线程池的线程在执行任务时出现异常,程序终止并打印报错信息;
- 得到的结果是有序的。
import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef run_task(delay):print(f"------------> start to execute task {delay} <------------")time.sleep(delay)print(f"------------> task {delay} execute over !!! <------------")return delay + 10000task_params = [1, 4, 2, 5, 3, 6] * 10
threadpool_max_worker = 5 # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)task_res = thread_pool.map(run_task, task_params) # 批量提交任务,乱序执行
print(f"main thread run finished!")
for res in task_res: # 虽然任务是乱序执行的,但是得到的结果却是有序的。print(f"get last result is {res}")
4. 防止一次性提交的任务量过多
import time
from concurrent.futures import ThreadPoolExecutordef run_task(delay):print(f"------------> start to execute task <------------")time.sleep(delay)print(f"------------> task execute over !!! <------------")task_params = [1, 4, 2, 5, 3, 6] * 100
threadpool_max_worker = 10 # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)
threadpool_max_queue_size = 200 # 线程池任务队列长度一般设置为 (线程池核心线程数/单个任务执行时间)* 2for p in task_params:print(f"*****************> 1. current queue size of thread pool is {thread_pool._work_queue.qsize()}")while thread_pool._work_queue.qsize() >= threadpool_max_queue_size:time.sleep(1) # sleep时间要超过单个任务的执行时间print(f"*****************> 2. current queue size of thread pool is {thread_pool._work_queue.qsize()}")thread_pool.submit(run_task, p)print(f"main thread run finished!")
5. 案例分享
案例背景:由于kafka一个topic的一个分区数据只能由一个消费者组中的一个消费者消费,所以现在使用线程池,从kafka里消费某一个分区的数据,将数据提取出来并存于mysql或者redis,然后手动提交offset。
import time
import queue
import concurrent.futures
from threading import Thread
from concurrent.futures import ThreadPoolExecutordef send_task_to_queue(q, params):for idx, p in enumerate(params):q.put((idx, p)) # 把kafka数据put到queue里,如果queue满了就先阻塞着,等待第15行get数据后腾出空间,这里继续put数据print(f"\n set p: {p} into task queue, queue size is {q.qsize()}")def run_task(param_queue):idx, p = param_queue.get() # 这里一直get数据,即使queue空了,只要kafka持续产生数据,第10行就会持续put数据到queue里print(f"\n ------------> start to execute task {idx} <------------")time.sleep(p)print(f"\n ------------> task {idx} execute over !!! <------------")return idxtask_params = [1, 4, 2, 5, 3] * 20 # 数据模拟kafka中消费得到的数据
thread_pool = ThreadPoolExecutor(max_workers=10)
task_param_queue = queue.Queue(maxsize=10)# 这里启动一个子线程一直往queue里put数据
thread_send_task = Thread(target=send_task_to_queue, args=(task_param_queue, task_params))
thread_send_task.start()while True: # 这里为什么一直死循环:只要生产者生产数据存储在kafka中,那么消费者就一直能获取到数据future_list = []# 分批去消费queue里的数据for i in range(10):# 这里的子线程从queue里get任务后,queue腾出空间,上面的子线程继续往里面put数据future_list.append(thread_pool.submit(run_task, task_param_queue))# 子线程任务执行结束后,从结果里取最大的索引值,可以用于redis记录,并用户手动提交offset...complete_res, uncomplete_res = concurrent.futures.wait(future_list)future_max_idx = max([future_complete.result() for future_complete in complete_res])print(f"\n ######################################## every batch's max idx is {future_max_idx}")... # 自行使用 future_max_idx 这个值做处理...
6. 案例优化
上面的案例,是将数据存储于线程队列中,保证每个子线程get()
到的数据不重复。
既然线程队列可以保证每个子线程get
到的数据不重复,那么利用生成器的一次性特性(使用完一次就没了),是不是也能达到这个效果呢?
试着优化下:
import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutordef run_task(param_generator):try:idx, p = next(param_generator)print(f"\n ------------> start to execute task idx {idx} <------------")time.sleep(p)print(f"\n ------------> task value {p} execute over !!! <------------")return idxexcept StopIteration:return -1# 这里将task_params变成生成器,使用生成器的一次性特性:消费完一次后数据就消失了
task_params_generator = ((idx, val) for idx, val in enumerate([1, 4, 2, 5, 3] * 20))
thread_pool = ThreadPoolExecutor(max_workers=10)while True: # 这里为什么一直死循环:只要生产者生产数据存储在kafka中,那么消费者就一直能获取到数据future_list = []# 分批去消费生成器中的数据for i in range(10):# 这里的子线程从生成器中消费数据future_list.append(thread_pool.submit(run_task, task_params_generator))# 子线程任务执行结束后,从结果里取最大的索引用于redis记录,并手动提交offsetcomplete_res, uncomplete_res = concurrent.futures.wait(future_list)future_max_idx = max([future_complete.result() for future_complete in complete_res])print(f"\n ######################################## every batch's max idx is {future_max_idx}")if future_max_idx == -1: # 如果为-1,说明生成器的数据已经迭代完了,等待kafka新生成数据print(f"\n generator has empty !!!!!")time.sleep(60)
相关文章:
python线程池提交任务
1. 线程池参数设置 CPU数量:N线程池的核心线程数量 IO密集型的话,一般设置为 2 * N 1; CPU密集型的话,一般设置为 N 1 或者 使用进程池。线程池的最大任务队列长度 (线程池的核心线程数 / 单个任务的执行时间&#…...

跨境电商企业客户服务优化指南:关键步骤与实用建议
随着全球经济一体化的加强,跨境电子商务产业在过去几年蓬勃发展。但是,为应对激烈竞争,提供全方面的客户服务成为了跨境电子商务卖家在市场中获得优势的关键因素之一。本文将介绍跨境电商企业优化客户服务有哪些步骤?以助力企业提…...
Visual Studio Code 常用快捷键
Visual Studio Code 常用快捷键 文章目录 Visual Studio Code 常用快捷键1. 主命令框2. 常用快捷键2.1 编辑器与窗口管理2.2 代码编辑格式调整光标相关重构代码查找替换显示相关其他 1. 主命令框 F1 或 CtrlShiftP : 打开命令面板。在打开的输入框内,可以输入任何命…...

ubuntu创建pytorch-gpu的docker环境
文章目录 安装docker创建镜像创建容器 合作推广,分享一个人工智能学习网站。计划系统性学习的同学可以了解下,点击助力博主脱贫( •̀ ω •́ )✧ 使用docker的好处就是可以将你的环境和别人的分开,特别是共用的情况下。本文介绍了ubuntu环境…...
数据库原理与应用期末复习试卷2
数据库原理技术与应用 一.单项选择题 设有属性A,B,C,D,以下表示中不是关系的是( C) A、R(A) B、R(A, B, C, D) C、R(AxBxCxD) D、R(A,B) 在SQL语言中的视图VIEW是数据库的(A)…...
操作系统丨单元测试
文章目录 单元测试选择题填空题单元测试 选择题 【单选题】可以实现虚拟存储器的方案是(D)。 A. 固定分区方式 B. 可变分区方式 C. 纯分页方式 D. 请求页式 【单选题】文件系统中文件存储空间的分配是以(D)为基本单位进行的。 A. 字 B. 字节 C. 文件 D. 块 【单选题】哪种…...

tcp/ip协议2实现的插图,数据结构6 (24 - 章)
(142) 142 二四1 TCP传输控制协议 tcpstat统计量与tcp 函数调用链 (143) 143 二四2 TCP传输控制协议 宏定义与常量值–上 (144) 144 二四3 TCP传输控制协议 宏定义与常量值–下 (145) 145 二四4 TCP传输控制协议 结构tcphdr,tcpiphdr (146) 146 二四5 TCP传输控制协议 结构 tcp…...

Linux链接的创建,删除,修改
目录 1. 概述2. 硬链接2.1 创建硬链接2.2 删除硬链接 3. 软链接3.1 创建软链接3.2 删除软链接 5. 常用的终端工具下载 计算机基础–Linux详解 1. 概述 在Linux系统中,链接是一种文件系统中的重要概念。链接允许用户在文件系统中创建指向另一个文件的引用,…...

HarmoryOS Ability页面的生命周期
接入穿山甲SDK app示例: android 数独小游戏 经典数独休闲益智 广告接入示例: Android 个人开发者如何接入广告SDK,实现app流量变现 Ability页面的生命周期 学习前端,第一步最重要的是要理解,页面启动和不同场景下的生命周期的…...
【Flink 从入门到成神系列 一】算子
👏作者简介:大家好,我是爱敲代码的小黄,阿里巴巴淘天Java开发工程师,CSDN博客专家📕系列专栏:Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列🔥如果感觉博主的文章还不错…...
无人机自主寻优降落在移动车辆
针对无人机寻找并降落在移动车辆上的问题,一套可能的研究总体方案: 问题定义与建模: 确定研究的具体范围和目标,包括无人机的初始条件、最大飞行距离、允许的最大追踪误差等。建立马尔科夫决策过程模型(MDP)…...

科技感十足界面模板
科技感界面 在强调简洁的科技类产品相关设计中,背景多数分为:颜色或写实图片两种。 颜色很好理解,大多以深色底为主。强调一种神秘感和沉稳感,同时可以和浅色的文字内容形成很好的对比。 而图片背景的使用,就要求其…...
pytest装饰器 @pytest.mark.parametrize 使用方法
pytest.mark.parametrize 有三种传参方法,分别是: 1.列表传参:将参数值作为列表传递给装饰器。 pytest.mark.parametrize("param", [value1, value2, ..., valuen])2.元组传参:将参数值作为元组传递给装饰器。 pytes…...

redis被攻击
之前由于redis没有修改端口,密码也比较简单,也没有绑定ip 结果被攻击了 1 redis里被写入string类型的脚本,比如:Back1 Back2 Back3 Back4 ,内容curl -fsSL http://d.powerofwish.com/pm.sh | sh的形式,如下…...

二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明
处理二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明 sitemapLocation 指明 sitemap.json 的位置;默认为 ‘sitemap.json’ 即在 app.json 同级目录下名字的 sitemap.json 文件 找到app.json这个文件 把这段代码加进去&…...

【AI视野·今日Sound 声学论文速览 第四十期】Wed, 3 Jan 2024
AI视野今日CS.Sound 声学论文速览 Wed, 3 Jan 2024 Totally 4 papers 👉上期速览✈更多精彩请移步主页 Daily Sound Papers Auffusion: Leveraging the Power of Diffusion and Large Language Models for Text-to-Audio Generation Authors Jinlong Xue, Yayue De…...

Unity组件开发--升降梯
我开发的升降梯由三个部分组成,反正适用于我的需求了,其他人想复用到自己的项目的话,不一定。写的也不是很好,感觉搞的有点复杂啦。完全可以在优化一下,项目赶工期,就先这样吧。能用就行,其他的…...

插槽slot涉及到的样式污染问题
1. 前言 本次我们主要结合一些案例研究一下vue的插槽中样式污染问题。在这篇文章中,我们主要关注以下两点: 父组件的样式是否会影响子组件的样式?子组件的样式是否会影响父组件定义的插槽部分的样式? 2. 准备代码 2.1 父组件代码 <te…...

OpenCV-Python(25):Hough直线变换
目标 理解霍夫变换的概念学习如何在一张图片中检测直线学习函数cv2.HoughLines()和cv2.HoughLinesP() 原理 霍夫变换在检测各种形状的的技术中非常流行。如果你要检测的形状可以用数学表达式写出来,你就可以是使用霍夫变换检测它。即使检测的形状存在一点破坏或者…...
python接口自动化(七)--状态码详解对照表(详解)
1.简介 我们为啥要了解状态码,从它的作用,就不言而喻了。如果不了解,我们就会像个无头苍蝇,横冲直撞。遇到问题也不知道从何处入手,就是想找别人帮忙,也不知道是找前端还是后端的工程师。 状态码的作用是&a…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)
HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...
rknn优化教程(二)
文章目录 1. 前述2. 三方库的封装2.1 xrepo中的库2.2 xrepo之外的库2.2.1 opencv2.2.2 rknnrt2.2.3 spdlog 3. rknn_engine库 1. 前述 OK,开始写第二篇的内容了。这篇博客主要能写一下: 如何给一些三方库按照xmake方式进行封装,供调用如何按…...

基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...
解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错
出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上,所以报错,到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本,cu、torch、cp 的版本一定要对…...
GitHub 趋势日报 (2025年06月08日)
📊 由 TrendForge 系统生成 | 🌐 https://trendforge.devlive.org/ 🌐 本日报中的项目描述已自动翻译为中文 📈 今日获星趋势图 今日获星趋势图 884 cognee 566 dify 414 HumanSystemOptimization 414 omni-tools 321 note-gen …...
Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信
文章目录 Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket(服务端和客户端都要)2. 绑定本地地址和端口&#x…...
CSS设置元素的宽度根据其内容自动调整
width: fit-content 是 CSS 中的一个属性值,用于设置元素的宽度根据其内容自动调整,确保宽度刚好容纳内容而不会超出。 效果对比 默认情况(width: auto): 块级元素(如 <div>)会占满父容器…...
智能AI电话机器人系统的识别能力现状与发展水平
一、引言 随着人工智能技术的飞速发展,AI电话机器人系统已经从简单的自动应答工具演变为具备复杂交互能力的智能助手。这类系统结合了语音识别、自然语言处理、情感计算和机器学习等多项前沿技术,在客户服务、营销推广、信息查询等领域发挥着越来越重要…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战
说明:这是一个机器学习实战项目(附带数据代码文档),如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下,风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...
Linux系统部署KES
1、安装准备 1.版本说明V008R006C009B0014 V008:是version产品的大版本。 R006:是release产品特性版本。 C009:是通用版 B0014:是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存:1GB 以上 硬盘…...