C++实现线程池
C++实现线程池
- 一、前言
- 二、线程池的接口设计
- 2.1、类封装
- 2.2、线程池的初始化
- 2.3、线程池的启动
- 2.4、线程池的停止
- 2.5、线程的执行函数run()
- 2.6、任务的运行函数
- 2.7、等待所有线程结束
- 三、测试线程池
- 四、源码地址
- 总结
一、前言
C++实现的线程池,可能涉及以下知识点:
- decltype。
- packaged_task。
- make_shared。
- mutex。
- unique_lock。
- notify_one。
- future。
- queue。
- bind。
- thread。
等等。
二、线程池的接口设计
(1)封装一个线程池的类。
(2)线程池的初始化:设置线程的数量。
(3)启动线程池:创建线程等工作。
(4)执行任务的函数。
(5)停止线程池。
(6)等所有任务执行完成,退出执行函数。
2.1、类封装
线程池类,采用c++11来实现。
#ifndef _CPP_THREAD_POOL_H_
#define _CPP_THREAD_POOL_H_#include <iostream>
#include <functional>
#include <memory>
#include <queue>
#include <mutex>
#include <vector>
#include <thread>
#include <future>#ifdef WIN32
#include <windows.h>
#else
#include <sys/time.h>
#endifusing namespace std;void getNow(timeval *tv);
int64_t getNowMs();#define TNOW getNow()
#define TNOWMS getNowMs()class CPP_ThreadPool{
protected:struct TaskFunc{TaskFunc(uint64_t expireTime):_expireTime(expireTime){}int64_t _expireTime=0;//超时的绝对时间function<void()> _func;};typedef shared_ptr<TaskFunc> TaskFuncPtr;/* * @brief 获取任务 ** *@return TaskFuncPtr */bool get(TaskFuncPtr& task);/** @brief 线程池是否退出*/bool isTerminate(){return _bTerminate;}/** @brief 线程运行态*/void run();public: /** @brief 构造函数 */CPP_ThreadPool(); /* * @brief 析构, 会停止所有线程 */virtual ~CPP_ThreadPool();/* * * @brief 初始化. * * @param num 工作线程个数 */bool init(size_t num);/** @brief 停止所有线程, 会等待所有线程结束 */void stop();/** @brief 启动所有线程 */bool start();/* * @brief 等待当前任务队列中, 所有工作全部结束(队列无任务). * @param millsecond 等待的时间(ms), -1:永远等待 * @return true, 所有工作都处理完毕 * false,超时退出 */bool waitForAllDone(int millsecond=-1);/** @brief 获取线程个数.* @return size_t 线程个数 */size_t getThreadNum(){unique_lock<mutex> lock(_mutex);return _threads.size();}/** @brief 获取当前线程池的任务数* @return size_t 线程池的任务数 */size_t getJobNum(){unique_lock<mutex> lock(_mutex);return _tasks.size();}/** @brief 用线程池启用任务(F是function, Args是参数) ** * @param ParentFunctor * @param tf * @return 返回任务的future对象, 可以通过这个对象来获取返回值 */template <class F,class... Args>auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>{return exec(0,f,args...);}/* * unused.** @brief 用线程池启用任务(F是function, Args是参数) * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 * @param bind function * @return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template <class F, class... Args> * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 * 返回值后置*/template<class F,class... Args>auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>{//获取现在时间int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;// 定义返回值类型using retType=decltype(f(args...));// 封装任务auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));// 封装任务指针,设置过期时间TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);fPtr->_func=[task](){(*task)();};unique_lock<mutex> lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task->get_future();}protected:size_t _threadNum;//线程数量bool _bTerminate;//判定是否终止线程池mutex _mutex; //唯一锁vector<thread*> _threads; //工作线程数组queue<TaskFuncPtr> _tasks; //任务队列condition_variable _condition;//条件变量atomic<int> _atomic{0};//原子变量
};#endif
使用示例:
CPP_ThreadPool tpool;
tpool.init(5); //初始化线程池线程数
//启动线程方式
tpool.start();
//将任务丢到线程池中*
tpool.exec(testFunction, 10); //参数和start相同
//等待线程池结束
tpool.waitForAllDone(1000); //参数<0时, 表示无限等待(注意有人调用stop也会推出)
//此时: 外部需要结束线程池是调用
tpool.stop();
注意:ZERO_ThreadPool::exec执行任务返回的是个future, 因此可以通过future异步获取结果, 比如:
int testInt(int i)
{ return i;
}
auto f = tpool.exec(testInt, 5);
cout << f.get() << endl; //当testInt在线程池中执行后, f.get()会返回数值5 class Test
{
public: int test(int i);
};
Test t;
auto f = tpool.exec(std::bind(&Test::test, &t, std::placeholders::_1), 10);
//返回的future对象, 可以检查是否执行
cout << f.get() << endl;
2.2、线程池的初始化
主要是设置线程池中线程的数量,如果线程池已经存在则直接返回,防止重复初始化。
bool CPP_ThreadPool::init(size_t num)
{unique_lock<mutex> lock(_mutex);if(!_threads.empty())return false;_threadNum=num;return true;
}
2.3、线程池的启动
根据设置的线程数量,创建线程并保存在一个数组中。如果线程池已经存在则直接返回,防止重复启动。
bool CPP_ThreadPool::start()
{unique_lock<mutex> lock(_mutex);if(!_threads.empty())return false;for(size_t i=0;i<_threadNum;i++){_threads.push_back(new thread(&CPP_ThreadPool::run,this));}return true;
}
2.4、线程池的停止
设置线程退出条件,并通知所有线程。停止时,要等待所有线程都执行完任务,再销毁线程。
需要注意锁的粒度。
void CPP_ThreadPool::stop()
{// 注意要有这个{},不然会死锁。{unique_lock<mutex> lock(_mutex);_bTerminate=true;_condition.notify_all();}size_t thdCount=_threads.size();for(size_t i=0;i<thdCount;i++){if(_threads[i]->joinable()){_threads[i]->join();}delete _threads[i];_threads[i]=NULL;}unique_lock<mutex> lock(_mutex);_threads.clear();
}
2.5、线程的执行函数run()
读取任务:判断任务是否存在,如果任务队列为空,则进入等待状态直到任务队列不为空或退出线程池(这里需要两次判断,因为可能存在虚假唤醒)。
执行任务:调用匿名函数。
检测所有任务都是否执行完毕:这里使用了原子变量来检测任务是否都执行完,原因在于任务队列为空不代表任务已经执行完(任务可能还在运行中、也可能是任务刚弹出还没运行),使用原子变量来计数就更严谨。
bool CPP_ThreadPool::get(TaskFuncPtr& task)
{unique_lock<mutex> lock(_mutex);if(_tasks.empty())//判断任务是否存在{_condition.wait(lock,[this]{return _bTerminate || !_tasks.empty();//唤醒条件});}if(_bTerminate)return false;if(!_tasks.empty())//判断任务是否存在{task=move(_tasks.front());// 使用移动语义_tasks.pop();//弹出一个任务return true;}return false;
}// 执行任务的线程
void CPP_ThreadPool::run()
{while(!isTerminate()){TaskFuncPtr task;// 读取任务bool ok=get(task);if(ok){++_atomic;try{if(task->_expireTime!=0 && task->_expireTime < TNOWMS){// 处理超时任务}elsetask->_func();//执行任务}catch(...){}--_atomic;// 任务执行完毕,这里只是为了通知waitForAllDoneunique_lock<mutex> lock(_mutex);if(_atomic==0 && _tasks.empty())_condition.notify_all();}}
}
2.6、任务的运行函数
这里使用了可变模块参数、智能指针、bind、function、捕获列表的相关技术知识。
返回任务的future对象, 可以通过这个对象来获取返回值。
超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃。
可变模块参数对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数。
/** @brief 用线程池启用任务(F是function, Args是参数) ** * @param ParentFunctor * @param tf * @return 返回任务的future对象, 可以通过这个对象来获取返回值 */template <class F,class... Args>auto exec(F&& f, Args&&... args)->future<decltype(f(args...))>{return exec(0,f,args...);}/* * unused.** @brief 用线程池启用任务(F是function, Args是参数) * @param 超时时间 ,单位ms (为0时不做超时控制) ;若任务超时,此任务将被丢弃 * @param bind function * @return 返回任务的future对象, 可以通过这个对象来获取返回值 ** template <class F, class... Args> * 它是c++里新增的最强大的特性之一,它对参数进行了高度泛化,它能表示0到任意个数、任意类型的参数 * auto exec(F &&f, Args &&... args) -> std::future<decltype(f(args...))> * std::future<decltype(f(args...))>:返回future,调用者可以通过future获取返回值 * 返回值后置*/template<class F,class... Args>auto exec(int64_t timeoutMs,F&& f,Args&&... args) -> future<decltype(f(args...))>{//获取现在时间int64_t expireTime=(timeoutMs==0)?0:TNOWMS+timeoutMs;// 定义返回值类型using retType=decltype(f(args...));// 封装任务auto task=make_shared<packaged_task<retType()>>(bind(forward<F>(f),forward<Args>(args)...));// 封装任务指针,设置过期时间TaskFuncPtr fPtr=make_shared<TaskFunc>(expireTime);fPtr->_func=[task](){(*task)();};unique_lock<mutex> lock(_mutex);// 插入任务_tasks.push(fPtr);// 唤醒阻塞的线程,可以考虑只有任务队列为空的情 况再去notify_condition.notify_one();return task->get_future();}
2.7、等待所有线程结束
bool CPP_ThreadPool::waitForAllDone(int millsecond)
{unique_lock<mutex> lock(_mutex);if(_tasks.empty())return true;if(millsecond<0){_condition.wait(lock,[this]{ return _tasks.empty();});return true;}else{return _condition.wait_for(lock,chrono::milliseconds(millsecond),[this]{ return _tasks.empty();});}
}
三、测试线程池
#include <iostream>
#include "cppThreadPool.h"using namespace std;void func1(int a)
{ cout << "func1() a=" << a << endl;
}
void func2(int a, string b)
{ cout << "func2() a=" << a << ", b=" << b<< endl;
}void func3()
{cout<<"func3"<<endl;
}void test01()
{cout<<"test 01"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池// 执行任务threadpool.exec(func1,10);threadpool.exec(func2,20,"FLY.");threadpool.exec(1000,func3);threadpool.waitForAllDone();threadpool.stop();
}int func1_future(int a)
{ cout << "func1() a=" << a << endl; return a;
}string func2_future(int a, string b)
{ cout << "func2() a=" << a << ", b=" << b<< endl; return b;
}void test02()
{cout<<"test 02"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池future<decltype(func1_future(0))> ret01=threadpool.exec(func1_future,10);future<string> ret02=threadpool.exec(func2_future,20,"FLY.");threadpool.waitForAllDone();cout<<"ret01 = "<<ret01.get()<<endl;cout<<"ret02 = "<<ret02.get()<<endl;threadpool.stop();}class Test{
public:int test(int a){cout<<_name<<": a = "<<a<<endl;return a+1;}void setname(string name){_name=name;}string _name;};void test03()
{cout<<"test 03"<<endl;CPP_ThreadPool threadpool;threadpool.init(2);threadpool.start();//启动线程池Test t1;Test t2;t1.setname("Test 1");t2.setname("Test 2");auto f1=threadpool.exec(bind(&Test::test,&t1,placeholders::_1),10);auto f2=threadpool.exec(bind(&Test::test,&t2,placeholders::_1),20);threadpool.waitForAllDone();cout<<"f1 = "<<f1.get()<<endl;cout<<"f2 = "<<f2.get()<<endl;threadpool.stop();}int main(int argc,char **argv)
{// 简单测试线程池test01();// 测试任务函数返回值test02();// 测试类对象函数的绑定test03();return 0;
}
执行结果:
test 01
func1() a=10
func2() a=20, b=FLY.
func3
test 02
func1() a=10
func2() a=20, b=FLY.
ret01 = 10
ret02 = FLY.
test 03
Test 1: a = 10
Test 2: a = 20
f1 = 11
f2 = 21
四、源码地址
源码已经上传github。
总结
线程池的核心:初始化、线程启动、执行函数、线程停止。
相关文章:

C++实现线程池
C实现线程池一、前言二、线程池的接口设计2.1、类封装2.2、线程池的初始化2.3、线程池的启动2.4、线程池的停止2.5、线程的执行函数run()2.6、任务的运行函数2.7、等待所有线程结束三、测试线程池四、源码地址总结一、前言 C实现的线程池,可能涉及以下知识点&#…...

2023最新Java面试手册(性能优化+微服务架构+并发编程+开源框架)
Java面试手册 一、性能优化面试专栏 1.1、 tomcat性能优化整理 1.2、JVM性能优化整理 1.3、Mysql性能优化整理 二、微服务架构面试专栏 2.1、SpringCloud面试整理 2.2、SpringBoot面试整理 2.3、Dubbo面试整理 三、并发编程高级面试专栏 四、开源框架面试题专栏 4.1、Sprin…...

对灵敏度分析技术进行建模(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

完整爬虫学习笔记(第一章)
文章目录前言:fu:. 爬虫概述:hotdog:原理解剖:one: 服务器渲染:two: 前端JS渲染:fire: 第一个爬虫程序案例总结前言 最近正在学习Python网络爬虫的相关知识,鉴于本人Python水平有限 , 对Python并无太深的理解,所以此文章的主要目的在于抛砖引玉…...

会计师项目管理软件是什么,哪些必不可少的功能
欢迎阅读现代金融专业人士的会计师项目管理指南。在本文中,我们将深入探讨在基于项目的会计的各个方面使用项目管理方法的好处。我们还将教您面临哪些挑战以及如何为您的团队选择最佳工具。 为什么会计师的项目管理很重要? 在会计方面,目标始…...
第 8 章 优化
目录 8.1 优化概述 8.2 优化 SQL 语句 8.3 优化和指标 8.4 优化数据库结构 8.5 优化 InnoDB 表 8.6 优化 MyISAM 表 8.7 内存表的优化 8.8 了解查询执行计划 8.9 控制查询优化器 8.10 缓冲和缓存 8.11 优化锁定操作 8.12 优化 MySQL 服务器 8.13 衡量性能ÿ…...
剑指offer -- java题解
剑指offer -- java题解刷题地址1、数字在升序数组中出现的次数2、二叉搜索树的第k个节点3、二叉树的深度4、数组中只出现一次的两个数字5、和为S的两个数字6、左旋转字符串7、滑动窗口的最大值8、扑克牌顺子9、孩子们的游戏(圆圈中最后剩下的数)10、买卖股票的最好时机(一)刷题…...

若依ruoyi——手把手教你制作自己的管理系统【二、修改样式】
阿里图标一( ̄︶ ̄*)) 图片白嫖一((* ̄3 ̄)╭ ********* 专栏略长 爆肝万字 细节狂魔 请准备好一键三连 ********* 运行成功后: idea后台正常先挂着 我习惯用VScode操作 当然如果有两台机子 一个挂后台一个改前端就更好…...
2023.2.14每日一题——455. 分发饼干
每日一题题目描述解题核心解法一:双指针题目描述 题目链接:455. 分发饼干 假设你是一位很棒的家长,想要给你的孩子们一些小饼干。但是,每个孩子最多只能给一块饼干。 对每个孩子 i,都有一个胃口值 g[i],…...
MySQL入门篇-MySQL常用字符函数小结
备注:测试数据库版本为MySQL 8.0 这个blog我们来聊聊常见的字符函数 函数名函数用途UPPER()返回大写的字符LOWER()返回小写的字符LTRIM()左边去掉空格TRIM()去掉空格RTRIM()右边去掉空格SPACE()返回指定长度的空格CONCAT()连接字符串CONCAT_WS()指定分隔符连接字符串CHAR_LEN…...

解决不同影像裁剪后栅格数据行列不一致问题
前言在处理栅格数据时,尽管用同一个矢量文件裁剪栅格数据,不同数据来源的栅格行列数也会出现不一致的情况。如果忽略或解决不好,会导致后续数据处理出现意想不到的误差或错误,尤其是利用编程实现数据处理时。因此,应当…...

visual studio2022配置opencv
标题:在vs下配置使用opencv 流程: 1、下载安装opencv 2、添加环境变量 3、vs中配置属性 4、使用 5、可能遇到的报错和解决 1、 下载安装opencv 官网下载地址: https://opencv.org/releases/ 我这里是windows环境,所以选择点击w…...

什么是销售管理?销售管理的五大职能
销售管理听起来很简单,似乎只是负责销售并确保客户满意,但事实上,它远不止于此。 销售管理的实际职能包括监督销售团队的工作,制定计划和设定目标,通常还包括确保销售流程的效率以获得最佳业务结果。 什么是销售管理…...

[CVPR‘22] EG3D: Efficient Geometry-aware 3D Generative Adversarial Networks
paper: https://nvlabs.github.io/eg3d/media/eg3d.pdfproject: EG3D: Efficient Geometry-aware 3D GANscode: GitHub - NVlabs/eg3d总结: 本文提出一种hybrid explicit-implicit 3D representation: tri-plane hybrid 3D representation,该方法不仅有…...

Learning C++ No.9【STL No.1】
引言: 北京时间:2023/2/13/18:29,开学正式上课第一天,直接上午一节思想政治,下午一节思想政治,生怕我们……,但,我深知该课的无聊,所以充分利用时间,把我的小…...

Apifox推荐-django后台验证token配置
最近事情很多,但是我还是想写一片推荐apifox的文章。 优秀的UI,清晰地逻辑,丰富的功能。对于我们这种业余选手来说,他真的很便利。 更新新版后有了更多贴心的功能,让你感觉他是一个有温度的工具。 最重要的是…...

SAS应用入门学习笔记6
SQL (SAS): Features: 1)不需要在每个query中重复调用每个SQL; 2)每个statement都是独立去完成的; 3)我们是没有proc print和proc sort语句的;(order by) key synta…...

【3D目标检测】Pseudo-Stereo for Monocular 3D Object Detection in Autonomous Driving
目录概述细节背景与整体流程图像级别生成特征级别生成损失函数学习深度感知的特征概述 本文是基于单目图像的3D目标检测方法。 【2021】【MonoDLE】 研究的问题: 能否借助立体图像检测算法提高单目图像检测的效果如何实现右侧图像的生成 解决的方法: 受启发于伪…...
git 常用命令之 git branch
大家好,我是 17。 新建 git 分支 分支是并行开发的基础。分支名称的本质是对分支最后一个提交的引用。分支有多个,但 HEAD 只有一个,可以认为 HEAD 是"current branch"(当下的分支)。当你用git switch切换分支的时候,…...
Oracle数据泵
Oracle 数据泵:概览 作为一个基于服务器的用于高速移动数据与元数据的工具, Oracle 数据泵具有以下特点: •可通过 DBMS_DATAPUMP 调用 •可提供以下工具: – expdp – impdp – 基于 Web 的界面 •提供四种数据移动方法ÿ…...
k8s从入门到放弃之Ingress七层负载
k8s从入门到放弃之Ingress七层负载 在Kubernetes(简称K8s)中,Ingress是一个API对象,它允许你定义如何从集群外部访问集群内部的服务。Ingress可以提供负载均衡、SSL终结和基于名称的虚拟主机等功能。通过Ingress,你可…...

CentOS下的分布式内存计算Spark环境部署
一、Spark 核心架构与应用场景 1.1 分布式计算引擎的核心优势 Spark 是基于内存的分布式计算框架,相比 MapReduce 具有以下核心优势: 内存计算:数据可常驻内存,迭代计算性能提升 10-100 倍(文档段落:3-79…...
【ROS】Nav2源码之nav2_behavior_tree-行为树节点列表
1、行为树节点分类 在 Nav2(Navigation2)的行为树框架中,行为树节点插件按照功能分为 Action(动作节点)、Condition(条件节点)、Control(控制节点) 和 Decorator(装饰节点) 四类。 1.1 动作节点 Action 执行具体的机器人操作或任务,直接与硬件、传感器或外部系统…...
基于数字孪生的水厂可视化平台建设:架构与实践
分享大纲: 1、数字孪生水厂可视化平台建设背景 2、数字孪生水厂可视化平台建设架构 3、数字孪生水厂可视化平台建设成效 近几年,数字孪生水厂的建设开展的如火如荼。作为提升水厂管理效率、优化资源的调度手段,基于数字孪生的水厂可视化平台的…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)
笔记整理:刘治强,浙江大学硕士生,研究方向为知识图谱表示学习,大语言模型 论文链接:http://arxiv.org/abs/2407.16127 发表会议:ISWC 2024 1. 动机 传统的知识图谱补全(KGC)模型通过…...

有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

Docker 本地安装 mysql 数据库
Docker: Accelerated Container Application Development 下载对应操作系统版本的 docker ;并安装。 基础操作不再赘述。 打开 macOS 终端,开始 docker 安装mysql之旅 第一步 docker search mysql 》〉docker search mysql NAME DE…...

Linux 内存管理实战精讲:核心原理与面试常考点全解析
Linux 内存管理实战精讲:核心原理与面试常考点全解析 Linux 内核内存管理是系统设计中最复杂但也最核心的模块之一。它不仅支撑着虚拟内存机制、物理内存分配、进程隔离与资源复用,还直接决定系统运行的性能与稳定性。无论你是嵌入式开发者、内核调试工…...

Netty从入门到进阶(二)
二、Netty入门 1. 概述 1.1 Netty是什么 Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients. Netty是一个异步的、基于事件驱动的网络应用框架,用于…...

使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...