当前位置: 首页 > news >正文

一文搞懂系列——Linux C线程池技术

背景

最近在走读诊断项目代码时,发现其用到了线程池技术,感觉耳目一新。以前基本只是听过线程池,但是并没有实际应用。对它有一丝的好奇,于是趁这个机会深入了解一下线程池的实现原理。

线程池的优点

线程池出现的背景,其实对应CPU性能优化——“瑞士军刀“文章中提到的短时应用。

即短时间内通过创建线程处理大量请求,但是请求业务的执行时间过短,会造成一些缺陷。

  • 浪费系统资源。比如我们创建一个线程,再销毁一个线程耗时10ms,但是业务的执行时间只有10ms。这就导致系统有效利用率较低。
  • 系统不稳定。如果短时间内,来了大量的请求,每一个请求都通过创建线程的方式执行。可能存在瞬时负载很高,请求响应降低,从而导致系统不稳定。

于是我们可以通过线程池技术,减少线程创建和消耗的耗时,提高系统的资源利用;控制线程并行数量,确保系统的稳定性;

线程池实现

线程池的核心包括以下内容:

  • 线程池任务节点结构。
  • 线程池控制器。
  • 线程池的控制流程。

线程池任务节点结构

线程池任务结点用来保存用户投递过来的的任务,并放入线程池中的线程来执行,任务结构如下:

struct worker_t {void * (* process)(void * arg); /*回调函数*/int    paratype;                /*函数类型(预留)*/void * arg;                     /*回调函数参数*/struct worker_t * next;         /*链接下一个任务节点*/
};

线程池控制器

线程池控制器用来对线程池进行控制管理,描述当前线程池的最基本信息,包括任务的投递,线程池状态的更新与查询,线程池的销毁等,其结构如下:

/*线程控制器*/
struct CThread_pool_t {pthread_mutex_t queue_lock;     /*互斥锁*/pthread_cond_t  queue_ready;    /*条件变量*/worker_t * queue_head;          /*任务节点链表 保存所有投递的任务*/int shutdown;                   /*线程池销毁标志 1-销毁*/pthread_t * threadid;           /*线程ID*/int max_thread_num;             /*线程池可容纳最大线程数*/int current_pthread_num;        /*当前线程池存放的线程*/int current_pthread_task_num;   /*当前已经执行任务和已分配任务的线程数目和*/int current_wait_queue_num;     /*当前等待队列的的任务数目*/int free_pthread_num;           /*线程池允许最大的空闲线程数/*//***  function:       ThreadPoolAddWorkUnlimit*  description:    向线程池投递任务*  input param:    pthis   线程池指针*                  process 回调函数*                  arg     回调函数参数*  return Valr:    0       成功*                  -1      失败*/     int (* AddWorkUnlimit)(void * pthis, void * (* process)(void * arg), void * arg);/***  function:       ThreadPoolAddWorkLimit*  description:    向线程池投递任务,无空闲线程则阻塞*  input param:    pthis   线程池指针*                  process 回调函数*                  arg     回调函数参数*  return Val:     0       成功*                  -1      失败*/     int (* AddWorkLimit)(void * pthis, void * (* process)(void * arg), void * arg);/***  function:       ThreadPoolGetThreadMaxNum*  description:    获取线程池可容纳的最大线程数*  input param:    pthis   线程池指针*/     int (* GetThreadMaxNum)(void * pthis);/***  function:       ThreadPoolGetCurrentThreadNum*  description:    获取线程池存放的线程数*  input param:    pthis   线程池指针*  return Val:     线程池存放的线程数*/     int (* GetCurrentThreadNum)(void * pthis);/***  function:       ThreadPoolGetCurrentTaskThreadNum*  description:    获取当前正在执行任务和已经分配任务的线程数目和*  input param:    pthis   线程池指针*  return Val:     当前正在执行任务和已经分配任务的线程数目和*/     int (* GetCurrentTaskThreadNum)(void * pthis);/***  function:       ThreadPoolGetCurrentWaitTaskNum*  description:    获取线程池等待队列任务数*  input param:    pthis   线程池指针*  return Val:     等待队列任务数*/     int (* GetCurrentWaitTaskNum)(void * pthis);/***  function:       ThreadPoolDestroy*  description:    销毁线程池*  input param:    pthis   线程池指针*  return Val:     0       成功*                  -1      失败*/     int (* Destroy)(void * pthis);    
};

线程池的控制流程

线程池的控制流程可以分为三个步骤:

  1. 线程池创建。即创建max_num个线程ThreadPoolRoutine,即空闲线程:
/***  function:       ThreadPoolConstruct*  description:    构建线程池*  input param:    max_num   线程池可容纳的最大线程数*                  free_num  线程池允许存在的最大空闲线程,超过则将线程释放回操作系统*  return Val:     线程池指针                 */     
CThread_pool_t * 
ThreadPoolConstruct(int max_num, int free_num)
{int i = 0;CThread_pool_t * pool = (CThread_pool_t *)malloc(sizeof(CThread_pool_t));if(NULL == pool)return NULL;memset(pool, 0, sizeof(CThread_pool_t));/*初始化互斥锁*/pthread_mutex_init(&(pool->queue_lock), NULL);/*初始化条件变量*/pthread_cond_init(&(pool->queue_ready), NULL);pool->queue_head                = NULL;pool->max_thread_num            = max_num; // 线程池可容纳的最大线程数pool->current_wait_queue_num    = 0;pool->current_pthread_task_num  = 0;pool->shutdown                  = 0;pool->current_pthread_num       = 0;pool->free_pthread_num          = free_num; // 线程池允许存在最大空闲线程pool->threadid                  = NULL;pool->threadid                  = (pthread_t *)malloc(max_num*sizeof(pthread_t));/*该函数指针赋值*/pool->AddWorkUnlimit            = ThreadPoolAddWorkUnlimit;pool->AddWorkLimit              = ThreadPoolAddWorkLimit;pool->Destroy                   = ThreadPoolDestroy;pool->GetThreadMaxNum           = ThreadPoolGetThreadMaxNum;pool->GetCurrentThreadNum       = ThreadPoolGetCurrentThreadNum;pool->GetCurrentTaskThreadNum   = ThreadPoolGetCurrentTaskThreadNum;pool->GetCurrentWaitTaskNum     = ThreadPoolGetCurrentWaitTaskNum;for(i=0; i<max_num; i++) {pool->current_pthread_num++;    // 当前池中的线程数/*创建线程*/pthread_create(&(pool->threadid[i]), NULL, ThreadPoolRoutine, (void *)pool);usleep(1000);        }return pool;
}
  1. 投递任务。即将任务生产者,将任务节点投入线程池中。实现如下:
/***  function:       ThreadPoolAddWorkLimit*  description:    向线程池投递任务,无空闲线程则阻塞*  input param:    pthis   线程池指针*                  process 回调函数*                  arg     回调函数参数*  return Val:     0       成功*                  -1      失败*/     
int
ThreadPoolAddWorkLimit(void * pthis, void * (* process)(void * arg), void * arg)
{ // int FreeThreadNum = 0;// int CurrentPthreadNum = 0;CThread_pool_t * pool = (CThread_pool_t *)pthis;/*为添加的任务队列节点分配内存*/worker_t * newworker  = (worker_t *)malloc(sizeof(worker_t)); if(NULL == newworker) return -1;newworker->process  = process;  // 回调函数,在线程ThreadPoolRoutine()中执行newworker->arg      = arg;      // 回调函数参数newworker->next     = NULL;      pthread_mutex_lock(&(pool->queue_lock));/*插入新任务队列节点*/worker_t * member = pool->queue_head;   // 指向任务队列链表整体if(member != NULL) {while(member->next != NULL) // 队列中有节点member = member->next;  // member指针往后移动member->next = newworker;   // 插入到队列链表尾部} else pool->queue_head = newworker; // 插入到队列链表头assert(pool->queue_head != NULL);pool->current_wait_queue_num++; // 等待队列加1/*空闲的线程= 当前线程池存放的线程 - 当前已经执行任务和已分配任务的线程数目和*/int FreeThreadNum = pool->current_pthread_num - pool->current_pthread_task_num;/*如果没有空闲线程且池中当前线程数不超过可容纳最大线程*/if((0 == FreeThreadNum) && (pool->current_pthread_num < pool->max_thread_num)) {  //-> 条件为真进行新线程创建int CurrentPthreadNum = pool->current_pthread_num;/*新增线程*/pool->threadid = (pthread_t *)realloc(pool->threadid, (CurrentPthreadNum+1) * sizeof(pthread_t));pthread_create(&(pool->threadid[CurrentPthreadNum]),NULL, ThreadPoolRoutine, (void *)pool);/*当前线程池中线程总数加1*/                                   pool->current_pthread_num++;/*分配任务线程数加1*/pool->current_pthread_task_num++;pthread_mutex_unlock(&(pool->queue_lock));/*发送信号给一个处与条件阻塞等待状态的线程*/pthread_cond_signal(&(pool->queue_ready));return 0;}pool->current_pthread_task_num++;pthread_mutex_unlock(&(pool->queue_lock));/*发送信号给一个处与条件阻塞等待状态的线程*/pthread_cond_signal(&(pool->queue_ready));
//  usleep(10);  //看情况  return 0;
}
  1. 线程执行。即每一个线程的执行逻辑。实现如下:
/***  function:       ThreadPoolRoutine*  description:    线程池中执行的线程*  input param:    arg  线程池指针*/     
void * 
ThreadPoolRoutine(void * arg)
{CThread_pool_t * pool = (CThread_pool_t *)arg;while(1) {/*上锁,pthread_cond_wait()调用会解锁*/pthread_mutex_lock(&(pool->queue_lock));/*队列没有等待任务*/while((pool->current_wait_queue_num == 0) && (!pool->shutdown)) {/*条件锁阻塞等待条件信号*/pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));}if(pool->shutdown) {pthread_mutex_unlock(&(pool->queue_lock));pthread_exit(NULL);         // 释放线程}assert(pool->current_wait_queue_num != 0);assert(pool->queue_head != NULL);pool->current_wait_queue_num--; // 等待任务减1,准备执行任务worker_t * worker = pool->queue_head;   // 去等待队列任务节点头pool->queue_head = worker->next;        // 链表后移     pthread_mutex_unlock(&(pool->queue_lock));(* (worker->process))(worker->arg);      // 执行回调函数pthread_mutex_lock(&(pool->queue_lock));pool->current_pthread_task_num--;       // 函数执行结束free(worker);   // 释放任务结点worker = NULL;if((pool->current_pthread_num - pool->current_pthread_task_num) > pool->free_pthread_num) {pthread_mutex_unlock(&(pool->queue_lock));break;  // 当线程池中空闲线程超过 free_pthread_num 则将线程释放回操作系统}pthread_mutex_unlock(&(pool->queue_lock));    }pool->current_pthread_num--;    // 当前线程数减1pthread_exit(NULL);             // 释放线程return (void *)NULL;
}

这个就是用来执行任务的线程,在初始化创建线程时所有线程都全部阻塞在pthread_cond_wait()处,此时的线程就为空闲线程,也就是线程被挂起,当收到信号并取得互斥锁时,表明任务投递过来
则获取等待队列里的任务结点并执行回调函数;函数执行结束后回去判断当前等待队列是否还有任务,有则接下去执行,否则重新阻塞回到空闲线程状态。

若我的内容对您有所帮助,还请关注我的公众号。不定期分享干活,剖析案例,也可以一起讨论分享。
我的宗旨:
踩完您工作中的所有坑并分享给您,让你的工作无bug,人生尽是坦途

在这里插入图片描述

总结

实际上,我觉得在诊断项目中,线程池技术是非必要的。因此它不会涉及到大量的请求,以及每一个请求处理,一般都会比较耗时。

参考:https://www.cnblogs.com/zhaoosheLBJ/p/9337291.html

相关文章:

一文搞懂系列——Linux C线程池技术

背景 最近在走读诊断项目代码时&#xff0c;发现其用到了线程池技术&#xff0c;感觉耳目一新。以前基本只是听过线程池&#xff0c;但是并没有实际应用。对它有一丝的好奇&#xff0c;于是趁这个机会深入了解一下线程池的实现原理。 线程池的优点 线程池出现的背景&#xf…...

stable diffusion代码学习笔记

前言&#xff1a;本文没有太多公式推理&#xff0c;只有一些简单的公式&#xff0c;以及公式和代码的对应关系。本文仅做个人学习笔记&#xff0c;如有理解错误的地方&#xff0c;请指出。 本文包含stable diffusion入门文献和不同版本的代码。 文献资源 本文学习的代码&…...

腾讯云服务器怎么买?两种购买方式更省钱

腾讯云服务器购买流程很简单&#xff0c;有两种购买方式&#xff0c;直接在官方活动上购买比较划算&#xff0c;在云服务器CVM或轻量应用服务器页面自定义购买价格比较贵&#xff0c;但是自定义购买云服务器CPU内存带宽配置选择范围广&#xff0c;活动上购买只能选择固定的活动…...

基于SpringBoot自定义控制是否需要开启定时功能

在基于SpringBoot的开发过程中&#xff0c;有时候会在应用中使用定时任务&#xff0c;然后服务器上启动定时任务&#xff0c;本地就不需要开启定时任务&#xff0c;使用一个参数进行控制&#xff0c;通过查资料得知非常简单。 参数配置 在application-dev.yml中加入如下配置 …...

“确定要在不复制其属性的情况下复制此文件?”解决方案(将U盘格式由FAT格式转换为NTFS格式)

文章目录 1.问题描述2.问题分析3.问题解决3.1 方法一3.2 方法二3.3 方法三 1.问题描述 从电脑上复制文件到U盘里会出现“确定要在不复制其属性的情况下复制此文件&#xff1f;”提示。 2.问题分析 如果这个文件在NTFS分区上&#xff0c;且存在特殊的安全属性。那么把它从NT…...

视频监控系统EasyCVR如何通过调用API接口查询和下载设备录像?

智慧安防平台EasyCVR是基于各种IP流媒体协议传输的视频汇聚和融合管理平台。视频流媒体服务器EasyCVR采用了开放式的网络结构&#xff0c;支持高清视频的接入和传输、分发&#xff0c;平台提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联…...

15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条

15.鸿蒙HarmonyOS App(JAVA)进度条与圆形进度条 progressBar2.setIndeterminate(true);//设置无限模式,运行查看动态效果 //创建并设置无限模式元素 ShapeElement element new ShapeElement(); element.setBounds(0,0,50,50); element.setRgbColor(new RgbColor(255,0,0)); …...

【FastAPI】路径参数

路径参数 from fastapi import FastAPIapp FastAPI()app.get("/items/{item_id}") async def read_item(item_id):return {"item_id": item_id}其中{item_id}就为路径参数 运行以上程序当访问 &#xff1a;http://127.0.0.1:8000/items/fastapi时候 将会…...

【docker笔记】DockerFile

DockerFile Docker镜像结构的分层 镜像不是一个单一的文件&#xff0c;而是有多层构成。 容器其实是在镜像的最上面加了一层读写层&#xff0c;在运行容器里做的任何文件改动&#xff0c;都会写到这个读写层。 如果删除了容器&#xff0c;也就是删除了其最上面的读写层&…...

React项目搭建流程

第一步 利用脚手架创建ts类型的react项目&#xff1a; 执行如下的命令&#xff1a;create-react-app myDemo --template typescript &#xff1b; 第二步 清理项目目录结构&#xff1a; src/ index.tsx, app.txs, react-app-env.d.ts public/index.ht…...

QT DAY1作业

1.QQ登录界面 头文件代码 #ifndef MYWIDGET_H #define MYWIDGET_H#include <QWidget> #include <QIcon> #include <QLabel> #include <QPushButton> #include <QMovie> #include <QLineEdit>class MyWidget : public QWidget {Q_OBJECTpu…...

Java后端开发——Mybatis实验

文章目录 Java后端开发——Mybatis实验一、MyBatis入门程序1.创建工程2.引入相关依赖3.数据库准备4.编写数据库连接信息配置文件5.创建POJO实体6.编写核心配置文件和映射文件 二、MyBatis案例&#xff1a;员工管理系统1.在mybatis数据库中创建employee表2.创建持久化类Employee…...

【UE Niagara 网格体粒子系列】02-自定义网格

目录 步骤 一、创建自定义网格体 二、创建Niagara系统 步骤 一、创建自定义网格体 1. 打开Blender&#xff0c;按下ShiftA来创建一个平面 将该平面旋转90 导出为fbx 设置导出选定的物体&#xff0c;这里命名为“SM_PlaneFaceCamera.fbx” 按H隐藏刚才创建的平面&#x…...

k8s 检测node节点内存使用率平衡调度脚本 —— 筑梦之路

直接上脚本&#xff1a; #! /bin/bash#对实际使用内存大于85%的机器停止调度&#xff0c;对实际使用内存小于70%的 关闭调度# 获取实际内存小于或等于70%的机器 memory_lt_70kubectl top nodes |awk NR>1{if($50<70) print $1} # 获取实际内存大于或等于85%的机器 memor…...

React Native集成到现有原生应用

本篇文章以MacOS环境开发iOS平台为例&#xff0c;记录一下在原生APP基础上集成React Native React Native中文网 详细介绍了搭建环境和集成RN的步骤。 环境搭建 必须安装的依赖有&#xff1a;Node、Watchman、Xcode 和 CocoaPods。 安装Homebrew Homebrew是一款Mac OS平台下…...

完全卸载grafana

先停掉grafana sudo systemctl stop grafana-server 查看要卸载的包的名字 yum list installed yum remove grafana-enterprise.x86_64 成功 删除grafana的数据目录 sudo rm -rf /etc/grafana/sudo rm -rf /usr/share/grafana/sudo rm -rf /var/lib/grafana/...

Vue2.组件通信

样式冲突 写在组件中的样式默认会全局生效。容易造成多个组件之间的样式冲突问题。 可以给组件加上scoped属性&#xff0c;让样式只作用于当前组件。 原理&#xff1a; 给当前组件模板的所有元素&#xff0c;加上一个自定义属性data-v-hash值&#xff0c;用以区分不同的组件。…...

CAS的超~详细介绍

什么是CAS CAS全称Compare and swap,是一种比较特殊的CPU指令. 字面意思:"比较并交换", 一个CAS涉及到以下操作: 我们假设内存中的原数据为V,旧的预期值A,需要修改的新值B. 1.比较A和V是否相等(比较) 2.如果相等,将B写入V.(交换) 3.返回操作是否成功. 伪代码 下面…...

Scott用户数据表的分析

Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 如果想要知道某个用户所有的数据表: select * from tab; 此时结果中一共返回了四张数据表&#xff0c;分别为部门表&#xff08;dept&#xff09; &#xff0c;员工表&#xff08;emp&a…...

网络基础学习(3):交换机

1.交换机结构 &#xff08;1&#xff09;网线接口和后面的电路部分加在一起称为一个端口&#xff0c;也就是说交换机的一个端口就相当于计算机上的一块网卡。 如果在计算机上安装多个网卡&#xff0c;并让网卡接收所有网络包&#xff0c;再安装具备交换机功能的软件&#xff0…...

Redis相关知识总结(缓存雪崩,缓存穿透,缓存击穿,Redis实现分布式锁,如何保持数据库和缓存一致)

文章目录 1.什么是Redis&#xff1f;2.为什么要使用redis作为mysql的缓存&#xff1f;3.什么是缓存雪崩、缓存穿透、缓存击穿&#xff1f;3.1缓存雪崩3.1.1 大量缓存同时过期3.1.2 Redis宕机 3.2 缓存击穿3.3 缓存穿透3.4 总结 4. 数据库和缓存如何保持一致性5. Redis实现分布式…...

涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战

“&#x1f916;手搓TuyaAI语音指令 &#x1f60d;秒变表情包大师&#xff0c;让萌系Otto机器人&#x1f525;玩出智能新花样&#xff01;开整&#xff01;” &#x1f916; Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制&#xff08;TuyaAI…...

CMake 从 GitHub 下载第三方库并使用

有时我们希望直接使用 GitHub 上的开源库,而不想手动下载、编译和安装。 可以利用 CMake 提供的 FetchContent 模块来实现自动下载、构建和链接第三方库。 FetchContent 命令官方文档✅ 示例代码 我们将以 fmt 这个流行的格式化库为例,演示如何: 使用 FetchContent 从 GitH…...

Unit 1 深度强化学习简介

Deep RL Course ——Unit 1 Introduction 从理论和实践层面深入学习深度强化学习。学会使用知名的深度强化学习库&#xff0c;例如 Stable Baselines3、RL Baselines3 Zoo、Sample Factory 和 CleanRL。在独特的环境中训练智能体&#xff0c;比如 SnowballFight、Huggy the Do…...

Rapidio门铃消息FIFO溢出机制

关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系&#xff0c;以下是深入解析&#xff1a; 门铃FIFO溢出的本质 在RapidIO系统中&#xff0c;门铃消息FIFO是硬件控制器内部的缓冲区&#xff0c;用于临时存储接收到的门铃消息&#xff08;Doorbell Message&#xff09;。…...

分布式增量爬虫实现方案

之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面&#xff0c;避免重复抓取&#xff0c;以节省资源和时间。 在分布式环境下&#xff0c;增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路&#xff1a;将增量判…...

Java线上CPU飙高问题排查全指南

一、引言 在Java应用的线上运行环境中&#xff0c;CPU飙高是一个常见且棘手的性能问题。当系统出现CPU飙高时&#xff0c;通常会导致应用响应缓慢&#xff0c;甚至服务不可用&#xff0c;严重影响用户体验和业务运行。因此&#xff0c;掌握一套科学有效的CPU飙高问题排查方法&…...

人机融合智能 | “人智交互”跨学科新领域

本文系统地提出基于“以人为中心AI(HCAI)”理念的人-人工智能交互(人智交互)这一跨学科新领域及框架,定义人智交互领域的理念、基本理论和关键问题、方法、开发流程和参与团队等,阐述提出人智交互新领域的意义。然后,提出人智交互研究的三种新范式取向以及它们的意义。最后,总结…...

[免费]微信小程序问卷调查系统(SpringBoot后端+Vue管理端)【论文+源码+SQL脚本】

大家好&#xff0c;我是java1234_小锋老师&#xff0c;看到一个不错的微信小程序问卷调查系统(SpringBoot后端Vue管理端)【论文源码SQL脚本】&#xff0c;分享下哈。 项目视频演示 【免费】微信小程序问卷调查系统(SpringBoot后端Vue管理端) Java毕业设计_哔哩哔哩_bilibili 项…...

scikit-learn机器学习

# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...