【Linux】系统编程生产者消费者模型(C++)
目录
【1】生产消费模型
【1.1】为何要使用生产者消费者模型
【1.2】生产者消费者模型优点
【2】基于阻塞队列的生产消费者模型
【2.1】生产消费模型打印模型
【2.2】生产消费模型计算公式模型
【2.3】生产消费模型计算公式加保存任务模型
【2.3】生产消费模型多生产多消费
【1】生产消费模型
生产消费模型的321原则(便于记忆)。
【解释】
-
3种关系:生产者和生产者(互斥)、消费者和消费者(互斥)、生产者和消费者(互斥|[保证共享资源的安全性]|同步)。
-
2种角色:生产者线程、消费者线程。
-
1种交易场所:一段特定结构的缓冲区。
【1.1】为何要使用生产者消费者模型
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
【1.2】生产者消费者模型优点
-
生产线程和消费线程进行解耦。
-
支持并发。
-
提高效率。
-
支持生产和消费的一段时间的忙闲不均的问题。
【2】基于阻塞队列的生产消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
【2.1】生产消费模型打印模型
【makefile文件】
# 创建关联关系
cc=g++
standard=-std=c++11# 创建依赖关系
myBlockQueue:BlockQueue.cc $(cc) -o $@ $^ $(standard) -l pthread# 创建删除关系
.PHONY:clean
clean:rm -rf myBlockQueue
【BlockQueue.hpp文件】
#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 5;
template <class T>
class BlockQueue
{
public:/* 构造函数 */BlockQueue(const int maxCapacity = g_maxCapacity): _maxCapacity(maxCapacity){// 初始化锁pthread_mutex_init(&_mutex, nullptr);// 初始化生产者信号量pthread_cond_init(&_pCond, nullptr);// 初始化消费者信号量pthread_cond_init(&_cCond, nullptr);}/* 析构函数 */~BlockQueue() {// 销毁锁pthread_mutex_destroy(&_mutex);// 销毁生产者信号量pthread_cond_destroy(&_pCond);// 销毁消费者信号量pthread_cond_destroy(&_cCond);}public:/* 新增任务 */void PushTask(const T &in){// 加锁pthread_mutex_lock(&_mutex);// 判断是否满// 细节二:充当条件判断的语法必须是while,不能是if while(IsFull()) {// 如果容器满了,生产者就不能继续生产了!// 细节一:// pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!// pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!// pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!pthread_cond_wait(&_pCond, &_mutex);}// 程序走到这里,容器一定是没有满的!_q.push(in);// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_cCond); // 通知消费者已经生产了!// 解锁pthread_mutex_unlock(&_mutex);}/* 执行任务 */void PopTask(T* out) {// 加锁pthread_mutex_lock(&_mutex);// 判断是否空// 细节二:与PushTask一致while(IsEmpty()) { // 如果容器空了,消费者就不能继续消费了!// 细节一:与PushTask一致pthread_cond_wait(&_cCond, &_mutex);}// 程序走到这里,容器一定是没有空的!*out = _q.front();_q.pop();// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_pCond); // 通知生产者已经消费了!// 解锁pthread_mutex_unlock(&_mutex);}private:/* 判断容器满 */bool IsFull() {return _q.size() == _maxCapacity;}/* 判断容器空 */bool IsEmpty() {return _q.empty();}private:std::queue<T> _q; // 存储容器int _maxCapacity; // 标识存储重启最大容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _pCond; // 生产者条件变量pthread_cond_t _cCond; // 消费者条件变量
};
【BlockQueue.cc文件】
#include "BlockQueue.hpp"/* 定义生产者线程 */
void *Producer(void *args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);// 生产者进行生产while(true) {int num = rand() % 10 + 1;bq->PushTask(num);std:: cout << "生产者在生产:" << num << std::endl;sleep(1);}
}/* 定义消费者线程 */
void *Consumer(void *args)
{BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);// 消费者进行消费while(true) {int num = 0;bq->PopTask(&num);std:: cout << "消费者在消费:" << num << std::endl;sleep(2);}
}/* 入口函数 */
int main()
{// 随机数种子srand((unsigned int)time(nullptr) ^ getpid());// 共享资源BlockQueue<int> *bqs = new BlockQueue<int>();pthread_t tP;pthread_t tC;pthread_create(&tP, nullptr, Producer, (void *)bqs);pthread_create(&tC, nullptr, Consumer, (void *)bqs);pthread_join(tP, nullptr);pthread_join(tC, nullptr);return 0;
}
【2.2】生产消费模型计算公式模型
【makefile文件】
# 创建关联关系
cc=g++
standard=-std=c++11# 创建依赖关系
myBlockQueue:BlockQueue.cc$(cc) -o $@ $^ $(standard) -l pthread# 创建删除关系
.PHONY:clean
clean:rm -rf myBlockQueue
【BlockQueue.hpp文件】
#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 5;
template <class T>
class BlockQueue
{
public:/* 构造函数 */BlockQueue(const int maxCapacity = g_maxCapacity): _maxCapacity(maxCapacity){// 初始化锁pthread_mutex_init(&_mutex, nullptr);// 初始化生产者信号量pthread_cond_init(&_pCond, nullptr);// 初始化消费者信号量pthread_cond_init(&_cCond, nullptr);}/* 析构函数 */~BlockQueue() {// 销毁锁pthread_mutex_destroy(&_mutex);// 销毁生产者信号量pthread_cond_destroy(&_pCond);// 销毁消费者信号量pthread_cond_destroy(&_cCond);}public:/* 新增任务 */void PushTask(const T &in){// 加锁pthread_mutex_lock(&_mutex);// 判断是否满// 细节二:充当条件判断的语法必须是while,不能是if while(IsFull()) {// 如果容器满了,生产者就不能继续生产了!// 细节一:// pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!// pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!// pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!pthread_cond_wait(&_pCond, &_mutex);}// 程序走到这里,容器一定是没有满的!_q.push(in);// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_cCond); // 通知消费者已经生产了!// 解锁pthread_mutex_unlock(&_mutex);}/* 执行任务 */void PopTask(T* out) {// 加锁pthread_mutex_lock(&_mutex);// 判断是否空// 细节二:与PushTask一致while(IsEmpty()) { // 如果容器空了,消费者就不能继续消费了!// 细节一:与PushTask一致pthread_cond_wait(&_cCond, &_mutex);}// 程序走到这里,容器一定是没有空的!*out = _q.front();_q.pop();// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_pCond); // 通知生产者已经消费了!// 解锁pthread_mutex_unlock(&_mutex);}private:/* 判断容器满 */bool IsFull() {return _q.size() == _maxCapacity;}/* 判断容器空 */bool IsEmpty() {return _q.empty();}private:std::queue<T> _q; // 存储容器int _maxCapacity; // 标识存储重启最大容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _pCond; // 生产者条件变量pthread_cond_t _cCond; // 消费者条件变量
};
【Task.hpp文件】
#pragma once
#include <iostream>
#include <string>
#include <functional>/* 仿函数类 */
class Task
{
public:using func_t = std::function<int(int, int, char)>;public:/* 构造函数 */Task() {}/* 构造函数 */Task(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callBalk(func){}public:/* 仿函数 */std::string operator()(){int result = _callBalk(_x, _y, _op);char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _x, _op, _y, result);return buffer;}public:/* 返回打印公式 */std::string ToTaskString(){char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _x, _op, _y);return buffer;}private:int _x;int _y;char _op;func_t _callBalk;
};/* 任务执行的种类 */
int MyCalculate(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}else{result = x / y;}break;}case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}else{result = x % y;}break;}default:break;}return result;
}
【BlockQueue.cc文件】
#include "BlockQueue.hpp"
#include "Task.hpp"
class Task;
int MyCalculate(int x, int y, char op);const std::string oper = "+-*/%";
/* 定义生产者线程 */
void *Producer(void *args)
{BlockQueue<Task>* calBq = static_cast<BlockQueue<Task>*>(args);// 生产者进行生产while(true) {int x = rand() % 10 + 1;int y = rand() % 10 + 1;int op = rand() % oper.size();Task t(x, y, oper[op], MyCalculate);calBq->PushTask(t);std:: cout << "生产任务:" << t.ToTaskString() << std::endl;sleep(2);}
}/* 定义消费者线程 */
void *Consumer(void *args)
{BlockQueue<Task>* calBq = static_cast<BlockQueue<Task>*>(args);// 消费者进行消费while(true) {Task t;calBq->PopTask(&t);std:: cout << "消费任务:" << t() << std::endl;sleep(1);}
}/* 入口函数 */
int main()
{// 随机数种子srand((unsigned int)time(nullptr) ^ getpid());// 共享资源BlockQueue<Task> *bqs = new BlockQueue<Task>();pthread_t tP;pthread_t tC;pthread_create(&tP, nullptr, Producer, (void *)bqs);pthread_create(&tC, nullptr, Consumer, (void *)bqs);pthread_join(tP, nullptr);pthread_join(tC, nullptr);return 0;
}
【2.3】生产消费模型计算公式加保存任务模型
【makefile文件】
# 创建关联关系
cc=g++
standard=-std=c++11# 创建依赖关系
myBlockQueue:BlockQueue.cc$(cc) -o $@ $^ $(standard) -l pthread# 创建删除关系
.PHONY:clean
clean:rm -rf myBlockQueue
【BlockQueue.hpp文件】
#pragma once
#include <iostream>
#include <queue>
#include <ctime>
#include <cstdlib>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>/* 阻塞队列生产者与消费者模型 */
const int g_maxCapacity = 500;
template <class T>
class BlockQueue
{
public:/* 构造函数 */BlockQueue(const int maxCapacity = g_maxCapacity): _maxCapacity(maxCapacity){// 初始化锁pthread_mutex_init(&_mutex, nullptr);// 初始化生产者信号量pthread_cond_init(&_pCond, nullptr);// 初始化消费者信号量pthread_cond_init(&_cCond, nullptr);}/* 析构函数 */~BlockQueue() {// 销毁锁pthread_mutex_destroy(&_mutex);// 销毁生产者信号量pthread_cond_destroy(&_pCond);// 销毁消费者信号量pthread_cond_destroy(&_cCond);}public:/* 新增任务 */void PushTask(const T &in){// 加锁pthread_mutex_lock(&_mutex);// 判断是否满// 细节二:充当条件判断的语法必须是while,不能是if while(IsFull()) {// 如果容器满了,生产者就不能继续生产了!// 细节一:// pthread_cond_wait这个函数第二个参数,必须是我们正在使用的互斥锁!// pthread_cond_wait该函数调用的时候,会以原子性的方式,将锁释放,并将自己挂起!// pthread_cond_wait该函数在被唤醒返回的时候,会自动的重新获取你传入的锁!pthread_cond_wait(&_pCond, &_mutex);}// 程序走到这里,容器一定是没有满的!_q.push(in);// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_cCond); // 通知消费者已经生产了!// 解锁pthread_mutex_unlock(&_mutex);}/* 执行任务 */void PopTask(T* out) {// 加锁pthread_mutex_lock(&_mutex);// 判断是否空// 细节二:与PushTask一致while(IsEmpty()) { // 如果容器空了,消费者就不能继续消费了!// 细节一:与PushTask一致pthread_cond_wait(&_cCond, &_mutex);}// 程序走到这里,容器一定是没有空的!*out = _q.front();_q.pop();// 细节3:pthread_cond_signal这个函数可以放在临界区内部,也可以放在外部!pthread_cond_signal(&_pCond); // 通知生产者已经消费了!// 解锁pthread_mutex_unlock(&_mutex);}private:/* 判断容器满 */bool IsFull() {return _q.size() == _maxCapacity;}/* 判断容器空 */bool IsEmpty() {return _q.empty();}private:std::queue<T> _q; // 存储容器int _maxCapacity; // 标识存储重启最大容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _pCond; // 生产者条件变量pthread_cond_t _cCond; // 消费者条件变量
};
【Task.hpp文件】
#pragma once
#include <iostream>
#include <string>
#include <functional>/* 计算任务 */
class CalTask
{
public:using func_t = std::function<int(int, int, char)>;public:/* 构造函数 */CalTask() {}/* 构造函数 */CalTask(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callBalk(func){}public:/* 仿函数 */std::string operator()(){int result = _callBalk(_x, _y, _op);char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _x, _op, _y, result);return buffer;}public:/* 返回打印公式 */std::string ToTaskString(){char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _x, _op, _y);return buffer;}private:int _x;int _y;char _op;func_t _callBalk;
};/* 执行计算的方法 */
int MyCalculate(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}else{result = x / y;}break;}case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}else{result = x % y;}break;}default:break;}return result;
}/* 保存任务 */
class SaveTask
{
public:using func_t = std::function<void(const std::string &)>;public:/* 构造函数 */SaveTask() {}/* 构造函数 */SaveTask(const std::string &message, func_t func): _message(message), _callBalk(func){}public:/* 仿函数 */void operator()(){_callBalk(_message);}private:std::string _message;func_t _callBalk;
};/* 保存方法 */
void Save(const std::string& massage){std::string target = "./log.txt";FILE *fp = fopen(target.c_str(), "a+");if(fp == NULL){std::cerr << "fopen fail!" << std::endl;return;}fputs(massage.c_str(), fp);fputs("\n", fp);fclose(fp);
}
【BlockQueue.cc文件】
#include "BlockQueue.hpp"
#include "Task.hpp"/* 共享资源Queue封装 */
template <class C, class S>
class BlockQueues
{
public:BlockQueue<C> *c_bq;BlockQueue<S> *s_bq;
};const std::string oper = "+-*/%";
/* 定义生产者线程 */
void *Producer(void *args)
{BlockQueue<CalTask> *calBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->c_bq;// 生产者进行生产while (true){int x = rand() % 10 + 1;int y = rand() % 10 + 1;int op = rand() % oper.size();CalTask t(x, y, oper[op], MyCalculate);calBq->PushTask(t);std::cout << "Producer-生产任务:" << t.ToTaskString() << std::endl;sleep(2);}return nullptr;
}/* 定义消费者线程 */
void *Consumer(void *args)
{BlockQueue<CalTask> *calBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->c_bq;BlockQueue<SaveTask> *serverBq = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->s_bq;// 消费者进行消费while (true){CalTask t;calBq->PopTask(&t);std::string messgae = t();std::cout << "Consumer-消费任务:" << messgae << std::endl;SaveTask server(messgae, Save);serverBq->PushTask(server);std::cout << "Consumer-推送保存完成..." << std::endl;} return nullptr;
}/* 定义保存线程 */
void *Saver(void *args)
{BlockQueue<SaveTask> *saveQ = (static_cast<BlockQueues<CalTask, SaveTask> *>(args))->s_bq;while(true){SaveTask t;saveQ->PopTask(&t);t();std::cout << "SAVER-保存任务完成..." << std::endl;}return nullptr;
}#define T_PRODUCER 5
#define T_CONSUMER 10
/* 入口函数 */
int main()
{// 随机数种子srand((unsigned int)time(nullptr) ^ getpid());// 共享资源BlockQueues<CalTask, SaveTask> bqs;bqs.c_bq = new BlockQueue<CalTask>();bqs.s_bq = new BlockQueue<SaveTask>();// pthread_t tP;// pthread_t tC;// pthread_t tS;// pthread_create(&tP, nullptr, Producer, (void *)&bqs);// pthread_create(&tC, nullptr, Consumer, (void *)&bqs);// pthread_create(&tS, nullptr, Saver, (void *)&bqs);// pthread_join(tP, nullptr);// pthread_join(tC, nullptr);// pthread_join(tS, nullptr);// 创建生产者线程pthread_t tP[T_PRODUCER];for (int i = 0; i < T_PRODUCER; i++){pthread_create(&tP[i], nullptr, Producer, (void *)&bqs);}// 创建消费者线程pthread_t tC[T_CONSUMER];for (int i = 0; i < T_CONSUMER; i++){pthread_create(&tC[i], nullptr, Consumer, (void *)&bqs);}// 创建保存线程pthread_t tS;pthread_create(&tS, nullptr, Saver, (void *)&bqs);// 等待生产者线程回收for(int i = 0; i < T_PRODUCER; i++){pthread_join(*(tP + 1), nullptr);}// 等待消费者线程回收for(int i = 0; i < T_CONSUMER; i++){pthread_join(*(tC + 1), nullptr);}// 等待保存线程回收pthread_join(tS, nullptr);delete bqs.c_bq;delete bqs.s_bq;return 0;
}
【2.3】生产消费模型多生产多消费
【Makefile文件】
# 定义变量与参数字符串进行关联
cc=g++
standard=-std=c++11# 定义编译关系
myBackQueue: ThreadBackQueue.cc $(cc) -o $@ $^ $(standard) -lpthread# 定义命令
clean:rm -rf myBackQueue# 配置指令与系统指令分离
.PHONY: clean
【ThreadBackQueue.hpp文件】
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
#include "ThreadMutex.hpp"const int g_capacityMax = 100;/* 生产消费者模型封装类 */
template<class T>
class ThreadBackQueue
{
public:/* - 构造函数*/ThreadBackQueue(const int& capacity = g_capacityMax): _qCapacity(capacity){// 初始化互斥锁pthread_mutex_init(&_mutex, nullptr);// 初始化生产者条件变量pthread_cond_init(&_pCond, nullptr);// 初始化消费者条件变量pthread_cond_init(&_cCond, nullptr);}/* - 析构函数*/~ThreadBackQueue() {// 释放互斥锁pthread_mutex_destroy(&_mutex);// 释放生产者条件变量pthread_cond_destroy(&_pCond);// 释放消费者条件变量pthread_cond_destroy(&_cCond);}public:/* - 增加任务*/void Push(const T& in){// 加锁pthread_mutex_lock(&_mutex);// LockGuardMutex(&_mutex);// 判断是否满while(IsFull())pthread_cond_wait(&_pCond, &_mutex); // 去等待// 一定有空位置_q.push(in);// 一定有任务pthread_cond_signal(&_cCond);pthread_mutex_unlock(&_mutex);}/* - 处理任务*/void Pop(T* out){// 加锁pthread_mutex_lock(&_mutex);// LockGuardMutex(&_mutex);// 判断是否空while(IsEmpty())pthread_cond_wait(&_cCond, &_mutex); // 去等待// 一定有任务*out = _q.front(); _q.pop();// 一定有空位置if(GetTaskSize() == 1)pthread_cond_signal(&_pCond);pthread_mutex_unlock(&_mutex);}public: /* - 判断队列满*/bool IsFull(){return _q.size() == _qCapacity;}/* - 判断队列空*/bool IsEmpty(){return _q.empty();}/* - 获取队列中任务个数*/size_t GetTaskSize(){return _q.size();}private:std::queue<T> _q; // 消息队列缓冲区size_t _qCapacity; // 消息队列容量pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _pCond; // 生产者条件变量pthread_cond_t _cCond; // 消费者条件变量
};
【ThreadTask.hpp文件】
#pragma once
#include <cstdio>
#include <iostream>
#include <string>
#include <functional>
#include "ThreadBackQueue.hpp"
class TaskCalculate;
class TaskSave;template<class C, class S>
class ThreadBackQueues
{
public:ThreadBackQueue<C>* _cTask;ThreadBackQueue<S>* _sTask;
};class TaskCalculate
{
private:// 定义仿函数using func_t = std::function<int(const int, const int, const char)>;public:/* - 无参构造函数 */TaskCalculate(){}/* - 带参数的构造函数*/TaskCalculate(func_t func, const int x, const int y, const char op): _func(func), _x(x), _y(y), _op(op){}public:/* - ()运算符重载*/std::string operator()(){int result = _func(_x, _y, _op);char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = %d", _x, _op, _y, result);return buffer;}public:std::string TaskString(){char buffer[64];snprintf(buffer, sizeof(buffer), "%d %c %d = ?", _x, _op, _y);return buffer;}private:int _x; // 第一个计算值int _y; // 第二个计算值char _op; // 第三个计算值func_t _func; // 仿函数类型
};int Calculate(const int x, const int y, const char op)
{int calRet = 0;switch(op){case '+':{calRet = x + y;break;}case '-':{calRet = x - y;break;}case '*':{calRet = x * y;break;}case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;calRet = -1;}else{calRet = x / y;}break;}case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;calRet = -1;}else{calRet = x % y;}break; }default:{break;}}return calRet;
};class TaskSave
{
private:using func_t = std::function<void(const std::string&)>;public:/* - 无参构造函数 */TaskSave() {}/* - 带参构造函数 */TaskSave(func_t func, const std::string& msg) : _func(func), _msg(msg){}public:/* - ()运算符重载*/void operator()(){_func(_msg);}private:std::string _msg; func_t _func;
};void FileSave(const std::string& msg)
{// 创建打开文件目录std::string target = "./Log.txt";// 打开文件FILE* fpath = fopen(target.c_str(), "a+");if(fpath == nullptr){std::cerr << "fopen fail!" << std::endl;return;}// 写入文件fputs(msg.c_str(), fpath);fputs("\n", fpath);// 关闭文件fclose(fpath);
}
【ThreadBase.hpp文件】
#pragma once
#include <cstdio>
#include <cassert>
#include <iostream>
#include <functional>
#include <string>#include <pthread.h>
class ThreadBase;/* 线程上下文数据封装类 */
class ThreadBaseConnectText
{
public:ThreadBaseConnectText(): _textThis(nullptr), _textArgs(nullptr){}
public: ThreadBase* _textThis;void* _textArgs;
};/* 基于原生线程库的线程封装类 */
class ThreadBase
{
private:const int ctNum = 64;public:// 定义仿函数using func_t = std::function<void*(void*)>;public:public:/* - 构造函数* - func: 线程回调函数* - args:线程回调函数参数* - num : 编写线程名称设定的编号 */ThreadBase(func_t func, void* args = nullptr, const int& num = 1): _threadCallBack(func), _threadArgs(args){ // 自定义线程名称char nameBuffer[ctNum];snprintf(nameBuffer, sizeof(nameBuffer), "thread-%d", num);_threadName = nameBuffer;// 创建线程连接上下文 - 手动释放内存 - 【01】ThreadBaseConnectText* connectText = new ThreadBaseConnectText();connectText->_textThis = this;connectText->_textArgs = _threadArgs;int state = pthread_create(&_threadId, nullptr, StartRoutine, (void*)connectText);assert(state == 0); (void)state;}/* - 析构函数*/~ThreadBase() {}public:/* - 线程等待*/void Join(){int state = pthread_join(_threadId, nullptr);assert(state == 0); (void)state; }public: /* - 获取线程名称*/std::string GetThreadName(){return _threadName;}/* - 获取线程Id*/std::string GetThreadId(){char buffer[ctNum];snprintf(buffer, sizeof(buffer), "0x%x", _threadId);return buffer;}public:/* - 线程函数*/static void* StartRoutine(void* args){ThreadBaseConnectText* connectText = static_cast<ThreadBaseConnectText*>(args);void* retVal = connectText->_textThis->Run(connectText->_textArgs);// 释放内存 - 【01】delete connectText;// 返回return retVal;}private:/* - StartRoutine专用函数(因为C/C++混编的原因)*/void* Run(void* args){// 调用回调线程return _threadCallBack(args);}private:std::string _threadName; // 线程名称pthread_t _threadId; // 线程Idfunc_t _threadCallBack; // 线程回调函数void* _threadArgs; // 线程回调函数参数
};
【ThreadMutex.hpp文件】
#pragma once
#include <pthread.h>/* 原生线程锁类封装 */
class Mutex
{
public:/* - 构造函数*/Mutex(pthread_mutex_t* mutex): _pMutex(mutex){}/* - 析构函数*/~Mutex() {}public:/* - 加锁函数*/void Lock() { pthread_mutex_lock(_pMutex); }/* - 解锁函数*/void UnLock() { pthread_mutex_unlock(_pMutex); }private:pthread_mutex_t* _pMutex; // 内部的线程锁
};class LockGuardMutex
{
public:/* - 构造函数*/LockGuardMutex(pthread_mutex_t* mutex): _mutex(mutex){_mutex.Lock();}/* - 析构函数*/~LockGuardMutex(){_mutex.UnLock();}private:Mutex _mutex;
};
【ThreadBackQueue.cc文件】
#include <ctime>
#include <iostream>
#include <string>
#include <memory>
#include <unistd.h>
#include "ThreadBase.hpp"
#include "ThreadBackQueue.hpp"
#include "ThreadTask.hpp"std::string g_ops = "+-*/%";/* - 生产者线程函数*/
void* ProducerThread(void* args)
{ThreadBackQueue<TaskCalculate>* tBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_cTask;// 生产while(true){int x = rand() % 100;int y = rand() % 100;int o = rand() % g_ops.size();TaskCalculate task(Calculate, x, y, g_ops[o]);tBQ->Push(task);std::cout << "生产者在生产任务-> " << "[" << task.TaskString() << "] - - 队列任务个数为:" << tBQ->GetTaskSize() << std::endl;sleep(1);}return nullptr;
}/* - 消费者线程函数*/
void* ConsumeThread(void* args)
{ThreadBackQueue<TaskCalculate>* tBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_cTask;ThreadBackQueue<TaskSave>* sBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_sTask;// 消费while(true){TaskCalculate calculateTask;tBQ->Pop(&calculateTask);std::string strRet = calculateTask();std::cout << "消费者在消费任务-> " << "[" << strRet << "] - - 队列任务个数为:" << tBQ->GetTaskSize() << std::endl;TaskSave saveTask(FileSave, strRet);sBQ->Push(saveTask);std::cout << "消费者在保存任务-> " << "[" << strRet << "] - - 队列任务个数为:" << sBQ->GetTaskSize() << std::endl;sleep(3);}return nullptr;
}/* - 保存者线程函数*/
void* SaveThread(void* args)
{ThreadBackQueue<TaskSave>* sBQ = (static_cast<ThreadBackQueues<TaskCalculate, TaskSave>*>(args))->_sTask;while(true){// 消费保存任务TaskSave saveTask;sBQ->Pop(&saveTask);// 执行保存任务saveTask();// 保存完成任务std::cout << "保存任务完成"<< " - - 队列任务个数为:" << sBQ->GetTaskSize() << std::endl;}return nullptr;
}/* - 程序入口函数*/
int main()
{// 创建随机数种子srand((unsigned int)time(nullptr));// 创建共享资源ThreadBackQueues<TaskCalculate, TaskSave>* pBQ = new ThreadBackQueues<TaskCalculate, TaskSave>();pBQ->_cTask = new ThreadBackQueue<TaskCalculate>;pBQ->_sTask = new ThreadBackQueue<TaskSave>;// 创建生产者线程std::unique_ptr<ThreadBase> ptr_pt1(new ThreadBase(ProducerThread, (void*)pBQ, 1));std::unique_ptr<ThreadBase> ptr_pt2(new ThreadBase(ProducerThread, (void*)pBQ, 2));std::unique_ptr<ThreadBase> ptr_pt3(new ThreadBase(ProducerThread, (void*)pBQ, 3));std::unique_ptr<ThreadBase> ptr_pt4(new ThreadBase(ProducerThread, (void*)pBQ, 4));std::unique_ptr<ThreadBase> ptr_pt5(new ThreadBase(ProducerThread, (void*)pBQ, 5));std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt1->GetThreadName() << " 线程Id:" << ptr_pt1->GetThreadId() << std::endl;std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt2->GetThreadName() << " 线程Id:" << ptr_pt2->GetThreadId() << std::endl;std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt3->GetThreadName() << " 线程Id:" << ptr_pt3->GetThreadId() << std::endl;std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt4->GetThreadName() << " 线程Id:" << ptr_pt4->GetThreadId() << std::endl;std::cout << "创建生产者线程完成-> 线程名:" << ptr_pt5->GetThreadName() << " 线程Id:" << ptr_pt5->GetThreadId() << std::endl;sleep(10);// 创建消费者线程std::unique_ptr<ThreadBase> ptr_ct1(new ThreadBase(ConsumeThread, (void*)pBQ, 11));std::unique_ptr<ThreadBase> ptr_ct2(new ThreadBase(ConsumeThread, (void*)pBQ, 12));std::unique_ptr<ThreadBase> ptr_ct3(new ThreadBase(ConsumeThread, (void*)pBQ, 13));std::unique_ptr<ThreadBase> ptr_ct4(new ThreadBase(ConsumeThread, (void*)pBQ, 14));std::unique_ptr<ThreadBase> ptr_ct5(new ThreadBase(ConsumeThread, (void*)pBQ, 15));std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct1->GetThreadName() << " 线程Id:" << ptr_ct1->GetThreadId() << std::endl;std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct2->GetThreadName() << " 线程Id:" << ptr_ct2->GetThreadId() << std::endl;std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct3->GetThreadName() << " 线程Id:" << ptr_ct3->GetThreadId() << std::endl;std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct4->GetThreadName() << " 线程Id:" << ptr_ct4->GetThreadId() << std::endl;std::cout << "创建消费者线程完成-> 线程名:" << ptr_ct5->GetThreadName() << " 线程Id:" << ptr_ct5->GetThreadId() << std::endl;sleep(10);// 创建保存者线程std::unique_ptr<ThreadBase> ptr_st1(new ThreadBase(SaveThread, (void*)pBQ, 21));std::unique_ptr<ThreadBase> ptr_st2(new ThreadBase(SaveThread, (void*)pBQ, 22));std::unique_ptr<ThreadBase> ptr_st3(new ThreadBase(SaveThread, (void*)pBQ, 23));std::unique_ptr<ThreadBase> ptr_st4(new ThreadBase(SaveThread, (void*)pBQ, 24));std::unique_ptr<ThreadBase> ptr_st5(new ThreadBase(SaveThread, (void*)pBQ, 25));std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st1->GetThreadName() << " 线程Id:" << ptr_st1->GetThreadId() << std::endl;std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st2->GetThreadName() << " 线程Id:" << ptr_st2->GetThreadId() << std::endl;std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st3->GetThreadName() << " 线程Id:" << ptr_st3->GetThreadId() << std::endl;std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st4->GetThreadName() << " 线程Id:" << ptr_st4->GetThreadId() << std::endl;std::cout << "创建保存者者线程完成-> 线程名:" << ptr_st5->GetThreadName() << " 线程Id:" << ptr_st5->GetThreadId() << std::endl;// 等待线程结束ptr_pt1->Join();ptr_pt2->Join();ptr_pt3->Join();ptr_pt4->Join();ptr_pt5->Join();ptr_ct1->Join();ptr_ct2->Join();ptr_ct3->Join();ptr_ct4->Join();ptr_ct5->Join();ptr_st1->Join();ptr_st2->Join();ptr_st3->Join();ptr_st4->Join();ptr_st5->Join();delete pBQ->_cTask;delete pBQ->_sTask;delete pBQ;return 0;
}
相关文章:
【Linux】系统编程生产者消费者模型(C++)
目录 【1】生产消费模型 【1.1】为何要使用生产者消费者模型 【1.2】生产者消费者模型优点 【2】基于阻塞队列的生产消费者模型 【2.1】生产消费模型打印模型 【2.2】生产消费模型计算公式模型 【2.3】生产消费模型计算公式加保存任务模型 【2.3】生产消费模型多生产多…...
【数据结构】图的应用:最小生成树;最短路径;有向无环图描述表达式;拓扑排序;逆拓扑排序;关键路径
目录 1、最小生成树 1.1 概念 1.2 普利姆算法(Prim) 1.3 克鲁斯卡尔算法(Kruskal) 2、最短路径 2.1 迪杰斯特拉算法(Dijkstra) 2.2 弗洛伊德算法(Floyd) 2.3 BFS算法&…...
大数据驱动业务增长:数据分析和洞察力的新纪元
文章目录 大数据的崛起大数据的特点大数据技术 大数据驱动业务增长1. 洞察力和决策支持2. 个性化营销3. 风险管理4. 产品创新 大数据分析的新纪元1. 云计算和大数据示例代码:使用AWS的Elastic MapReduce(EMR)进行大数据分析。 2. 人工智能和机…...
科技云报道:分布式存储红海中,看天翼云HBlock如何突围?
科技云报道原创。 过去十年,随着技术的颠覆性创新和新应用场景的大量涌现,企业IT架构出现了稳态和敏态的混合化趋势。 在持续产生海量数据的同时,这些新应用、新场景在基础设施层也普遍基于敏态的分布式架构构建,从而对存储技术…...
Java高级-动态代理
动态代理 1.介绍2.案例 1.介绍 public interface Star {String sing(String name);void dance(); }public class BigStar implements Star{private String name;public BigStar(String name) {this.name name;}public String sing(String name) {System.out.println(this.name…...
时序预测 | MATLAB实现POA-CNN-LSTM鹈鹕算法优化卷积长短期记忆神经网络时间序列预测
时序预测 | MATLAB实现POA-CNN-LSTM鹈鹕算法优化卷积长短期记忆神经网络时间序列预测 目录 时序预测 | MATLAB实现POA-CNN-LSTM鹈鹕算法优化卷积长短期记忆神经网络时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 MATLAB实现POA-CNN-LSTM鹈鹕算法优化卷积长短…...
n个不同元素进栈,求出栈元素的【不同排列】以及【排列的数量】?
我在网上看的博客大部分是告诉你这是卡特兰数,然后只给出了如何求解有多少种排列,没有给出具体排列是怎么样的。如果你还不知道卡特兰数,请查看:https://leetcode.cn/circle/discuss/lWYCzv/ 这里记录一下如何生成每种具体的排列…...
Python中TensorFlow的长短期记忆神经网络(LSTM)、指数移动平均法预测股票市场和可视化...
原文链接:http://tecdat.cn/?p23689 本文探索Python中的长短期记忆(LSTM)网络,以及如何使用它们来进行股市预测(点击文末“阅读原文”获取完整代码数据)。 相关视频 在本文中,你将看到如何使用…...
多线程的学习第二篇
多线程 线程是为了解决并发编程引入的机制. 线程相比于进程来说,更轻量 ~~ 更轻量的体现: 创建线程比创建进程,开销更小销毁线程比销毁进程,开销更小调度线程比调度进程,开销更小 进程是包含线程的. 同一个进程里的若干线程之间,共享着内存资源和文件描述符表 每个线程被独…...
git之撤销工作区的修改和版本回溯
有时候在工作区做了一些修改和代码调试不想要了,可如下做 (1)步骤1:删除目录代码,确保.git目录不能修改 (2)git log 得到相关的commit sha值 可配合git reflog 得到相要的sha值 (3)执行git reset --hard sha值,可以得到时间轴任意版本的代码 git reset --hard sha值干净的代…...
sed awk使用简介
简介 本文主要介绍 Linux 系统的两个神级工具:sed 和 awk ,他们是Linux高手们必备的技能,很值得我们去研究的东西。 这里是我在网上书上收集的相关资料,因为这两个工具很有名也很重要,所以这些资料会帮助我更好的了解…...
竞赛选题 基于深度学习的人脸识别系统
前言 🔥 优质竞赛项目系列,今天要分享的是 基于深度学习的人脸识别系统 该项目较为新颖,适合作为竞赛课题方向,学长非常推荐! 🧿 更多资料, 项目分享: https://gitee.com/dancheng-senior/…...
idea Terminal 回退历史版本 Git指令 git reset
——————强制回滚历史版本—————— 一、idea Terminal 第一步:复制版本号 (右击项目–> Git --> Show History -->选中要回退的版本–>Copy Revision Number,直接复制;) 第二步:ide…...
华为云云耀云服务器L实例评测|华为云上安装监控服务Prometheus三件套安装
文章目录 华为云云耀云服务器L实例评测|华为云上试用监控服务Prometheus一、监控服务Prometheus三件套介绍二、华为云主机准备三、Prometheus安装四、Grafana安装五、alertmanager安装六、三个服务的启停管理1. Prometheus、Alertmanager 和 Grafana 启动顺序2. 使用…...
C语言基础知识点(八)联合体和大小端模式
以下程序的输出是() union myun {struct { int x, y, z;} u;int k; } a; int main() {a.u.x 4;a.u.y 5;a.u.z 6;a.k 0;printf("%d\n", a.u.x); } 小端模式 数据的低位放在低地址空间,数据的高位放在高地址空间 简记ÿ…...
一个线程运行时发生异常会怎样?
如果一个线程在运行时发生异常而没有被捕获(即未被适当的异常处理代码处理),则会导致以下几种情况之一: 线程终止:线程会立即终止其执行,并将异常信息打印到标准错误输出(System.err)。这通常包括异常的类型、堆栈跟踪信息以及异常消息。 ThreadDeath 异常:在某些情况…...
CSS中去掉li前面的圆点方法
1. 引言 在网页开发中,我们经常会使用无序列表(<ul>)来展示一系列的项目。默认情况下,每个列表项(<li>)前面都会有一个圆点作为标记。然而,在某些情况下,我们可能希望去…...
Python:获取当前目录下所有文件夹名称及文件夹下所有文件名称
获取当前目录下所有文件夹名称 def get_group_list(folder_path):group_list []for root, dirs, files in os.walk(folder_path):for dir in dirs:group_list.append(dir)return group_list获取文件夹下所有文件名称 def get_file_list(folder_path, group_name):file_list …...
系统架构设计师-数据库系统(1)
目录 一、数据库模式 1、集中式数据库 2、分布式数据库 二、数据库设计过程 1、E-R模型 2、概念结构设计 3、逻辑结构设计 三、关系代数 1、并交差 2、投影和选择 3、笛卡尔积 4、自然连接 一、数据库模式 1、集中式数据库 三级模式: (1)外…...
Docker的相关知识介绍以及mac环境的安装
一、什么是Docker 大型项目组件较多,运行环境也较为复杂,部署时会碰到一些问题: 依赖关系复杂,容易出现兼容性问题开发、测试、生产环境有差异 Docker就是来解决这些问题的。Docker是一个快速交付应用、运行应用的技术&#x…...
Android设计支持库
本文所有的代码均存于 https://github.com/MADMAX110/BitsandPizzas 设计支持库(Design Support Library)是 Google 在 2015 年的 I/O 大会上发布的全新 Material Design 支持库,在这个 support 库里面主要包含了 8 个新的 Material Design …...
【Java 基础篇】Java实现文件搜索详解
文件搜索是计算机应用中的一个常见任务,它允许用户查找特定文件或目录,以便更轻松地管理文件系统中的内容。在Java中,您可以使用各种方法来实现文件搜索。本文将详细介绍如何使用Java编写文件搜索功能,以及一些相关的内容。 文件…...
会C++还需要再去学Python吗?
提到的C、数据结构与算法、操作系统、计算机网络和数据库技术等确实是计算机科学中非常重要的基础知识领域,对于软件开发和计算机工程师来说,它们是必备的核心知识。掌握这些知识对于开发高性能、可靠和安全的应用程序非常重要。Python作为一种脚本语言&…...
vue部分/所有内容全屏切换展示
需求:就是把一个页面的某一部分内容点击全屏操作按钮后全屏展示,并非所有内容全屏,所有内容的话那肯定就所有全屏展示啊,可以做切换 1.部分全屏代码 element.requestFullscreen();这个就是全屏的代码了,注意前面的ele…...
8.gec6818开发板通过并发多线程实现电子相册 智能家居 小游戏三合一完整项目
并发 前面编写的程序都是从mian函数开始,从上往下执行,称为顺序执行 假设一个程序需要I输入 C计算 P输出,以顺序执行三个上述程序,则其执行过程如下: 程序内部的语句是一条一条的执行,如果要运行多个程序…...
角度回归——角度编码方式
文章目录 1.为什么研究角度的编码方式?1.1 角度本身具有周期性1.2 深度学习的损失函数因为角度本身的周期性,在周期性的点上可能产生很大的Loss,造成训练不稳定1.3 那么如何处理边界问题呢:(以θ的边界问题为例&#x…...
【C# Programming】值类型、良构类型
值类型 1、值类型 值类型的变量直接包含值。换言之, 变量引用的位置就是值内存中实际存储的位置。 2、引用类型 引用类型的变量存储的是对一个对象实例的引用(通常为内存地址)。 复制引用类型的值时,复制的只是引用。这个引用非常小…...
Linux Day18 TCP_UDP协议及相关知识
一、网络基础概念 1.1 网络 网络是由若干结点和连接这些结点的链路组成,网络中的结点可以是计算机,交换机、 路由器等设备。 1.2 互联网 把多个网络连接起来就构成了互联网。目前最大的互联网就是因特网。 网络设备有:交换机、路由器、…...
【Java 基础篇】Java网络编程实时数据流处理
在现代计算机应用程序中,处理实时数据流是一项关键任务。这种数据流可以是来自传感器、网络、文件或其他源头的数据,需要即时处理并做出相应的决策。Java提供了强大的网络编程工具和库,可以用于处理实时数据流。本文将详细介绍如何使用Java进…...
Oracle 和 mysql 增加字段SQL
在Oracle和MySQL中,可以使用ALTER TABLE语句来增加字段。下面是分别是两种数据库增加字段的SQL示例: 在Oracle中增加字段的SQL示例: ALTER TABLE 表名ADD (新字段名 数据类型);例如,如果要在名为"employees"的表中添加…...
wordpress个人电脑搭建/代写文案的软件
模型/视图编程 模型/视图编程简介 Qt包含一组项目视图类,这些项目视图类使用模型/视图架构来管理数据及其向用户呈现方式之间的关系。此体系结构引入的功能分离为开发人员提供了更大的灵活性,可以自定义项目的表示形式,并提供标准的模型界面&…...
兰州市科协网站/热搜榜排名今日第一
Step-index fiber and Graded-index fiber 定义光纤截面图像折射率一维分布图为定义 Step-index fiber:可以理解为纤芯(core)是均匀的(uniform),即折射率保持为一个值。 Graded-index fiber:可以理解为core的中心到包层(cladding)的折射率分布是一个梯度的,即渐变的,通常…...
好用的wordpress企业模版/百度官网下载
一个项目只有一给仓库,状态也只能有一个,但是组件会非常之多,我们为了每个组件的共享状态便于统一管理,需要将多个reducer进行合并 export default function combineReducers(reducers) {const reducerKeys Object.keys(reducer…...
网站开发实战项目/学开网店哪个培训机构好正规
概述 Floodlight内部定义报文格式的代码位于net.floodlightcontroller.packet,其中定义的报文类型有ARP,BPDU,BSN,BSNPROBE,DHCP,Ethernet,ICMP,IPv4,LLC,LLDP,TCP,UDP。 其中定义了一个名为IPacket的接口,该接口结构如图: 包…...
酒店网站怎么做/企业品牌网站营销
很多朋友看完macOS Big Sur的介绍就把系统升级到big sur了,面对半成品的开发者预览版(Developer Preview),很多人表示无法接受,可降回10.15.5 的时候,提示不能回退老版本,那么macOS Big Sur如何…...
辽宁省精神文明建设工作三大创建活动网站/360推广
当满足以下三个条件时,两者会输出相同信息。 1. 服务器为80端口 2. apache的conf中ServerName设置正确 3. HTTP/1.1协议规范 不同点: 通常情况: _SERVER[“HTTP_HOST”] 在HTTP/1.1协议规范下,会根据客户端的HTTP请求输出信息…...