晋城两学一做网站/最近发生的热点新闻
线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念:
同步:在保证数据安全的条件下,让线程按某种特定的顺序依次访问临界资源。
通过上一节的代码我们实现了一个多线程抢票的程序,但结果显示的是一个线程在疯狂的抢票,这就导致了其他线程抢占不到临界资源而导致的饥饿问题。
所以在抢代码的代码逻辑中,我们需要保证各个线程以同步的方式进行抢票。那如何实现呢?就需要用到信号量:
条件变量的接口函数:
条件变量的使用:
#include <iostream>
#include <string>
#include <unistd.h>
#include <pthread.h>int tickets = 1000;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;void *start_routine(void *args)
{std::string name = static_cast<const char *>(args);while (true){pthread_mutex_lock(&mutex);pthread_cond_wait(&cond, &mutex); // 为什么要有mutex,后面就说// 判断暂时省略std::cout << name << " -> " << tickets << std::endl;tickets--;pthread_mutex_unlock(&mutex);}
}int main()
{// 通过条件变量控制线程的执行pthread_t t[5];for (int i = 0; i < 5; i++){char *name = new char[64];snprintf(name, 64, "thread %d", i + 1);pthread_create(t+i, nullptr, start_routine, name);}while (true){sleep(1);// pthread_cond_signal(&cond);pthread_cond_broadcast(&cond);std::cout << "main thread wakeup one thread..." << std::endl;}for (int i = 0; i < 5; i++){pthread_join(t[i], nullptr);}return 0;
}
条件变量的理解:
我们用一个例子来说明:
没有条件变量就好比是面试官在一个办公室面试,只要应聘者得到门外挂着的钥匙进来就可以面试。这时一个应聘者抢到了钥匙进入了办公室,面试结束后想要将钥匙归还时感觉自己面的不好,这时自己离钥匙也是最近的,所以刚出门又拿起了钥匙继续面试。这钥匙就导致了其他应聘者拿不到而这个应聘者在疯狂的面试,很明显这不合理。
但是有了条件变量就好比是面试官在外面设置了一个等待区,面试完的人必须归还钥匙并且再次回到等待队列的最末尾重新排队,这就保证了各个应聘者都有机会面试。
条件不满足的时候,线程必须去某些定义好的条件变量上进行等待
。
pthread_cond_wait第二个参数的意义(将锁传进去):该函数调用的时候,会以原子性的方式,将锁释放,并将线程挂起等待。用pthread_cond_sign将线程唤醒时,该线程会拿着你所传入的锁将代码继续向下运行。通过以上知识的铺垫让我们进入今天的正题:
生产者消费者模型
什么是生产者消费者模型呢?我们用一个形象的图来说明:
生产者消费者模型就是生产者可以将生产的数据存放在一个共享区,消费者从共享区中获得数据进行消费。由于是共享区,就要保证数据的安全性。当生产者生产一个数据时,另一个生产者不能在同一个数据上生产,不然会导致数据的不安全性。因此生产者和生产者之间是互斥关系的。同样消费者也不能同时消费同一个数据,不然可能会导致数据的不一致性,因此消费者和消费者之间是互斥关系的。如果这时一个生产者正在生产一个数据,而同时消费者也正在消费这个数据。就可能导致数据还没生产完全就已经被消费了,也会导致数据的不安全性。因此生产者和消费者之间是互斥关系,同时我们还希望生产者生产以后就有消费者来消费,消费者消费完一个就有生产者来生产,因此生产者和消费者之间是同步关系。
总结一下就是3种关系、两种角色、一个交易场所:
这一段特殊的缓冲区可以提前存放一批数据,这样消费者想消费的时候消费,生产者想什么时候生产就什么时候生产。解决了生产和消费两批线程忙闲不均的问题,是它们不具有强耦合的关系。
基于blockqueue的生产和消费模型
基于普通方式处理数据
代码实现:
//Main.cc#include <iostream>
#include <unistd.h>
#include <time.h>
#include "BlockQueue.hpp"using namespace std;void* consumer(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =0;bq->pop(&val);cout<<"我是消费者,我消费了一个数字:"<<val<<endl;sleep(1);}return nullptr;
}void* productor(void* args)
{BlockQueue<int>* bq =static_cast<BlockQueue<int>*>(args);while(true){int val =rand()%10;bq->push(val);cout<<"我是生产者,我生产了一个数字:"<<val<<endl;//sleep(1);}return nullptr;
}int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueue<int>* queue =new BlockQueue<int>(5);pthread_t c,p;pthread_create(&c,nullptr,consumer,(void*)queue);pthread_create(&p,nullptr,productor,(void*)queue);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}//BlockQueue.hpp
#include <queue>
#include <pthread.h>
#include <iostream>
using namespace std;const int NUM =5;
template<class T>
class BlockQueue
{
public:BlockQueue(const int numsize =NUM):_maxsize(NUM){pthread_mutex_init(&_lock,nullptr);pthread_cond_init(&_ccond,nullptr);pthread_cond_init(&_ccond,nullptr);}void push(const T& in){pthread_mutex_lock(&_lock);while(is_Full()){//这里说明阻塞队列是满的,需要让生产者等待pthread_cond_wait(&_pcond,&_lock);}//这里说明阻塞队列至少有一个空位可以插入_queue.push(in);//唤醒消费者去消费pthread_cond_signal(&_ccond);pthread_mutex_unlock(&_lock);}void pop(T* out){pthread_mutex_lock(&_lock);while(is_Empty()){//这里说明阻塞队列是空的,需要让消费者等待pthread_cond_wait(&_ccond,&_lock);}//这里说明阻塞队列至少有一个数据*out=_queue.front();_queue.pop();//唤醒生产者生产数据pthread_cond_signal(&_pcond);pthread_mutex_unlock(&_lock);}~BlockQueue(){pthread_mutex_destroy(&_lock);pthread_cond_destroy(&_ccond);pthread_cond_destroy(&_pcond);}private:bool is_Full(){return _queue.size()==_maxsize;}bool is_Empty(){return _queue.empty();}private:queue<T> _queue;int _maxsize;pthread_mutex_t _lock; //保护临界资源的锁pthread_cond_t _ccond; //消费者的条件变量pthread_cond_t _pcond; //生产者的条件变量
};
代码细节:
基于计算器任务的Task
我们得创建一个task.hpp,里面定义一个CalTask类:
class CalTask
{
public:using func_t =std::function<int(int,int,char)>;CalTask(){}CalTask(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func){}std::string operator()(){int result =_callback(_x,_y,_op);char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d =%d",_x,_op,_y,result);return buffer;}std::string to_string(){char buffer[64];snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);return buffer;}~CalTask(){}private:int _x;int _y;char _op;func_t _callback;};
生产者任务:
void* productor(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){int x =rand()%10;int y =rand()%10;char op =op_str[rand()%op_str.size()];CalTask task(x,y,op,mymath);bq->push(task);cout<<"productor task:"<<task.to_string()<<endl;//sleep(1);}return nullptr;
}
消费者任务:
void* consumer(void* args)
{BlockQueue<CalTask>* bq =static_cast<BlockQueue<CalTask>*>(args);while(true){CalTask t;bq->pop(&t);string result =t();cout<<"consumer result:"<<result<<endl;sleep(1);}return nullptr;
}
基于两个阻塞队列实现计算与存储:
将计算和存储的两个队列放进同一个类中:
template<class c,class s>
class BlockQueues
{public:BlockQueue<c>* c_bq; BlockQueue<s>* s_bq;
};
main函数:
int main()
{srand((unsigned long)time(nullptr)^getpid());BlockQueues<CalTask,SaveTask> bqs;bqs.c_bq =new BlockQueue<CalTask>;bqs.s_bq =new BlockQueue<SaveTask>;//BlockQueue<CalTask>* bqs =new BlockQueue<CalTask>(5);pthread_t c,p,s;pthread_create(&c,nullptr,consumer,(void*)&bqs);pthread_create(&p,nullptr,productor,(void*)&bqs);pthread_create(&s,nullptr,saver,(void*)&bqs);pthread_join(c,nullptr);pthread_join(p,nullptr);pthread_join(s,nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0;
}
存储任务:
class SaveTask
{typedef std::function<void(const std::string&)> func_t;
public:SaveTask(){}SaveTask(const std::string &message, func_t func): _message(message), _func(func){}void operator()(){_func(_message);}
private:std::string _message;func_t _func;
};void Save(const std::string &message)
{const std::string target = "./log.txt";FILE *fp = fopen(target.c_str(), "a+");if(!fp){std::cerr << "fopen error" << std::endl;return;}fputs(message.c_str(), fp);fputs("\n", fp);fclose(fp);
}
存储线程执行的任务:
void *saver(void *bqs_)
{BlockQueue<SaveTask> *save_bq = (static_cast<BlockQueues<CalTask, SaveTask> *>(bqs_))->s_bq;while(true){SaveTask t;save_bq->pop(&t);t();std::cout << "save thread,保存任务完成..." << std::endl; }return nullptr;
}
代码的执行结果:
生产者与消费者模型优势总结:
这个模型的优势不在于多个线程能够并发式的对共享资源里面的数据进行访问和处理,而是多个线程能够在加载任务和处理任务的时候进行并发处理。
在我们上面的代码逻辑中很简单,就是随机构造x和y构成一个任务最后放入阻塞队列中。而在实际情况中加载任务的时候没有那么简单,有时需要从网络或者数据库中加载,这就需要消耗很长的一段时间。这个模型的优势就是在于多个生产者能同时加载多个任务,随后竞争出一名生产者将任务放入共享资源(阻塞队列)中,然后在竞争出一名消费者取出任务。因此模型的优势不在于多个线程能并发的从阻塞队列中拿数据处理,而是在加载任务的时候做到并发节省时间。同理处理任务,当一个线程处理任务的同时,另一个线程仍可以从阻塞队列取出任务处理,不影响之前的进程处理任务,因此消费者处理任务的环节也做到了并发,实现了加载任务和处理任务的解耦,提高了整个程序的运行效率和速度。
到这里本章的内容就全部结束了,创作不易,希望大家多多点赞支持。
相关文章:

基于blockqueue的生产和消费模型
线程篇下讲的是基于阻塞队列的生产者消费者模型。在学习这个之前我们先了解一些其他概念: 同步:在保证数据安全的条件下,让线程按某种特定的顺序依次访问临界资源。 通过上一节的代码我们实现了一个多线程抢票的程序,但结果显示…...

Editors(Vim)
文章目录 Editors(Vim)学哪一个编辑器?Vim Philosophy of VimModal editing 模态编辑Basics 基础知识Inserting text 插入文本Buffers, tabs, and windows 缓冲区、选项卡和窗口Command-line 命令行 Vim’s interface is a programming language. Vim的接口是一种编…...

【Leetcode】134.加油站
一、题目 1、题目描述 在一条环路上有 n 个加油站,其中第 i 个加油站有汽油 gas[i] 升。 你有一辆油箱容量无限的的汽车,从第 i 个加油站开往第 i+1 个加油站需要消耗汽油 cost[i] 升。你从其中的一个加油站出发,开始时油箱为空。 给定两个整数数组 gas 和 cost,如果你…...

设计模式-建造者(生成器)模式
文章目录 简介建造者模式的核心概念产品(Product)建造者(Builder)指挥者(Director)建造者模式与其他设计模式的关系工厂模式和建造者模式uml对比 建造者模式的实现步骤建造者模式的应用场景spring中应用 建…...

内存泄露排查思路
1、泄露情况 启动闪退运行一段时间宕机 2、排查步骤 获取堆内存快照dump使用VisualVM分析dump文件通过查看堆信息的情况,定位内存溢出问题 jmap -dump:formatb,fileheap.hprof pid -XX:HeapDumpOnOutOfMemoryError -XX:HeapDumpPath输出路径 3、在VisualVM中分…...

kafka学习-概念与简单实战
目录 1、核心概念 消息和批次 Topic和Partition Replicas Offset broker和集群 生产者和消费者 2、开发实战 2.1、消息发送 介绍 代码实现 2.2、消息消费 介绍 代码实现 2.3、SpringBoot Kafka pom application.yaml KafkaConfig producer consumer 1、核心…...

爬虫进阶-反爬破解5(selenium的优势和点击操作+chrome的远程调试能力+通过Chrome隔离实现一台电脑登陆多个账号)
目录 一、selenium的优势和点击操作 二、chrome的远程调试能力 三、通过Chrome隔离实现一台电脑登陆多个账号 一、selenium的优势和点击操作 1.环境搭建 工具:Chrome浏览器chromedriverselenium win用户:chromedriver.exe放在python.exe旁边 MacO…...

音视频编码格式-AAC ADT
例子:config 1408 1408(16进制) : 0001 0100 0000 1000 audioObjectType(5bit)为 00010 , 即 2, profie (audioObjectType -1 ) AAC LC samplingFrequencyIndex (4bit) 为 1000 , 即 8 , 对应的采样频率为 16000 channelConfiguration (…...

【计算机网络】网络编程接口 Socket API 解读(3)
Socket 是网络协议栈暴露给编程人员的 API,相比复杂的计算机网络协议,API 对关键操作和配置数据进行了抽象,简化了程序编程。 本文讲述的 socket 内容源自 Linux 发行版 centos 9 上的 man 工具,和其他平台(比如 os-x …...

kafka知识小结
1.为什么分区数只能增加,不能减少? 按照Kafka现有的代码逻辑而言,此功能完全可以实现,不过也会使得代码的复杂度急剧增大。 另外实现此功能需要考虑的因素很多,比如删除掉的分区中的消息该作何处理? 如果随着分区一起消失则消息的可靠性得不到保障; 如果需要保留则又需…...

算法刷题记录-DP(LeetCode)
746. Min Cost Climbing Stairs 代码 int minCostClimbingStairs(vector<int>& cost) {if (cost.size()<2){return 0;}int cache[cost.size()1];cache[0]0;cache[1]0;for (int i 2; i < cost.size(); i) {cache[i] min(cache[i-2]cost[i-2],cache[i-1]cost[i…...

Springboot整合Neo4J图数据库
1.引入依赖 JDK11, neo4J4.4.23 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/> <!-- lookup parent …...

Unity 2018发布在iOS 16.3偶尔出现画面不动的问题
1)Unity 2018发布在iOS 16.3偶尔出现画面不动的问题 2)IL2CPP在Xcode下增量编译问题 3)帧同步实现PuppetMaster布娃娃系统的问题 这是第351篇UWA技术知识分享的推送,精选了UWA社区的热门话题,涵盖了UWA问答、社区帖子等…...

蠕虫病毒流量分析案例
背景 某供排水集团的网络管理员对其网络的健康状况持认可态度,表示网络运行正常,没有发现异常行为。然而,由于网络环境变得越来越复杂,仅凭借传统的网络经验已经不能全面了解网络情况。因此,我们为供排水集团安装了Ne…...

Transformer(一)—— Attention Batch Normalization
Transformer详解 一、RNN循环神经网络二、seq2seq模型三、Attention(注意力机制)四、Transformer4.1 self attention4.2 self-attention的变形——Multi-head Self-attention4.3 Masked Attention4.4 Positional Encoding4.5 Batch Normalization4.6 Lay…...

2023高教社杯数学建模C题思路代码 - 蔬菜类商品的自动定价与补货决策
# 1 赛题 在生鲜商超中,一般蔬菜类商品的保鲜期都比较短,且品相随销售时间的增加而变差, 大部分品种如当日未售出,隔日就无法再售。因此, 商超通常会根据各商品的历史销售和需 求情况每天进行补货。 由于商超销售的蔬菜…...

【C++漂流记】一文搞懂类与对象的封装
本篇文章主要说明了类与对象中封装的有关知识,包括属性和行为作为整体、访问权限、class与struct的区别、成员属性的私有化,希望这篇文章可以帮助你更好的了解类与对象这方面的知识。 文章目录 一、属性和行为作为整体二、访问权限三、class与struct的区…...

ctfshow 反序列化
PHP反序列化前置知识 序列化和反序列化 对象是不能在字节流中传输的,序列化就是把对象转化为字符串以便存储和传输,反序列化就是将字符串转化为对象 魔术方法 __construct() //构造,当对象new时调用 __wakeup() //执行unserialize()时&am…...

数据结构:线性表之-单向链表(无头)
目录 什么是单向链表 顺序表和链表的区别和联系 顺序表: 链表: 链表表示(单项)和实现 1.1 链表的概念及结构 1.2单链表(无头)的实现 所用文件 将有以下功能: 链表定义 创建新链表元素 尾插 头插 尾删 头删 查找-给一个节点的…...

为IT服务台构建自定义Zia操作
Zia是manageengine的商业人工智能助手,是ServiceDesk Plus Cloud的虚拟会话支持代理。使用Zia,您可以优化帮助台管理,还可以缩小最终用户与其帮助台之间的差距,Zia通过执行预配置的操作来帮助用户完成他们的服务台任务。 例如&…...
【C/C++】BMP格式32位转24位
问题 如题 解决方法 bmp文件格式参考:【C/C++】BITMAP格式分析_vc++ bitmap头文件_sunriver2000的博客-CSDN博客BITMAP文件大体上分成四个部分,如下表所示。文件部分长度(字节)位图文件头 Bitmap File Header14位图信息数据头 Bitmap Info Header40调色板 Palette4*n (n≥…...

合宙Air724UG LuatOS-Air LVGL API控件-滑动条 (Slider)
滑动条 (Slider) 滑动条看起来和进度条是有些是有些像,但不同的是滑动条可以进行数值选择。 示例代码 -- 回调函数 slider_event_cb function(obj, event)if event lvgl.EVENT_VALUE_CHANGED then local val (lvgl.slider_get_value(obj) or "0")..&…...

SQLAlchemy 封装的工具类,数据库pgsql(数据库连接池)
1.SQLAlchemy是什么? SQLAlchemy 是 Python 著名的 ORM 工具包。通过 ORM,开发者可以用面向对象的方式来操作数据库,不再需要编写 SQL 语句。 SQLAlchemy 支持多种数据库,除 sqlite 外,其它数据库需要安装第三方驱动。…...

【Git】Git 基础
Git 基础 参考 Git 中文文档 — https://git-scm.com/book/zh/v2 1.介绍 Git 是目前世界上最先进的分布式版本控制系统,有这么几个特点: 分布式:是用来保存工程源代码历史状态的命令行工具保存点:保存点可以追溯源码中的文件…...

腾讯云AI绘画:探究AI创意与技术的新边界
目录 一、2023的“网红词汇”——AI绘画二、智能文生图1、智能文生图的应用场景2、风格和配置的多样性3、输入一段话,腾讯云AI绘画给你生成一张图4、文本描述生成图像,惊艳全场 三、智能图生图:重新定义图像美学1、智能图生图的多元应用场景2…...

离线数仓同步数据1
用户行为表数据同步 2.1.4 日志消费Flume测试 [gpbhadoop104 ~]$ cd /opt/module/flume/ [gpbhadoop104 flume]$ cd job/ [gpbhadoop104 job]$ rm file_to_kafka.confcom.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder #定义组件 a1.sourcesr1 a1.channelsc1…...

c语言开篇---跟着视频学C语言
标识符 标识符必须声明定义,可以是变量、函数或其他实体。 Int是标识符吗? 不是,int是c语言关键词,不是随意命名的 C语言关键词如下: 常量 不需要被声明,不能赋值更改。 printf函数 printf是由print打印…...

本地yum源-如学
学不学? 如学~ 到底学不学? 如学~ 学? 如学~ 配置本地的镜像yum 使用到的 rpm 包 是根据centos8 里面自带的 在 /dev/cdrom 中包含着 一些系统自带的 rpm # 先将 /dev/cdrom 设备进行挂载 mkdir /up # 在…...

【实训】“宅急送”订餐管理系统(程序设计综合能力实训)
👀樊梓慕:个人主页 🎥个人专栏:《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》 🌝每一个不曾起舞的日子,都是对生命的辜负 前言 大一小学期,我迎来了人生中的第一次实训…...

openeuler上安装polarismesh集群
1、安装MySQL数据库 数据库连接地址10.10.10.168 用户root 密码123456 MySQL安装参考搭建DSS环境(六)之安装基础环境MySQL_linux安装dss_青春不流名的博客-CSDN博客 2、安装Redis集群 IPResid PORTSentinel PORTPASSWORDCluster NAME10.10.10.110637…...