当前位置: 首页 > news >正文

仿RabbitMq实现简易消息队列基础篇(future操作实现异步线程池)

@TOC

介绍        

        std::future 是C++11标准库中的一个模板类,他表示一个异步操作的结果,当我们在多线程编程中使用异步任务时,std::future可以帮助我们在需要的时候,获取任务的执行结果,std::future 的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保我们在获取结果时不会遇到未完成的操作。

应用场景

  • 异步任务:当我们需要在后台执行一些耗时操作时,如网络请求或计算密集型任务等,std::future可以用来表示这些异步任务的结果,通过将任务与主线程分离,我们可以实现任务的并行操作,从而提高程序的执行效率
  • 并发操作:在多线程编程中,我们可能需要等待某些任务完成后才能执行其他操作,通过使用std::future,我们可以实现线程之间的同步,确保任务完成后再获取结果并继续执行后续结果
  • 结果获取:std::future提供了一种安全的方式来获取异步任务的结果。我们可以使用std::future::get()函数来获取任务的结果,此函数会阻塞当前线程,直到异步操作完成。这样,在调用get()函数时,我们可以确保已经获取到了所需的结果

使用方法

std::async

        std::async是一种将任务与std::future关联的简单方法,它创建并运行一个异步任务,并返回一个与该任务结果关联的std::future对象。默认情况下,std::async是否启动了一个新的线程,或者在等待future时,任务是否同步运行取决于你给的参数,这个参数为std::launch类型:

  1. std::launch::deferred 表明该函数会被延迟调用,直到在future上调用get()或者wait()才会开始执行任务
  2. std::launch::async表明函数会在自己创建的线程上运行
  3. std::launch::deferred | std::launch::async 内部通过系统等条件自动选择策略
#include <iostream>
#include <future>
#include <thread>
#include <chrono>using namespace std;int async_task()
{std::this_thread::sleep_for(std::chrono::seconds(2));return 2;
}int main()
{// 关联异步任务async_task 和 futurestd::future<int> result = std::async(std::launch::deferred | std::launch::async, async_task);// 此时可以执行其他操作cout << "干其他事情" << endl;// 获取异步任务结果int ret = result.get();cout << ret << endl;return 0;
}

std::packaged_task

        std::packaged_task就是将任务和std::future绑定在一起的模板,是一种对任务的封装,我们可以通过std::packaged_task对象获取任务相关联的std::future对象,通过调用get_future()方法获取。std::packaged_task的模板参数是函数签名。

所谓函数签名就是一个函数头去掉函数名

下面介绍一下std::packaged_task,首先这个类型的对象是不可复制的

可以看到拷贝构造函数被delete了。

        std::packaged_task是用来包装异步任务的工具,它的本质是将一个可调用对象封装起来,和std::future结合起来,这个不能被直接调用,因为这样的实质是同步调用任务,而不是异步调用,并且std::packaged_task对象是没有返回值的,因为是不可拷贝的, 所以std::packaged_task对象在使用的时候,需要创建一个线程,然后使用智能指针或者move函数来进行传递。注意因为创建了一个新的线程,并且需要获取到这个新的线程执行任务的结果,所以我们就需要进行等待或者分离,即join和detach()。

使用move进行移动

#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>int Add(int num1, int num2)
{std::this_thread::sleep_for(std::chrono::seconds(2));return num1 + num2;
}int main()
{std::packaged_task<int(int, int)> task(Add);std::future<int> fu = task.get_future();std::thread th(std::move(task), 11, 22);int ret = fu.get();std::cout << ret << std::endl;th.join();return 0;
}

使用智能指针

#include <iostream>
#include <future>
#include <thread>
#include <memory>
#include <chrono>int Add(int num1, int num2)
{std::this_thread::sleep_for(std::chrono::seconds(2));return num1 + num2;
}int main()
{auto ptask = std::make_shared<std::packaged_task<int(int, int)>>(Add);std::future<int> fu = ptask->get_future();std::thread th([ptask](){(*ptask)(11, 22);});int ret = fu.get();std::cout << ret << std::endl;th.join();return 0;
}

std::promise

        std::promise提供了一种设置值的方式,它可以在设置之后通过相关联的std::future对象进行读取,换种说法来说就是之前说过的std::future可以读取一个异步函数的返回值了,但是要等待就绪,而std::promise就提供了一个方式手动让std::future就绪

#include <iostream>
#include <future>
#include <chrono>void task(std::promise<int> result_promise)
{int result = 2;result_promise.set_value(result);std::cout << result << std::endl;
}int main()
{std::promise<int> result;std::future<int> reuslt_future = result.get_future();std::thread th(task, std::move(result));int ret = reuslt_future.get();std::cout << ret << std::endl;th.join();return 0;
}

线程池设计

#include <iostream>
#include <functional>
#include <memory>
#include <thread>
#include <future>
#include <mutex>
#include <condition_variable>
#include <vector>class threadPool
{
public:using ptr = std::shared_ptr<threadPool>;using Functor = std::function<void(void)>;threadPool(int thr_count = 1): _stop(false){for (int i = 0; i < thr_count; i++){_threads.emplace_back(&threadPool::entry, this);}}~threadPool(){stop();}// push传入的是首先有一个函数--用户要执行的函数,接下来是不定参,标识要处理的数据就是要传入到函数中的参数// push函数内部,会将这个传入的函数封装成一个异步任务(packaged_task),// 使用 lambda 生成一个可调用对象(内部执行异步程序)抛入到任务池中,由工作线程取出进行执行template <typename F, typename ...Args>auto push(F &&func, Args &&...args) -> std::future<decltype(func(args...))>{// 1. 将传入的函数封装成一个packaged_task任务using return_type = decltype(func(args...));auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);//auto task = std::make_shared<std::packaged_task<return_type(std::forward<Args>(args)...)>>(std::forward(func));auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);std::future<return_type> fu = task->get_future();// 2. 构造一个lambda 匿名函数(捕获任务对象),函数内执行任务对象{std::unique_lock<std::mutex> lock(_mutex);// 3. 将构造出来的匿名函数,抛入到任务池中_taskpool.push_back([task](){ (*task)(); });_cv.notify_one();}return fu;}void stop(){if(_stop == true) return ;_stop = true;_cv.notify_all();for (auto &thread : _threads){thread.join();}}private:// 线程入口函数---内部不断的从任务池中取出任务进行执行void entry(){while (!_stop){std::vector<Functor> tmp_taskpool;{// 加锁std::unique_lock<std::mutex> lock(_mutex);// 等待任务池不为空,或者_stop 被置位返回true_cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });// 取出任务进行执行tmp_taskpool.swap(_taskpool);   // 这里是将全局任务池里面的任务全部给一个线程里面的任务池    }for (auto &task : tmp_taskpool){task();}}}private:std::atomic<bool> _stop;std::vector<Functor> _taskpool;   // 任务池std::mutex _mutex;              // 互斥锁std::condition_variable _cv;    // 条件变量std::vector<std::thread> _threads;
};

main函数

#include "threadpool.hpp"int Add(int num1, int num2)
{return num1 + num2;
}int main()
{threadPool pool;for(int i = 0; i < 10; i++){std::future<int> fu = pool.push(Add, 11, i);std::cout << fu.get() << std::endl;}pool.stop();return 0;
}

相关文章:

仿RabbitMq实现简易消息队列基础篇(future操作实现异步线程池)

TOC 介绍 std::future 是C11标准库中的一个模板类&#xff0c;他表示一个异步操作的结果&#xff0c;当我们在多线程编程中使用异步任务时&#xff0c;std::future可以帮助我们在需要的时候&#xff0c;获取任务的执行结果&#xff0c;std::future 的一个重要特性是能…...

经典算法题总结:数组常用技巧(双指针,二分查找和位运算)篇

双指针 在处理数组和链表相关问题时&#xff0c;双指针技巧是经常用到的&#xff0c;双指针技巧主要分为两类&#xff1a;左右指针和快慢指针。所谓左右指针&#xff0c;就是两个指针相向而行或者相背而行&#xff1b;而所谓快慢指针&#xff0c;就是两个指针同向而行&#xf…...

版本控制基础理论

一、本地版本控制 在本地记录文件每次的更新&#xff0c;可以对每个版本做一个快照&#xff0c;或是记录补丁文件&#xff0c;适合个人使用&#xff0c;如RCS. 二、集中式版本控制&#xff08;代表SVN&#xff09; 所有的版本数据都保存在服务器上&#xff0c;协同开发者从…...

微分方程(Blanchard Differential Equations 4th)中文版Section1.4

1.4 NUMERICAL TECHNIQUE: EULER’S METHOD 上一节中讨论的斜率场的几何概念与近似微分方程解的基本数值方法密切相关。给定一个初值问题 d y d t = f ( t , y ) , y ( t 0 ) = y 0 , \frac{dy}{dt}=f(t,y), \quad y(t_0) = y_0, dtdy​=f(t,y),y(t0​)=y0​, 我们可以通过首…...

求职Leetcode算法题(7)

1.搜索旋转排序数组 这道题要求时间复杂度为o&#xff08;log n&#xff09;&#xff0c;那么第一时间想到的就是二分法&#xff0c;二分法有个前提条件是在有序数组下&#xff0c;我们发现在这个数组中存在两部分是有序的&#xff0c;所以我们只需要对前半部分和后半部分分别…...

ActiveMQ、RabbitMQ、Kafka、RocketMQ在事务性消息、性能、高可用和容错、定时消息、负载均衡、刷盘策略的区别

ActiveMQ、RabbitMQ、Kafka、RocketMQ这四种消息队列在事务性消息、性能、高可用和容错、定时消息、负载均衡、刷盘策略等方面各有其特点和差异。以下是对这些方面的详细比较&#xff1a; 1. 事务性消息 ActiveMQ&#xff1a;支持事务性消息。ActiveMQ可以基于JMS&#xff08…...

HanLP分词的使用与注意事项

1 概述 HanLP是一个自然语言处理工具包&#xff0c;它提供的主要功能如下&#xff1a; 分词转化为拼音繁转简、简转繁提取关键词提取短语提取词语自动摘要依存文法分析 下面将介绍其分词功能的使用。 2 依赖 下面是依赖的jar包。 <dependency><groupId>com.ha…...

Python 的进程、线程、协程的区别和联系是什么?

一、区别 1. 进程 • 定义&#xff1a;进程是操作系统分配资源的基本单位。 • 资源独立性&#xff1a;每个进程都有独立的内存空间&#xff0c;包括代码、数据和运行时的环境。 • 并发性&#xff1a;可以同时运行多个进程&#xff0c;操作系统通过时间片轮转等方式在不同…...

实时数据推送:Spring Boot 中两种 SSE 实战方案

在 Web 开发中&#xff0c;实时数据交互变得越来越普遍。无论是股票价格的波动、比赛比分的更新&#xff0c;还是聊天消息的传递&#xff0c;都需要服务器能够及时地将数据推送给客户端。传统的 HTTP 请求-响应模式在处理这类需求时显得力不从心&#xff0c;而服务器推送事件&a…...

数据守护者:SQL一致性检查的艺术与实践

标题&#xff1a;数据守护者&#xff1a;SQL一致性检查的艺术与实践 在数据驱动的商业世界中&#xff0c;数据的一致性是确保决策准确性和业务流程顺畅的关键。SQL作为数据查询和操作的基石&#xff0c;提供了多种工具来维护数据的一致性。本文将深入探讨如何使用SQL进行数据一…...

jenkins配置+vue打包多环境切换

jenkins配置流水线过程 1.新建item 加入相关的参数就行了。 流水线脚本设置 后端脚本 node {stage checkoutsh"""#每次打包清空工作空间目录rm -rf $workspace/*cd $workspace#到工作空间下从远端svn服务端拉取代码svn co svn://10.1.19.21/repo/技术中台/低…...

idea和jdk的安装教程

1.JDK的安装 下载 进入官网&#xff0c;找到你需要的JDK版本 Java Downloads | Oracle 中国 我这里是windows的jdk17&#xff0c;选择以下 安装 点击下一步&#xff0c;安装完成 配置环境变量 打开查看高级系统设置 在系统变量中添加两个配置 一个变量名是 JAVA_HOME …...

HTML静态网页成品作业(HTML+CSS)——电影网首页网页设计制作(1个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有1个页面。 二、作品演示 三、代…...

大数据系列之:Flink Doris Connector,实时同步数据到Doris数据库

大数据系列之&#xff1a;Flink Doris Connector&#xff0c;实时同步数据到Doris数据库 一、版本兼容性二、使用三、Flink SQL四、DataStream五、Lookup Join六、配置通用配置项接收器配置项查找Join配置项 七、Doris 和 Flink 列类型映射八、使用Flink CDC访问Doris的示例九、…...

LabVIEW VI 多语言动态加载与运行的实现

在多语言应用程序开发中&#xff0c;确保用户界面能够根据用户的语言偏好动态切换是一个关键需求。本文通过分析一个LabVIEW程序框图&#xff0c;详细说明了如何使用LabVIEW中的属性节点和调用节点来实现VI&#xff08;虚拟仪器&#xff09;界面语言的动态加载与运行。此程序允…...

Unity引擎基础知识

目录 Unity基础知识概要 1. 创建工程 2. 工程目录介绍 3. Unity界面和五大面板 4. 游戏物体创建与操作 5. 场景和层管理 6. 组件系统 7. 脚本语言C# 8. 物理引擎和UI系统 学习资源推荐 Unity引擎中如何优化大型游戏项目的性能&#xff1f; Unity C#脚本语言的高级编…...

练习题- 探索正则表达式对象和对象匹配

正则表达式(Regular Expressions)是一种强大而灵活的文本处理工具,它允许我们通过模式匹配来处理字符串。这在数据清理、文本分析等领域有着广泛的应用。在Python中,正则表达式通过re模块提供支持,学习和掌握正则表达式对于处理复杂的文本数据至关重要。 本文将探索如何在…...

Java集合提升

1. 手写ArrayList 1.1. ArrayList底层原理细节 底层结构是一个长度可以动态增长的数组&#xff08;顺序表&#xff09;transient Object[] elementData; 特点&#xff1a;在内存中分配连续的空间&#xff0c;只存储数据&#xff0c;不存储地址信息。位置就隐含着地址。优点 节…...

uniapp 微信小程序生成水印图片

效果 源码 <template><view style"overflow: hidden;"><camera device-position"back" flash"auto" class"camera"><cover-view class"text-white padding water-mark"><cover-view class"…...

ElasticSearch相关知识点

ElasticSearch中的倒排索引是如何工作的&#xff1f; 倒排索引是ElasticSearch中用于全文检索的一种数据结构&#xff0c;与正排索引不同的是&#xff0c;正排索引将文档按照词汇顺序组织。而倒排索引是将词汇映射到包含该词汇的文档中。 在ElasticSearch中&#xff0c;倒排索…...

Cursor实现用excel数据填充word模版的方法

cursor主页&#xff1a;https://www.cursor.com/ 任务目标&#xff1a;把excel格式的数据里的单元格&#xff0c;按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例&#xff0c;…...

蓝牙 BLE 扫描面试题大全(2):进阶面试题与实战演练

前文覆盖了 BLE 扫描的基础概念与经典问题蓝牙 BLE 扫描面试题大全(1)&#xff1a;从基础到实战的深度解析-CSDN博客&#xff0c;但实际面试中&#xff0c;企业更关注候选人对复杂场景的应对能力&#xff08;如多设备并发扫描、低功耗与高发现率的平衡&#xff09;和前沿技术的…...

srs linux

下载编译运行 git clone https:///ossrs/srs.git ./configure --h265on make 编译完成后即可启动SRS # 启动 ./objs/srs -c conf/srs.conf # 查看日志 tail -n 30 -f ./objs/srs.log 开放端口 默认RTMP接收推流端口是1935&#xff0c;SRS管理页面端口是8080&#xff0c;可…...

JavaScript 数据类型详解

JavaScript 数据类型详解 JavaScript 数据类型分为 原始类型&#xff08;Primitive&#xff09; 和 对象类型&#xff08;Object&#xff09; 两大类&#xff0c;共 8 种&#xff08;ES11&#xff09;&#xff1a; 一、原始类型&#xff08;7种&#xff09; 1. undefined 定…...

【Linux手册】探秘系统世界:从用户交互到硬件底层的全链路工作之旅

目录 前言 操作系统与驱动程序 是什么&#xff0c;为什么 怎么做 system call 用户操作接口 总结 前言 日常生活中&#xff0c;我们在使用电子设备时&#xff0c;我们所输入执行的每一条指令最终大多都会作用到硬件上&#xff0c;比如下载一款软件最终会下载到硬盘上&am…...

jdbc查询mysql数据库时,出现id顺序错误的情况

我在repository中的查询语句如下所示&#xff0c;即传入一个List<intager>的数据&#xff0c;返回这些id的问题列表。但是由于数据库查询时ID列表的顺序与预期不一致&#xff0c;会导致返回的id是从小到大排列的&#xff0c;但我不希望这样。 Query("SELECT NEW com…...

Vue 3 + WebSocket 实战:公司通知实时推送功能详解

&#x1f4e2; Vue 3 WebSocket 实战&#xff1a;公司通知实时推送功能详解 &#x1f4cc; 收藏 点赞 关注&#xff0c;项目中要用到推送功能时就不怕找不到了&#xff01; 实时通知是企业系统中常见的功能&#xff0c;比如&#xff1a;管理员发布通知后&#xff0c;所有用户…...

鸿蒙HarmonyOS 5军旗小游戏实现指南

1. 项目概述 本军旗小游戏基于鸿蒙HarmonyOS 5开发&#xff0c;采用DevEco Studio实现&#xff0c;包含完整的游戏逻辑和UI界面。 2. 项目结构 /src/main/java/com/example/militarychess/├── MainAbilitySlice.java // 主界面├── GameView.java // 游戏核…...

node.js的初步学习

那什么是node.js呢&#xff1f; 和JavaScript又是什么关系呢&#xff1f; node.js 提供了 JavaScript的运行环境。当JavaScript作为后端开发语言来说&#xff0c; 需要在node.js的环境上进行当JavaScript作为前端开发语言来说&#xff0c;需要在浏览器的环境上进行 Node.js 可…...

LangChain【6】之输出解析器:结构化LLM响应的关键工具

文章目录 一 LangChain输出解析器概述1.1 什么是输出解析器&#xff1f;1.2 主要功能与工作原理1.3 常用解析器类型 二 主要输出解析器类型2.1 Pydantic/Json输出解析器2.2 结构化输出解析器2.3 列表解析器2.4 日期解析器2.5 Json输出解析器2.6 xml输出解析器 三 高级使用技巧3…...