当前位置: 首页 > news >正文

Linux环境下实现并详细分析c/cpp线程池(附源码)

一、线程池原理

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

线程池是一种多线程处理形式,处理过程中将任务添加到任务队列,然后在创建线程后自动启动这些任务。

  • 线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
  • 如果某个线程在托管代码中空闲(如正在等待某个事件), 则线程池将插入另一个辅助线程来使所有处理器保持繁忙
  • 如果所有线程池线程都始终保持繁忙但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。
  • 超过最大值的线程可以排队,但他们要等到其他线程完成后才启动

二、线程池组成

在这里插入图片描述

  1. 任务队列(存储需要处理的任务,由工作的线程来处理这些任务)
  • 通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列或者从任务队列中删除
  • 已处理的任务会被从任务队列中删除
  • 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
  1. 工作的线程(任务队列任务的消费者,worker) ,N个
  • 线程池中维护了一定数量的工作线程,他们的作用是是不停的读任务队列,从里边取出任务并处理
  • 工作的线程相当于是任务队列的消费者角色,
  • 如果任务队列为空工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)
  • 如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作
  1. 管理者线程(不处理任务队列中的任务),1个
  • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
  • 当任务过多的时候,可以适当的创建一些新的工作线程
  • 当任务过少的时候,可以适当的销毁一些工作的线程

三、线程池C实现

3.1 cmake

cmake_minimum_required(VERSION 2.8.12)
project(Thread_Pool CXX C)
message(STATUS "CMake version: " ${CMAKE_VERSION})
message(STATUS "CMake system name: " ${CMAKE_SYSTEM_NAME})
message(STATUS "CMake system processor: " ${CMAKE_SYSTEM_PROCESSOR})
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/bin)include_directories(${CMAKE_SOURCE_DIR}/include)
file(GLOB SRC_FILES"${PROJECT_SOURCE_DIR}/src/*.c""${PROJECT_SOURCE_DIR}/src/*.cpp"
)find_package(Threads REQUIRED)
add_executable(${CMAKE_PROJECT_NAME}  ${SRC_FILES})target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT})target_link_libraries(${CMAKE_PROJECT_NAME} Threads::Threads)
target_link_libraries(${CMAKE_PROJECT_NAME} pthread)

3.2 threadPool.h

1. 创建任务结构体(task类型)

  • 任务的函数指针
  • 任务函数的参数

2. 创建线程池结构体(ThreadPoll类型)

  • 任务队列(任务结构体(Task)的指针(Task *taskQ),本质是一个环形队列)
  • 任务队列的的容量,任务队列头(存放的是任务队列数组的下标),任务队列尾(同上)
  • master线程的ID,worker线程的ID(指针,数组的头)
  • 最小线程数,最大线程数、繁忙线程数、存活线程数、销毁线程数
  • 锁:线程池的锁、繁忙线程数锁
  • 条件变量:队列是不是满、队列是不是空
  • 销毁线程池标志

3. 相关API函数的声明

  • 线程池创建初始化
  • 销毁线程池
  • 线程池添加任务
  • 获取工作线程数、存活线程数
  • 单个线程退出
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__
#include<stdio.h>
#include<stdio.h>
#include<pthread.h>//任务结构体
struct Task{void(*function)(void* arg);  //任务的函数指针void* arg;					// 任务函数的参数(void指针类型)
}; 
typedef struct Task Task;
//线程池结构体
struct ThreadPool
{ Task* taskQ;			 /* 任务队列 */int queueCapacity;      //容量int queueSize;          //当前任务个数int queueFront;         //队头 -> 取数据int queueRear;          //队尾  -> 放数据pthread_t managerID;    //管理者线程ID(只有1个!)pthread_t *threadIDs;   //工作的线程ID(有许多个!)int minNum;				//最小线程数量int maxNum;				//最大线程数量int busyNum;			//繁忙的线程数量(当前正在执行工作函数的线程)int liveNum;			//存活的线程数量(当前存在的工作线程数,liveNum = busyNum + 没有任务阻塞休眠的线程)int exitNum;			//要销毁的线程数,准备要删除的线程数量pthread_mutex_t mutexPool; //锁整个线程池 pthread_mutex_t <==> std::mutex(C++11)pthread_mutex_t mutexBusy; //锁busyNum变量 pthread_mutex_t <==> std::mutex(C++11)pthread_cond_t notFull;    //任务队列是不是满了 pthread_cond_t <==> std::condition_varibale类(C++11)pthread_cond_t notEmpty;   //任务队列是不是空了int shutdown;              //是不是要销毁线程池,销毁为1,不销毁为0 };
typedef struct ThreadPool ThreadPool;//创建线程池并且初始化ThreadPool* threadPoolCreate(int min,int max,int queueSize);//销毁线程池int threadPoolDestroy(ThreadPool* pool);//给线程池添加任务void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg);//获取线程池中工作的线程的个数int threadPoolBusyNum(ThreadPool* pool);//获取线程池中活着的线程的个数int threadPoolAliveNum(ThreadPool* pool);//注意:工作的线程一定是活着的!但是,活着的线程不一定在工作!!!//工作的线程(消费者线程)的任务函数void* worker(void* arg);//管理者线程任务函数void* manager(void* arg);// (主要用于创建和销毁线程池的!)//单个线程退出void threadExit(ThreadPool* pool);#endif

3.3 threadPool.c

1. threadPoolCreate,创建线程池

  • 输入:工作线程最小数、最大数 , 任务队列大小 ;返回值:线程池指针
  • 1 申请线程池内存,申请max*woker线程的内存,
  • 2 申请pool、busy锁;申请enmpty、full条件变量
  • 3 申请Task * queueSize任务队列内存
  • 4 创建一个master线程、min个woker线程
  • 5 如果申请过程中失败,释放申请内存

2. threadPoolDestroy,销毁线程池

  • 输入:线程池指针;返回值:0
  • 1 销毁线程池标志shutdown置1,
  • 2 回收master线程
  • 3 empty条件变量唤醒worker线程
    • woker线程没有任务的时候,empty条件变量将woker阻塞
    • worker线程因为empty条件变量阻塞有两种唤醒情况:
      • 1、添加新任务,唤醒empty阻塞线程,布置新任务;
      • 2、线程池销毁,唤醒empty阻塞线程,然后判断销毁标志shutdown == 1,woker线程自杀(使用threadExit)
  • 4 释放申请的内存
  • 5 释放锁和条件变量

3. threadPoolAdd,添加任务

  • 输入:线程池指针,任务函数指针,任务函数参数指针;返回值:空
  • 1 如果线程池任务队列数等于最大容量,使用full条件变量阻塞master线程
  • 2 任务队列添加任务(添加任务函数和函数参数)
  • 3 队尾后移(对队列容量取余,实现环形队列),队列大小++
  • 4 empty条件变量唤醒woker线程,之前因为任务队列空而empty条件休眠的woker线程

4. threadPoolBusyNum,获取忙线程个数

  • 输入:线程池指针;返回值:忙线程数

5. threadPoolAliveNum,获取活线程个数

  • 输入:线程池指针;返回值:活线程数

6. worker,工作线程函数

  • 输入:线程池指针;返回值:空
  • 1 判断任务队列是否为空,为空则empty条件变量阻塞线程
  • 2 判断销毁线程数是否大于0,如果大于该worker线程自杀(threadExit函数)
  • 3 判断销毁线程池标志shutdown是否为1,如果是该worker线程自杀(threadExit函数)
  • 4 创建新任务task,从任务队列头中取任务,然后放入新创建的task。
  • 5 移动队列头,queueSize–,busyNum++
  • 6 唤醒full条件变量阻塞的生产者线程,之前add函数中任务队列满了,full条件变量阻塞生产者线程,现在释放
  • 7 执行真正的任务,task.function(task.arg);
  • 8 任务结束busyNum–

7. manager,管理线程函数

  • 输入:线程池指针;返回值:空
  • 1 检测线程池销毁标志shutdown,没有销毁一直检测,每隔一段时间管理一下
  • 2 取出任务队列尺寸,活线程数,忙线程数
  • 3 如果 活线程数 < 最大线程数 && 活线程数 < 队列尺寸,说明忙不过来,添加新woker线程
  • 4 如果 忙线程*2 < 活线程数 && 最小线程数 < 活线程数 ,说明woker过多,设置销毁线程数,然后使用empty条件变量唤醒woker线程,唤醒的worker线程自杀

8. threadExit,线程退出函数

  • 输入:线程池指针;返回值:空
  • 1 获取当前pid
  • 2 在任务队列里对比pid
  • 3 找到自己的pid,改为0,给以后备用
  • 4 线程自杀,pthread_exit(NULL);
#include"../include/thread_pool.h"
#include<string.h>//用memset函数
#include<unistd.h>//用sleep函数
#include<pthread.h>//使用pthread_self() 打印当前线程的id的函数
#include<stdlib.h>//使用malloc操作
const int ADDORDESTROYNUMBER = 2;//每次添加/销毁的线程的number个数//创建线程池并且初始化
//形参表分别为:最小线程个数,最大线程个数以及队列大小!ThreadPool* threadPoolCreate(int min,int max,int queueSize){ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do{if(pool == NULL){//分配内存失败 return 空printf("malloc threadpool fail ...\n");break;// return NULL;}//succeed to create a thread pool!pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);//工作线程中,分别Max个空间的heap区数组空间if(pool->threadIDs == NULL){printf("malloc threadIDs fail ...\n");//分配内存失败 return 空break;// return NULL;}//初始化工作的线程IDs们memset(pool->threadIDs, 0, sizeof(pthread_t) * max);//memset为cstring中的函数,用来赋值!//把线程id数组中的元素全都赋值为0pool->minNum = min;pool->maxNum = max;pool->busyNum = 0;pool->liveNum = min;//初始化时以线程的个数最小值来创建活着的线程的个数!pool->exitNum = 0;//一开始初始化肯定没有销毁线程的,这个数量要根据程序运行中的状态来decide!if(pthread_mutex_init(&pool->mutexPool,NULL) != 0||pthread_mutex_init(&pool->mutexBusy,NULL) != 0||pthread_cond_init(&pool->notEmpty,NULL) != 0||pthread_cond_init(&pool->notFull,NULL) != 0){//此时创建失败printf("mutex or condition inti fail ...\n");break;// return 0;}//此时创建锁mutex和条件变量cond成功!//然后创建任务队列pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);//开辟一块arr内存存放任务队列,arr所存储的任务最大值为容量那么大pool->queueCapacity = queueSize;pool->queueSize = 0;//当前任务数为0个pool->queueFront = 0;//因为没有任务,所有头部执行0indexpool->queueRear = 0;//因为没有任务,所有尾部执行0indexpool->shutdown = 0;//初始化时肯定不能销毁线程池,所有标记为0(自己规定的)//创建线程pthread_create(&pool->managerID,NULL,manager,pool);//创建管理者这一个线程for(int i=0;i<min;i++){//一开始就创建min个线程(这是线程池中的最小线程数!)pthread_create(&pool->threadIDs[i],NULL,worker,pool);}//如果能成功执行到这里,那么就表示成功执行了线程池!//此时直接返回线程池即可!return pool;}while(0);//只会执行一次!只要有开辟不成功的case,马上break出while循环!//下面再进行资源释放的工作!if(pool && pool->threadIDs) free(pool->threadIDs);//线程池存在的case下,开辟了线程IDs的空间时,就释放它!if(pool && pool->taskQ) free(pool->taskQ);//线程池存在的case下,开辟了taskQ的空间时,就释放它!if(pool) free(pool);//释放线程池return NULL;}//销毁线程池(当线程池被销毁时,线程池中的所有成员都必须被销毁!)int threadPoolDestroy(ThreadPool* pool){if(pool == NULL) return -1;//    表示此时线程池已空,不需要销毁了//  线程池不空(没被销毁时)pool->shutdown = 1;//关闭线程池!//  阻塞回收管理者线程pthread_join(pool->managerID,NULL);//  唤醒阻塞的(活着的)消费者线程//  唤醒后他们会自动退出,为什么退出呢?(因为我们写了让他们退出的条件判断代码!)for(int i = 0;i < pool->liveNum; i++){pthread_cond_signal(&pool->notEmpty);}//  释放申请的堆区内存if(pool->taskQ){free(pool->taskQ);pool->taskQ = NULL;}if(pool->threadIDs){free(pool->threadIDs);pool->threadIDs = NULL;}//再释放互斥量锁还有条件类锁pthread_mutex_destroy(&pool->mutexBusy);pthread_mutex_destroy(&pool->mutexPool);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);free(pool);pool = NULL;return 0;//return 0 就表示的是成功Destory了线程池并返回了!}//给线程池的任务队列中添加任务void threadPoolAdd(ThreadPool* pool,void(*func)(void*),void* arg){//由于你给该线程池的任务队列中添加任务时,很有可能此时你正在对任务进行读or写的操作//那么因此这里就必须要用线程池的锁mutex_pool来锁住(防止这份共享代码因为OS的调度切换搞乱了!)pthread_mutex_lock(&pool->mutexPool);while(pool->queueSize == pool->queueCapacity && !pool->shutdown){//  此时线程池的任务队列数 = 其最大容量了 并且 该线程池还没有被销毁 时//  阻塞生产者线程pthread_cond_wait(&pool->notFull,&pool->mutexPool);}//  此时被堵塞的生产者线程 被唤醒了(注意:此时该线程还是拿到了mutex互斥锁的状态的!)//  先判断线程池是否已经被销毁了!//  线程池被销毁if(pool->shutdown){pthread_mutex_unlock(&pool->mutexPool);//   解锁return;//   并 退出程序}//  线程池没被销毁时 //  给线程池中的任务队列 添加任务pool->taskQ[pool->queueRear].function = func;pool->taskQ[pool->queueRear].arg = arg;//  让队尾index (循环)后移!pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pool->queueSize++;//    队列任务+1//生产者生产出新的任务,唤醒消费者,消费任务pthread_cond_signal(&pool->notEmpty);//pool->notEmpty用来worker中!//通知pool->notEmpty这个condition_variable条件变量的对象 返回true 让他唤醒了去工作干活了!pthread_mutex_unlock(&pool->mutexPool);}//  获取线程池中工作(忙)的线程的个数int threadPoolBusyNum(ThreadPool* pool){//  注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据//  应该加上锁!pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;}//  获取线程池中活着的线程的个数int threadPoolAliveNum(ThreadPool* pool){//  注意 读取pool中的成员变量时 也为了防止别的线程给这些变量写or读数据//  应该加上锁!且这里因为没有给aliveNum 定义对应的互斥量 so 直接用线程池的锁即可pthread_mutex_lock(&pool->mutexPool);int aliveNum = pool->liveNum ;pthread_mutex_unlock(&pool->mutexPool);return aliveNum ;}//  工作的线程(消费者线程)的任务函数void* worker(void* arg){//首先把传入进来的参数类型转换为线程池类型ThreadPool* pool = (ThreadPool*)arg;//接下来就算不断地读取任务队列中的任务(每个任务就是一个函数)while(1){pthread_mutex_lock(&pool->mutexPool);//加锁//  当前任务队列是否为空while(pool->queueSize == 0 && !pool->shutdown){//  在当前队列中任务为空 并且 线程池没有被销毁时 就阻塞在这个工作线程中pthread_cond_wait(&pool->notEmpty,&pool->mutexPool);//只有当pool->notEmpty的返回值为true时,就继续往下执行!//  当该哦工作的线程被唤醒时,判断一下该线程是否需要被销毁if(pool->exitNum > 0){pool->exitNum--;//每次判断好要让该线程自杀后,必须让exitNum--才行!//  当然,当条件变量wait被唤醒后,肯定是拿到了互斥锁才继续往下面执行的//  此时就必须要把pool->mutexPool这把锁头给unlock解锁一下,再退出该程序,否则你没解锁就退出的话该程序就直接死锁了!if(pool->liveNum > pool->minNum){pool->liveNum--;pthread_mutex_unlock(&pool->mutexPool);//解锁// pthread_exit(NULL);//退出该线程!threadExit(pool);}}}//  判断线程池是否被关闭了if(pool->shutdown){//   若线程池被销毁了,可以直接退出线程!pthread_mutex_unlock(&pool->mutexPool);//解锁//  pthread_exit(NULL);//退出线程threadExit(pool);}//  若线程池没有被销毁了,可以直接做任务了(消费)//  从任务队列中取出一个任务(真正 do things!)struct Task task;task.function = pool->taskQ[pool->queueFront].function;task.arg = pool->taskQ[pool->queueFront].arg;//  移动头节点 使之do循环移动 构成一个循环队列 , 取余数的目的的为了形成环形队列,到最后一个进行取余变成0,其他不变pool->queueFront = (pool->queueFront+1) % pool->queueCapacity;pool->queueSize--;//取出1个任务了,所有--//解锁,消费者消费成功,唤醒阻塞的生产者生产。pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);//  do 真正的工作(在function中)printf("thread %ld start working...\n",pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;//工作时++pthread_mutex_unlock(&pool->mutexBusy);//  do things 真正工作的函数进行执行task.function(task.arg);// <==> (*task.function)(task.arg);//  同时,因为此时这个线程在忙,所有busyNum++且用对应的锁来锁住,防止这个线程的小任务还没真正开始干活呢,//  OS就因为调度切换到别的线程去了free(task.arg);task.arg = NULL;printf("thread %ld end working...\n",pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;//工作完成后--pthread_mutex_unlock(&pool->mutexBusy);pthread_mutex_unlock(&pool->mutexPool);//解锁}return NULL;}//管理者线程任务函数(主要用于创建和销毁线程池的!)void* manager(void* arg){ThreadPool* pool = (ThreadPool*)arg;while(!pool->shutdown){//  只要线程池没有销毁 就继续管理它//  每隔3s钟 管理一下sleep(3);//linux 下要包含头文件unistd.h才能使用!//  取出线程池钟的任务数量以及当前线程的数量//  (由于你读取的过程钟有可能别的线程正在写数据),为了防止这种case你就必须要上锁才行!pthread_mutex_lock(&pool->mutexPool);//加锁int queueSize = pool->queueSize;int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);//解锁//  取出忙的线程的数量pthread_mutex_lock(&pool->mutexBusy);//加锁int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);//解锁//  添加线程(此时可以根据你项目的需要来制定相应的添加线程个数的规则)//  比如:当存活的线程数 < max && 当前存活的线程数 < 当前任务队列钟的任务数量 时 就添加线程!(此时表明当前的线程数已经忙不过来了)if(liveNum < pool->maxNum && liveNum < queueSize){pthread_mutex_lock(&pool->mutexPool);//加锁int cnt = 0; for(int i=0;i< pool->maxNum && cnt < ADDORDESTROYNUMBER && pool->liveNum < pool->maxNum;i++){if(pool->threadIDs[i] == 0)//为0就 表明此时该线程ID可用!(之前没被使用,线程我可以用了!){//  创建线程元素并放进去数组中!pthread_create(&pool->threadIDs[i],NULL,worker,pool);cnt++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);//解锁}//  销毁线程(此时可以根据你项目的需要来制定相应的销毁线程个数的规则)//  比如:当 忙的线程数*2 < 存活的线程数 && 最小线程数 < 存活的线程数 时 就销毁线程if(busyNum * 2 < liveNum && pool->minNum < liveNum){pthread_mutex_lock(&pool->mutexPool);pool->exitNum = ADDORDESTROYNUMBER;//要销毁的线程数字定义为ADDORDESTROYNUMBER(2)pthread_mutex_unlock(&pool->mutexPool);//  让工作的线程自杀for(int i=0;i<ADDORDESTROYNUMBER;i++){//唤醒线程,唤醒的是存活且不忙的线程pthread_cond_signal(&pool->notEmpty);// ==> condition_variable中的.notify_one()}}}return NULL;}//单个线程退出void threadExit(ThreadPool* pool){//先获取当前线程的线程idpthread_t threadId = pthread_self();// <==> std::this_thread::get_id();for(int i=0;i<pool->maxNum;i++){if(pool->threadIDs[i] == threadId){pool->threadIDs[i] = 0;//重新置为0,表示该线程ID可在后续被使用了!printf("threadExit() called,%ld existing...\n",threadId);break;}}pthread_exit(NULL);}

四、线程池CPP实现

4.1 任务队列类的声明

// 定义任务结构体
using callback = void(*)(void*);
struct Task
{Task(){function = nullptr;arg = nullptr;}Task(callback f, void* arg){function = f;this->arg = arg;}callback function;void* arg;
};// 任务队列
class TaskQueue
{
public:TaskQueue();~TaskQueue();// 添加任务void addTask(Task& task);void addTask(callback func, void* arg);// 取出一个任务Task takeTask();// 获取当前队列中任务个数inline int taskNumber(){return m_queue.size();}private:pthread_mutex_t m_mutex;    // 互斥锁std::queue<Task> m_queue;   // 任务队列
};

4.2 任务队列类的定义

TaskQueue::TaskQueue()
{pthread_mutex_init(&m_mutex, NULL);
}TaskQueue::~TaskQueue()
{pthread_mutex_destroy(&m_mutex);
}void TaskQueue::addTask(Task& task)
{pthread_mutex_lock(&m_mutex);m_queue.push(task);pthread_mutex_unlock(&m_mutex);
}void TaskQueue::addTask(callback func, void* arg)
{pthread_mutex_lock(&m_mutex);Task task;task.function = func;task.arg = arg;m_queue.push(task);pthread_mutex_unlock(&m_mutex);
}Task TaskQueue::takeTask()
{Task t;pthread_mutex_lock(&m_mutex);if (m_queue.size() > 0){t = m_queue.front();m_queue.pop();}pthread_mutex_unlock(&m_mutex);return t;
}

4.3 线程池类的声明

class ThreadPool
{
public:ThreadPool(int min, int max);~ThreadPool();// 添加任务void addTask(Task task);// 获取忙线程的个数int getBusyNumber();// 获取活着的线程个数int getAliveNumber();private:// 工作的线程的任务函数static void* worker(void* arg);// 管理者线程的任务函数static void* manager(void* arg);void threadExit();private:pthread_mutex_t m_lock;pthread_cond_t m_notEmpty;pthread_t* m_threadIDs;pthread_t m_managerID;TaskQueue* m_taskQ;int m_minNum;int m_maxNum;int m_busyNum;int m_aliveNum;int m_exitNum;bool m_shutdown = false;
};

4.4 线程池类的定义

ThreadPool::ThreadPool(int minNum, int maxNum)
{// 实例化任务队列m_taskQ = new TaskQueue;do {// 初始化线程池m_minNum = minNum;m_maxNum = maxNum;m_busyNum = 0;m_aliveNum = minNum;// 根据线程的最大上限给线程数组分配内存m_threadIDs = new pthread_t[maxNum];if (m_threadIDs == nullptr){cout << "malloc thread_t[] 失败...." << endl;;break;}// 初始化memset(m_threadIDs, 0, sizeof(pthread_t) * maxNum);// 初始化互斥锁,条件变量if (pthread_mutex_init(&m_lock, NULL) != 0 ||pthread_cond_init(&m_notEmpty, NULL) != 0){cout << "init mutex or condition fail..." << endl;break;}/// 创建线程 //// 根据最小线程个数, 创建线程for (int i = 0; i < minNum; ++i){pthread_create(&m_threadIDs[i], NULL, worker, this);cout << "创建子线程, ID: " << to_string(m_threadIDs[i]) << endl;}// 创建管理者线程, 1个pthread_create(&m_managerID, NULL, manager, this);} while (0);
}ThreadPool::~ThreadPool()
{m_shutdown = 1;// 销毁管理者线程pthread_join(m_managerID, NULL);// 唤醒所有消费者线程for (int i = 0; i < m_aliveNum; ++i){pthread_cond_signal(&m_notEmpty);}if (m_taskQ) delete m_taskQ;if (m_threadIDs) delete[]m_threadIDs;pthread_mutex_destroy(&m_lock);pthread_cond_destroy(&m_notEmpty);
}void ThreadPool::addTask(Task task)
{if (m_shutdown){return;}// 添加任务,不需要加锁,任务队列中有锁m_taskQ->addTask(task);// 唤醒工作的线程pthread_cond_signal(&m_notEmpty);
}int ThreadPool::getAliveNumber()
{int threadNum = 0;pthread_mutex_lock(&m_lock);threadNum = m_aliveNum;pthread_mutex_unlock(&m_lock);return threadNum;
}int ThreadPool::getBusyNumber()
{int busyNum = 0;pthread_mutex_lock(&m_lock);busyNum = m_busyNum;pthread_mutex_unlock(&m_lock);return busyNum;
}// 工作线程任务函数
void* ThreadPool::worker(void* arg)
{ThreadPool* pool = static_cast<ThreadPool*>(arg);// 一直不停的工作while (true){// 访问任务队列(共享资源)加锁pthread_mutex_lock(&pool->m_lock);// 判断任务队列是否为空, 如果为空工作线程阻塞while (pool->m_taskQ->taskNumber() == 0 && !pool->m_shutdown){cout << "thread " << to_string(pthread_self()) << " waiting..." << endl;// 阻塞线程pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);// 解除阻塞之后, 判断是否要销毁线程if (pool->m_exitNum > 0){pool->m_exitNum--;if (pool->m_aliveNum > pool->m_minNum){pool->m_aliveNum--;pthread_mutex_unlock(&pool->m_lock);pool->threadExit();}}}// 判断线程池是否被关闭了if (pool->m_shutdown){pthread_mutex_unlock(&pool->m_lock);pool->threadExit();}// 从任务队列中取出一个任务Task task = pool->m_taskQ->takeTask();// 工作的线程+1pool->m_busyNum++;// 线程池解锁pthread_mutex_unlock(&pool->m_lock);// 执行任务cout << "thread " << to_string(pthread_self()) << " start working..." << endl;task.function(task.arg);delete task.arg;task.arg = nullptr;// 任务处理结束cout << "thread " << to_string(pthread_self()) << " end working...";pthread_mutex_lock(&pool->m_lock);pool->m_busyNum--;pthread_mutex_unlock(&pool->m_lock);}return nullptr;
}// 管理者线程任务函数
void* ThreadPool::manager(void* arg)
{ThreadPool* pool = static_cast<ThreadPool*>(arg);// 如果线程池没有关闭, 就一直检测while (!pool->m_shutdown){// 每隔5s检测一次sleep(5);// 取出线程池中的任务数和线程数量//  取出工作的线程池数量pthread_mutex_lock(&pool->m_lock);int queueSize = pool->m_taskQ->taskNumber();int liveNum = pool->m_aliveNum;int busyNum = pool->m_busyNum;pthread_mutex_unlock(&pool->m_lock);// 创建线程const int NUMBER = 2;// 当前任务个数>存活的线程数 && 存活的线程数<最大线程个数if (queueSize > liveNum && liveNum < pool->m_maxNum){// 线程池加锁pthread_mutex_lock(&pool->m_lock);int num = 0;for (int i = 0; i < pool->m_maxNum && num < NUMBER&& pool->m_aliveNum < pool->m_maxNum; ++i){if (pool->m_threadIDs[i] == 0){pthread_create(&pool->m_threadIDs[i], NULL, worker, pool);num++;pool->m_aliveNum++;}}pthread_mutex_unlock(&pool->m_lock);}// 销毁多余的线程// 忙线程*2 < 存活的线程数目 && 存活的线程数 > 最小线程数量if (busyNum * 2 < liveNum && liveNum > pool->m_minNum){pthread_mutex_lock(&pool->m_lock);pool->m_exitNum = NUMBER;pthread_mutex_unlock(&pool->m_lock);for (int i = 0; i < NUMBER; ++i){pthread_cond_signal(&pool->m_notEmpty);}}}return nullptr;
}// 线程退出
void ThreadPool::threadExit()
{pthread_t tid = pthread_self();for (int i = 0; i < m_maxNum; ++i){if (m_threadIDs[i] == tid){cout << "threadExit() function: thread " << to_string(pthread_self()) << " exiting..." << endl;m_threadIDs[i] = 0;break;}}pthread_exit(NULL);
}

五、线程池实现难点

4.1 empty条件变量

判断任务队列是否满了

  • 阻塞休眠
    • 当worker函数消费者,消耗任务队列为空,empty条件变量让该woker线程阻塞休眠
  • 释放唤醒
    1. 当add函数添加新任务到队列中,empty条件变量唤醒阻塞woker线程,执行新任务
    2. 当threadPoolDestroy函数销毁线程池,shutdown标志为1,empty条件变量唤醒阻塞woker线程。woker线程醒来后发现shutdown标志为1,进行自杀
    3. 当manager函数判断当前woker线程过多,设置销毁线程数exitNum,empty条件变量唤醒阻塞woker线程。woker线程醒来后发现s销毁线程数exitNum 大于0,进行自杀

4.2 full条件变量

判断任务队列是否空了

  • 阻塞休眠
    • 当add函数发现任务队列满了,无法添加新任务时,full条件变量让该生产者线程(调用add函数的线程)阻塞休眠
  • 释放唤醒
    • 当worker函数从任务队列中取出新任务,任务队列有空间的时候,full条件变量唤醒阻塞生产者线程,创建新任务

相关文章:

Linux环境下实现并详细分析c/cpp线程池(附源码)

一、线程池原理 如果并发的线程数量很多&#xff0c;并且每个线程都是执行一个时间很短的任务就结束了&#xff0c;这样频繁创建线程就会大大降低系统的效率&#xff0c;因为频繁创建线程和销毁线程需要时间。 线程池是一种多线程处理形式&#xff0c;处理过程中将任务添加到…...

移动web(三)

her~~llo&#xff0c;我是你们的好朋友Lyle&#xff0c;是名梦想成为计算机大佬的男人&#xff01; 博客是为了记录自我的学习历程&#xff0c;加强记忆方便复习&#xff0c;如有不足之处还望多多包涵&#xff01;非常欢迎大家的批评指正。 媒体查询 目标&#xff1a;能够根据…...

macbook怎么运行exe文件 mac打开exe文件的三大方法

exe文件是Windows系统的可执行文件&#xff0c;虽然Mac系统上无法直接打开exe文件&#xff0c;但是你可以在Mac电脑上安装双系统或者虚拟机来实现mac电脑上运行exe文件。除了这两种方法之外&#xff0c;你还可以在Mac电脑上使用类虚拟机软件打开exe文件&#xff0c;这三种方法各…...

GoldenGate(OGG)高可用XAG部署

前言: 本文档主要描述通过Oracle Grid Infrastructure Agents (XAG)基于Oracle RAC实现GoldenGate(OGG)软件高可用的实施操作 环境信息&#xff1a; 源端 目标端 节点一IP 节点二IP 192.168.1.84 192.168.1.86 节点一IP 节点二IP 192.168.1.200 192.168.1.210 VIP 192.…...

如何使用Docker容器部署O2OA(翱途)开发平台与OnlyOffice的集成版本?

O2OA(翱途)开发平台[下称O2OA平台或者O2OA]默认可以和OnlyOffice进行集成来实现在线文档编辑以及流程集成。开发者可以直接安装O2OA官网的OnlyOfficeO2Server的Docker版本用于体验。本文将详细介绍如何安装O2OA OnlyOffice的Docker版本。OnlyOffice Docs Sever可以单独安装,O2…...

springboot复习(黑马)(持续更新)

学习目标基于SpringBoot框架的程序开发步骤熟练使用SpringBoot配置信息修改服务器配置基于SpringBoot的完成SSM整合项目开发一、SpringBoot简介1. 入门案例问题导入SpringMVC的HelloWord程序大家还记得吗&#xff1f;SpringBoot是由Pivotal团队提供的全新框架&#xff0c;其设计…...

K_A16_001 基于STM32等单片机驱动HX711称重模块 串口与OLED0.96双显示

K_A16_001 基于STM32等单片机驱动HX711称重模块 串口与OLED0.96双显示一、资源说明二、基本参数参数引脚说明三、驱动说明对应程序:四、部分代码说明1、接线引脚定义1.1、STC89C52RCHX711称重模块1.2、STM32F103C8T6HX711称重模块五、基础知识学习与相关资料下载六、视频效果展…...

单例模式之饿汉式

目录 1 单例模式的程序结构 2 饿汉式单例模式的实现 3 饿汉式线程安全 4 防止反射破坏单例 5 总结 单例模式&#xff08;Singleton Pattern&#xff09;是 Java 中最简单的设计模式之一。所谓单例就是在系统中只有一个该类的实例&#xff0c;并且提供一个访问该实例的全局…...

软件测试培训三个月,找到工作了11K,面试总结分享给大家

功能方面&#xff1a;问的最多的就是测试流程&#xff0c;测试计划包含哪些内容&#xff0c;公司人员配置&#xff0c;有bug开发认为不是 bug怎么处理&#xff0c;怎样才算是好的用例&#xff0c;测试用例设计方法&#xff08;等价类&#xff0c;边界值等概念方法&#xff09;&…...

Hbase备份与恢复工具Snapshot的基本概念与工作原理

数据库都有相对完善的备份与恢复功能。备份与恢复功能是数据库在数据意外丢失、损坏下的最后一根救命稻草。数据库定期备份、定期演练恢复是当下很多重要业务都在慢慢接受的最佳实践&#xff0c;也是数据库管理者推荐的一种管理规范。HBase数据库最核心的备份与恢复工具——Sna…...

RTOS中事件集的实现原理以及实用应用

事件集的原理 RTOS中事件集的实现原理是通过位掩码来实现的。事件集是一种用于在任务之间传递信号的机制。在RTOS中&#xff0c;事件集通常是一个32位的二进制位向量。每个位都代表一个特定的事件&#xff0c;例如信号、标志、定时器等。 当一个任务等待一个或多个事件时&…...

计及新能源出力不确定性的电气设备综合能源系统协同优化(Matlab代码实现)

运行视频及运行结果&#xff1a; 计及碳排放成本的电-气-热综合能源系纷充节点能价计算方法研究&#xff08;Matlab代码实现&#xff09;目录 第一部分 文献一《计及新能源出力不确定性的电气设备综合能源系统协同优化》 0 引言 &#xff11; 新能源出力不确定性处理 1.1 新…...

推荐几个超实用的开源自动化测试框架

有什么好的开源自动化测试框架可以推荐&#xff1f;为了让大家看文章不蒙圈&#xff0c;文章我将围绕3个方面来阐述&#xff1a; 1、通用自动化测试框架介绍 2、Java语言下的自动化测试框架 3、Python语言下的自动化测试框架 随着计算机技术人员的大量增加&#xff0c;通过编写…...

Mac 上解压缩 RAR 文件

RAR 在十几年前的互联网曾叱咤风云般的存在。在那时&#xff0c;你所能见到的压缩文件几乎都是 RAR 格式&#xff0c;大家在 Windows 上使用的压缩、解压缩软件基本都是 WinRAR。虽然这些年使用 RAR 格式的压缩包的情况在逐渐减少&#xff0c;但是你还是经常能在国内各种网站下…...

C++核心编程<引用>(2)

c核心编程<引用>2.引用2.1引用的基本使用2.2引用注意事项2.3引用做函数参数2.4引用做函数返回值2.5引用的本质2.6常量引用2.引用 2.1引用的基本使用 作用: 给变量起别名语法:数据类型 &别名 原名演示#include<iostream> using namespace std; void func();i…...

零入门kubernetes网络实战-20->golang编程syscall操作tun设备介绍

《零入门kubernetes网络实战》视频专栏地址 https://www.ixigua.com/7193641905282875942 本篇文章视频地址(稍后上传) 本篇文章主要是使用golang自带的syscall包来创建tun类型的虚拟网络设备。 注意&#xff1a; 目前只能使用syscall包来创建tun类型的虚拟设备。 tun虚拟网…...

springboot之自动配置

文章目录前言一、配置文件及自动配置原理1、配置文件2、yaml1、注解注入方式给属性赋值2、yaml给实体类赋值3、Properties给属性赋值二、springboot的多环境配置四、自动配置总结前言 1、自动装配原理 2、多种方式给属性赋值 3、多环境配置 4、自动配置 一、配置文件及自动配置…...

wxpython设计GUI:wxFormBuilder工具常用布局结构介绍之布局四—面板拼接式

python借助wxFormBuilder工具搭建基础的GUI界面—wxFormBuilder工具使用介绍&#xff1a;https://blog.csdn.net/Logintern09/article/details/126685315 布局四&#xff1a;面板拼接式&#xff0c;先Panel面板构图&#xff0c;再使用程序代码在Frame框架上拼接面板 下面讲一下…...

全网最全之接口测试【加密解密攻防完整版】实战教程详解

看视频讲的更详细&#xff1a;https://www.bilibili.com/video/BV1zr4y1E7V5/? 一、对称加密 对称加密算法是共享密钥加密算法&#xff0c;在加密解密过程中&#xff0c;使用的密钥只有一个。发送和接收双方事先都知道加密的密钥&#xff0c;均使用这个密钥对数据进行加密和解…...

Python - 目录文件(OS模块) 常用操作

目录os模块的方法os.path()模块的方法使用示例示例一&#xff1a;简单使用示例二&#xff1a;获取文件夹下指定条件的文件os模块的方法 方法说明os.listdir(path)取得指定文件夹下的文件列表os.mkdir(path)创建一个名为path的文件夹os.open(file, flags)打开一个文件&#xff…...

变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析

一、变量声明设计&#xff1a;let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性&#xff0c;这种设计体现了语言的核心哲学。以下是深度解析&#xff1a; 1.1 设计理念剖析 安全优先原则&#xff1a;默认不可变强制开发者明确声明意图 let x 5; …...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接&#xff1a;3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

工业安全零事故的智能守护者:一体化AI智能安防平台

前言&#xff1a; 通过AI视觉技术&#xff0c;为船厂提供全面的安全监控解决方案&#xff0c;涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面&#xff0c;能够实现对应负责人反馈机制&#xff0c;并最终实现数据的统计报表。提升船厂…...

在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能

下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能&#xff0c;包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...

【Java_EE】Spring MVC

目录 Spring Web MVC ​编辑注解 RestController RequestMapping RequestParam RequestParam RequestBody PathVariable RequestPart 参数传递 注意事项 ​编辑参数重命名 RequestParam ​编辑​编辑传递集合 RequestParam 传递JSON数据 ​编辑RequestBody ​…...

c#开发AI模型对话

AI模型 前面已经介绍了一般AI模型本地部署&#xff0c;直接调用现成的模型数据。这里主要讲述讲接口集成到我们自己的程序中使用方式。 微软提供了ML.NET来开发和使用AI模型&#xff0c;但是目前国内可能使用不多&#xff0c;至少实践例子很少看见。开发训练模型就不介绍了&am…...

Caliper 配置文件解析:config.yaml

Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...

Android Bitmap治理全解析:从加载优化到泄漏防控的全生命周期管理

引言 Bitmap&#xff08;位图&#xff09;是Android应用内存占用的“头号杀手”。一张1080P&#xff08;1920x1080&#xff09;的图片以ARGB_8888格式加载时&#xff0c;内存占用高达8MB&#xff08;192010804字节&#xff09;。据统计&#xff0c;超过60%的应用OOM崩溃与Bitm…...

Maven 概述、安装、配置、仓库、私服详解

目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...

AI,如何重构理解、匹配与决策?

AI 时代&#xff0c;我们如何理解消费&#xff1f; 作者&#xff5c;王彬 封面&#xff5c;Unplash 人们通过信息理解世界。 曾几何时&#xff0c;PC 与移动互联网重塑了人们的购物路径&#xff1a;信息变得唾手可得&#xff0c;商品决策变得高度依赖内容。 但 AI 时代的来…...