当前位置: 首页 > news >正文

python线程池提交任务

1. 线程池参数设置

  1. CPU数量:N
  2. 线程池的核心线程数量
    IO密集型的话,一般设置为 2 * N + 1
    CPU密集型的话,一般设置为 N + 1 或者 使用进程池。
  3. 线程池的最大任务队列长度
    (线程池的核心线程数 / 单个任务的执行时间)* 2
    如果线程池有10个核心线程,单个任务的执行时间为0.1s,那么最大任务队列长度设置为200。
from concurrent.futures import ThreadPoolExecutor
thread_pool = ThreadPoolExecutor(max_workers=10)

2. submit方式提交

submit 这种提交方式是一条一条地提交任务:
1. 可以提交不同的任务函数;
2. 线程池的线程在执行任务时出现异常,程序不会停止,而且也看不到对应的报错信息;
3. 得到的结果是乱序的。

import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef run_task(delay):print(f"------------> start to execute task {delay} <------------")time.sleep(delay)print(f"------------> task {delay} execute over !!! <------------")return delay + 10000task_params = [1, 4, 2, 5, 3, 6] * 10
threadpool_max_worker = 10      # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)############################### 方式1. 虽然是异步提交任务,但是却是同步执行任务。
for p in task_params:future = thread_pool.submit(run_task, p)print(future.result())      # 直接阻塞当前线程,直到任务完成并返回结果,即变成同步############################### 方式2. 异步提交任务,而且异步执行任务,乱序执行,结果乱序。
future_list = []
for p in task_params:future = thread_pool.submit(run_task, p)future_list.append(future)for res in as_completed(future_list):       # 等待子线程执行完毕,先完成的会先打印出来结果,结果是无序的print(f"get last result is {res.result()}")

3. map方式提交

map 这种提交方式可以分批次提交任务:

  1. 每个批次提价的任务函数都相同;
  2. 线程池的线程在执行任务时出现异常,程序终止并打印报错信息;
  3. 得到的结果是有序的。
import time
from concurrent.futures import ThreadPoolExecutor, as_completeddef run_task(delay):print(f"------------> start to execute task {delay} <------------")time.sleep(delay)print(f"------------> task {delay} execute over !!! <------------")return delay + 10000task_params = [1, 4, 2, 5, 3, 6] * 10
threadpool_max_worker = 5      # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)task_res = thread_pool.map(run_task, task_params)		# 批量提交任务,乱序执行
print(f"main thread run finished!")
for res in task_res:		# 虽然任务是乱序执行的,但是得到的结果却是有序的。print(f"get last result is {res}")

4. 防止一次性提交的任务量过多

import time
from concurrent.futures import ThreadPoolExecutordef run_task(delay):print(f"------------> start to execute task <------------")time.sleep(delay)print(f"------------> task execute over !!! <------------")task_params = [1, 4, 2, 5, 3, 6] * 100
threadpool_max_worker = 10      # io密集型:cpu数量*2+1;cpu密集型:cpu数量+1
thread_pool = ThreadPoolExecutor(max_workers=threadpool_max_worker)
threadpool_max_queue_size = 200     # 线程池任务队列长度一般设置为  (线程池核心线程数/单个任务执行时间)* 2for p in task_params:print(f"*****************> 1. current queue size of thread pool is {thread_pool._work_queue.qsize()}")while thread_pool._work_queue.qsize() >= threadpool_max_queue_size:time.sleep(1)		# sleep时间要超过单个任务的执行时间print(f"*****************> 2. current queue size of thread pool is {thread_pool._work_queue.qsize()}")thread_pool.submit(run_task, p)print(f"main thread run finished!")

5. 案例分享

案例背景:由于kafka一个topic的一个分区数据只能由一个消费者组中的一个消费者消费,所以现在使用线程池,从kafka里消费某一个分区的数据,将数据提取出来并存于mysql或者redis,然后手动提交offset。

import time
import queue
import concurrent.futures
from threading import Thread
from concurrent.futures import ThreadPoolExecutordef send_task_to_queue(q, params):for idx, p in enumerate(params):q.put((idx, p))     # 把kafka数据put到queue里,如果queue满了就先阻塞着,等待第15行get数据后腾出空间,这里继续put数据print(f"\n set p: {p} into task queue, queue size is {q.qsize()}")def run_task(param_queue):idx, p = param_queue.get()      # 这里一直get数据,即使queue空了,只要kafka持续产生数据,第10行就会持续put数据到queue里print(f"\n ------------> start to execute task {idx} <------------")time.sleep(p)print(f"\n ------------> task {idx} execute over !!! <------------")return idxtask_params = [1, 4, 2, 5, 3] * 20      # 数据模拟kafka中消费得到的数据
thread_pool = ThreadPoolExecutor(max_workers=10)
task_param_queue = queue.Queue(maxsize=10)# 这里启动一个子线程一直往queue里put数据
thread_send_task = Thread(target=send_task_to_queue, args=(task_param_queue, task_params))
thread_send_task.start()while True:     # 这里为什么一直死循环:只要生产者生产数据存储在kafka中,那么消费者就一直能获取到数据future_list = []# 分批去消费queue里的数据for i in range(10):# 这里的子线程从queue里get任务后,queue腾出空间,上面的子线程继续往里面put数据future_list.append(thread_pool.submit(run_task, task_param_queue))# 子线程任务执行结束后,从结果里取最大的索引值,可以用于redis记录,并用户手动提交offset...complete_res, uncomplete_res = concurrent.futures.wait(future_list)future_max_idx = max([future_complete.result() for future_complete in complete_res])print(f"\n ######################################## every batch's max idx is {future_max_idx}")...		# 自行使用 future_max_idx 这个值做处理...

6. 案例优化

上面的案例,是将数据存储于线程队列中,保证每个子线程get()到的数据不重复。
既然线程队列可以保证每个子线程get到的数据不重复,那么利用生成器的一次性特性(使用完一次就没了),是不是也能达到这个效果呢?
试着优化下:

import time
import concurrent.futures
from concurrent.futures import ThreadPoolExecutordef run_task(param_generator):try:idx, p = next(param_generator)print(f"\n ------------> start to execute task idx {idx} <------------")time.sleep(p)print(f"\n ------------> task value {p} execute over !!! <------------")return idxexcept StopIteration:return -1# 这里将task_params变成生成器,使用生成器的一次性特性:消费完一次后数据就消失了
task_params_generator = ((idx, val) for idx, val in enumerate([1, 4, 2, 5, 3] * 20))
thread_pool = ThreadPoolExecutor(max_workers=10)while True:     # 这里为什么一直死循环:只要生产者生产数据存储在kafka中,那么消费者就一直能获取到数据future_list = []# 分批去消费生成器中的数据for i in range(10):# 这里的子线程从生成器中消费数据future_list.append(thread_pool.submit(run_task, task_params_generator))# 子线程任务执行结束后,从结果里取最大的索引用于redis记录,并手动提交offsetcomplete_res, uncomplete_res = concurrent.futures.wait(future_list)future_max_idx = max([future_complete.result() for future_complete in complete_res])print(f"\n ######################################## every batch's max idx is {future_max_idx}")if future_max_idx == -1:        # 如果为-1,说明生成器的数据已经迭代完了,等待kafka新生成数据print(f"\n generator has empty !!!!!")time.sleep(60)

相关文章:

python线程池提交任务

1. 线程池参数设置 CPU数量&#xff1a;N线程池的核心线程数量 IO密集型的话&#xff0c;一般设置为 2 * N 1&#xff1b; CPU密集型的话&#xff0c;一般设置为 N 1 或者 使用进程池。线程池的最大任务队列长度 &#xff08;线程池的核心线程数 / 单个任务的执行时间&#…...

跨境电商企业客户服务优化指南:关键步骤与实用建议

随着全球经济一体化的加强&#xff0c;跨境电子商务产业在过去几年蓬勃发展。但是&#xff0c;为应对激烈竞争&#xff0c;提供全方面的客户服务成为了跨境电子商务卖家在市场中获得优势的关键因素之一。本文将介绍跨境电商企业优化客户服务有哪些步骤&#xff1f;以助力企业提…...

Visual Studio Code 常用快捷键

Visual Studio Code 常用快捷键 文章目录 Visual Studio Code 常用快捷键1. 主命令框2. 常用快捷键2.1 编辑器与窗口管理2.2 代码编辑格式调整光标相关重构代码查找替换显示相关其他 1. 主命令框 F1 或 CtrlShiftP : 打开命令面板。在打开的输入框内&#xff0c;可以输入任何命…...

ubuntu创建pytorch-gpu的docker环境

文章目录 安装docker创建镜像创建容器 合作推广&#xff0c;分享一个人工智能学习网站。计划系统性学习的同学可以了解下&#xff0c;点击助力博主脱贫( •̀ ω •́ )✧ 使用docker的好处就是可以将你的环境和别人的分开&#xff0c;特别是共用的情况下。本文介绍了ubuntu环境…...

数据库原理与应用期末复习试卷2

数据库原理技术与应用 一.单项选择题 设有属性A&#xff0c;B&#xff0c;C&#xff0c;D&#xff0c;以下表示中不是关系的是( C) ​ A、R(A) B、R(A, B, C, D) C、R&#xff08;AxBxCxD&#xff09; D、R(A&#xff0c;B) 在SQL语言中的视图VIEW是数据库的(A&#xff09;…...

操作系统丨单元测试

文章目录 单元测试选择题填空题单元测试 选择题 【单选题】可以实现虚拟存储器的方案是(D)。 A. 固定分区方式 B. 可变分区方式 C. 纯分页方式 D. 请求页式 【单选题】文件系统中文件存储空间的分配是以(D)为基本单位进行的。 A. 字 B. 字节 C. 文件 D. 块 【单选题】哪种…...

tcp/ip协议2实现的插图,数据结构6 (24 - 章)

(142) 142 二四1 TCP传输控制协议 tcpstat统计量与tcp 函数调用链 (143) 143 二四2 TCP传输控制协议 宏定义与常量值–上 (144) 144 二四3 TCP传输控制协议 宏定义与常量值–下 (145) 145 二四4 TCP传输控制协议 结构tcphdr,tcpiphdr (146) 146 二四5 TCP传输控制协议 结构 tcp…...

Linux链接的创建,删除,修改

目录 1. 概述2. 硬链接2.1 创建硬链接2.2 删除硬链接 3. 软链接3.1 创建软链接3.2 删除软链接 5. 常用的终端工具下载 计算机基础–Linux详解 1. 概述 在Linux系统中&#xff0c;链接是一种文件系统中的重要概念。链接允许用户在文件系统中创建指向另一个文件的引用&#xff0c…...

HarmoryOS Ability页面的生命周期

接入穿山甲SDK app示例&#xff1a; android 数独小游戏 经典数独休闲益智 广告接入示例: Android 个人开发者如何接入广告SDK&#xff0c;实现app流量变现 Ability页面的生命周期 学习前端&#xff0c;第一步最重要的是要理解&#xff0c;页面启动和不同场景下的生命周期的…...

【Flink 从入门到成神系列 一】算子

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱敲代码的小黄&#xff0c;阿里巴巴淘天Java开发工程师&#xff0c;CSDN博客专家&#x1f4d5;系列专栏&#xff1a;Spring源码、Netty源码、Kafka源码、JUC源码、dubbo源码系列&#x1f525;如果感觉博主的文章还不错…...

无人机自主寻优降落在移动车辆

针对无人机寻找并降落在移动车辆上的问题&#xff0c;一套可能的研究总体方案&#xff1a; 问题定义与建模&#xff1a; 确定研究的具体范围和目标&#xff0c;包括无人机的初始条件、最大飞行距离、允许的最大追踪误差等。建立马尔科夫决策过程模型&#xff08;MDP&#xff09…...

科技感十足界面模板

科技感界面 在强调简洁的科技类产品相关设计中&#xff0c;背景多数分为&#xff1a;颜色或写实图片两种。 颜色很好理解&#xff0c;大多以深色底为主。强调一种神秘感和沉稳感&#xff0c;同时可以和浅色的文字内容形成很好的对比。 而图片背景的使用&#xff0c;就要求其…...

pytest装饰器 @pytest.mark.parametrize 使用方法

pytest.mark.parametrize 有三种传参方法&#xff0c;分别是&#xff1a; 1.列表传参&#xff1a;将参数值作为列表传递给装饰器。 pytest.mark.parametrize("param", [value1, value2, ..., valuen])2.元组传参&#xff1a;将参数值作为元组传递给装饰器。 pytes…...

redis被攻击

之前由于redis没有修改端口&#xff0c;密码也比较简单&#xff0c;也没有绑定ip 结果被攻击了 1 redis里被写入string类型的脚本&#xff0c;比如&#xff1a;Back1 Back2 Back3 Back4 &#xff0c;内容curl -fsSL http://d.powerofwish.com/pm.sh | sh的形式&#xff0c;如下…...

二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明

处理二手买卖、废品回收小程序 在app.json中声明permission scope.userLocation字段 教程说明 sitemapLocation 指明 sitemap.json 的位置&#xff1b;默认为 ‘sitemap.json’ 即在 app.json 同级目录下名字的 sitemap.json 文件 找到app.json这个文件 把这段代码加进去&…...

【AI视野·今日Sound 声学论文速览 第四十期】Wed, 3 Jan 2024

AI视野今日CS.Sound 声学论文速览 Wed, 3 Jan 2024 Totally 4 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Sound Papers Auffusion: Leveraging the Power of Diffusion and Large Language Models for Text-to-Audio Generation Authors Jinlong Xue, Yayue De…...

Unity组件开发--升降梯

我开发的升降梯由三个部分组成&#xff0c;反正适用于我的需求了&#xff0c;其他人想复用到自己的项目的话&#xff0c;不一定。写的也不是很好&#xff0c;感觉搞的有点复杂啦。完全可以在优化一下&#xff0c;项目赶工期&#xff0c;就先这样吧。能用就行&#xff0c;其他的…...

插槽slot涉及到的样式污染问题

1. 前言 本次我们主要结合一些案例研究一下vue的插槽中样式污染问题。在这篇文章中&#xff0c;我们主要关注以下两点: 父组件的样式是否会影响子组件的样式&#xff1f;子组件的样式是否会影响父组件定义的插槽部分的样式&#xff1f; 2. 准备代码 2.1 父组件代码 <te…...

OpenCV-Python(25):Hough直线变换

目标 理解霍夫变换的概念学习如何在一张图片中检测直线学习函数cv2.HoughLines()和cv2.HoughLinesP() 原理 霍夫变换在检测各种形状的的技术中非常流行。如果你要检测的形状可以用数学表达式写出来&#xff0c;你就可以是使用霍夫变换检测它。即使检测的形状存在一点破坏或者…...

python接口自动化(七)--状态码详解对照表(详解)

1.简介 我们为啥要了解状态码&#xff0c;从它的作用&#xff0c;就不言而喻了。如果不了解&#xff0c;我们就会像个无头苍蝇&#xff0c;横冲直撞。遇到问题也不知道从何处入手&#xff0c;就是想找别人帮忙&#xff0c;也不知道是找前端还是后端的工程师。 状态码的作用是&a…...

Android 实现动态申请各项权限

在Android应用中&#xff0c;如果需要使用一些敏感的权限&#xff08;例如相机、位置等&#xff09;&#xff0c;需要经过用户的授权才能访问。在Android 6.0&#xff08;API级别23&#xff09;及以上的版本中&#xff0c;引入了动态权限申请机制。以下是在Android应用中实现动…...

【leetcode】力扣热门之合并两个有序列表【简单难度】

题目描述 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 用例 输入&#xff1a;l1 [1,2,4], l2 [1,3,4] 输出&#xff1a;[1,1,2,3,4,4] 输入&#xff1a;l1 [], l2 [] 输出&#xff1a;[] 输入&#xff1a;l1 []…...

安全与认证Week3 Tutorial+历年题补充

目录 1) 什么是重放攻击? 2)什么是Kerberos系统?它提供什么安全服务? 3)服务器验证客户端身份的一种简单方法是要求提供密码。在Kerberos中不使用这种身份验证&#xff0c;为什么?Kerberos如何对服务器和客户机进行身份验证? 4) Kerberos的四个要求是什么?Kerberos系…...

【Kotlin】协程

Kotlin协程 背景定义实践GlobalScope.launchrunBlocking业务实践 背景 在项目实践过程中&#xff0c;笔者发现很多异步或者耗时的操作&#xff0c;都使用了Kotlin中的协程&#xff0c;所以特地研究了一番。 定义 关于协程&#xff08;Coroutine&#xff09;&#xff0c;其实…...

Scikit-Learn线性回归(五)

Scikit-Learn线性回归五:岭回归与Lasso回归 1、误差与模型复杂度2、范数与正则化2.1、范数2.2、正则化3、Scikit-Learn Ridge回归(岭回归)4、Scikit-Learn Lasso回归1、误差与模型复杂度 在第二篇文章 Scikit-Learn线性回归(二) 中,我们已经给出了过拟合与模型泛化的概念并…...

React(2): 使用 html2canvas 生成图片

使用 html2canvas 生成图片 需求 将所需的内容生成图片div 中包括 svg 等 前置准备 "react": "^18.2.0","react-dom": "^18.2.0","html2canvas": "^1.4.1",实现 <div ref{payRef}></div>const pa…...

CAN物理层协议介绍

目录 ​编辑 1. CAN协议简介 2. CAN物理层 3. 通讯节点 4. 差分信号 5. CAN协议中的差分信号 1. CAN协议简介 CAN是控制器局域网络(Controller Area Network)的简称,它是由研发和生产汽车电子产品著称的德国BOSCH公司开发的,并最终成为国际标准(ISO11519) &#xff0…...

华为OD机试真题-计算面积-2023年OD统一考试(C卷)

题目描述: 绘图机器的绘图笔初始位置在原点(0, 0),机器启动后其绘图笔按下面规则绘制直线: 1)尝试沿着横向坐标轴正向绘制直线,直到给定的终点值E。 2)期间可通过指令在纵坐标轴方向进行偏移,并同时绘制直线,偏移后按规则1 绘制直线;指令的格式为X offsetY,表示在横…...

设计模式之策略模式【行为型模式】

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档> 学习的最大理由是想摆脱平庸&#xff0c;早一天就多一份人生的精彩&#xff1b;迟一天就多一天平庸的困扰。各位小伙伴&#xff0c;如果您&#xff1a; 想系统/深入学习某…...

git使用(完整流程)

1. 新建仓库 1.右击 git bash 后 输入 git init (仓库为:当前目录) git init name (仓库为:name文件夹) git clone https://github.com/Winnie996/calculate.git //https2.工作区域 工作目录 3. 添加 提交 git add . //工作区添加至暂存区 git commit -m "注释内容&q…...

安徽宿州住房与建设网站/服务外包公司

java作用域public ,private ,protected 及不写时的区别 在说明这四个关键字之前&#xff0c;我想就class之间的关系做一个简单的定义&#xff0c;对于继承自己的class&#xff0c;base class可以认为他们都是自己的子女&#xff0c;而对于和自己一个目录下的classes&#xff0…...

wordpress如何接入支付接口/基本营销策略有哪些

参照文章&#xff0c;生成keystore和导出证书 http://blog.csdn.net/simonchi/article/details/44170875 tomcat配置 访问成功 火狐 IE...

企业网站建设基本流程图/沈阳网站优化

这种情况一般发生在使用代理的情况下 解决办法 将https://asdasd.com 替换为 http://sdadas.com附上stackoverflow链接...

济南高端网站建设公司/佛山百度关键词seo外包

eCos系统中嵌入式软件包的层次结构的最底层是硬件抽象层&#xff08;Hardware Abstraction Layer&#xff09;&#xff0c;简称为HAL&#xff0c;它负责对目标系统硬件平台进行操作和控制&#xff0c;包括对中断和例外的处理&#xff0c;为上层软件提供硬件操作接口。只需提供新…...

怎么用动图做网站背景/seo排名优化怎么样

当数据库需要进行介质恢复时&#xff0c;为了确保数据库能够顺利的执行恢复过程&#xff0c;恢复数据库到当前状态。我们要做的就是验证&#xff01;验证什么呢&#xff1f;当然是验证备份集和归档是否能够进行有效的恢复。防止我们restore后&#xff0c;执行recover时却发现归…...

如何自己做彩票网站/网站域名综合查询

笔者一段时间没接触Hadoop后&#xff0c;想登录HUE页面时&#xff0c;竟然忘了账号和密码&#xff0c;这可如何是好&#xff1f; 通过一番查找&#xff0c;笔者发现解决这个问题只需要用到MySQL即可…… 首先&#xff0c;登录MySQL,切换到hue数据库&#xff0c;找到auth_user…...