网站域名的设置/宁波seo优化费用
简介
boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。
同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对一些组件进行简单封装。
fiber封装
boost::fiber支持设置pthread和fiber的比例是1:n还是m:n,同时也支持设置调度方式是随机调度还是抢占调度。
本文中选择使用抢占式调度,并且是m:n的比例,这种选择适用面更加广。
既然pthread和fiber比例是m:n,那么这个m一般等于逻辑核数量,也就是需要设置fiber调度的线程控制在大小为固定的线程池中。fiber中抢占式调度方式也要求固定的线程池数量,外部前程加入时,可能会影响抢占式调度,即不能在外部线程中调用fiber,不然这个线程就加入到了fiber调度的pthread中了。
这时,需要一个设置一个队列,外部线程往这个队列中添加任务;内部线程池从队列中取任务,同时触发fiber,在fiber中可以继续触发fiber。触发队列、内部队列、工作线程、外部线程的关系如下图所示:
运行逻辑被装箱到一个任务中,然后被添加到任务队列,这一步利用模板和上转型实现,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | class IFiberTask {public:IFiberTask() = default;virtual ~IFiberTask() = default;IFiberTask(const IFiberTask& rhs) = delete;IFiberTask& operator=(const IFiberTask& rhs) = delete;IFiberTask(IFiberTask&& other) = default;IFiberTask& operator=(IFiberTask&& other) = default;virtual void execute() = 0;public:inline static std::atomic_size_t fibers_size {0}; };template <typename Func> class FiberTask: public IFiberTask {public:explicit FiberTask(Func&& func) :func_{std::move(func)} { }~FiberTask() override = default;FiberTask(const FiberTask& rhs) = delete;FiberTask& operator=(const FiberTask& rhs) = delete;FiberTask(FiberTask&& other) noexcept = default;FiberTask& operator=(FiberTask&& other) noexcept = default;void execute() override {fibers_size.fetch_add(1);func_();fibers_size.fetch_sub(1);}private:Func func_; }; |
IFiberTask是任务基类,不可拷贝;FiberTask是模板类,成员变量func_存储算子。使用IFiberTask类指针指向特化后的FiberTask对象,这时就实现的装箱操作,调用execute时,实际调用了子类的execute,触发封装的func_对象。
外部队列基于boost::fibers::buffered_channel
实现,这是一个支持并发的队列,队列的元素类型为std::tuple<boost::fibers::launch, std::unique_ptr>
,其中tuple第一元素存储任务的触发形式,进入队列还是立即触发。
接着是任务装箱,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | template<typename Func, typename... Args> auto Submit(boost::fibers::launch launch_policy, Func&& func, Args&&... args) {// 捕获lambda极其参数auto capture = [func = std::forward<Func>(func),args = std::make_tuple(std::forward<Args>(args)...)]() mutable {return std::apply(std::move(func), std::move(args));};// 任务的返回值类型using task_result_t = std::invoke_result_t<decltype(capture)>;// 该任务packaged_task的using packaged_task_t = boost::fibers::packaged_task<task_result_t()>;// 创建任务对象packaged_task_t task {std::move(capture)};// 装箱到FiberTask中using task_t = fiber::FiberTask<packaged_task_t>;// 获取packaged_task的futureauto result_future = task.get_future();// 添加到buffered_channel中auto status = work_queue_.push(std::make_tuple(launch_policy, std::make_unique<task_t>(std::move(task))));if (status != boost::fibers::channel_op_status::success) {return std::optional<std::decay_t<decltype(result_future)>> {};}return std::make_optional(std::move(result_future)); } |
代码中,先捕获lambda表达式及其参数,获取返回值类型并添加到packaged_task中,然后装箱到FiberTask中,使用packaged_task获取future并返回,FiberTask对象添加到队列中,使用IFiberTask的指针指向这个对象,实现装箱操作。
接着是内部任务触发的逻辑,首先创建一个线程池,每个线程注册调度器,接着从队列中获取任务,触发fiber。
工作线程的执行函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | // 注册调度算法为抢占式调度 boost::fibers::use_scheduling_algorithm<boost::fibers::algo::work_stealing>(threads_size_, true); // 创建task类型 auto task_tuple = typename decltype(work_queue_)::value_type {};// 从队列中获取任务 while(boost::fibers::channel_op_status::success == work_queue_.pop(task_tuple)) {// 解包auto& [launch_policy, task_to_run] = task_tuple;// 触发 fiber并detachboost::fibers::fiber(launch_policy, [task = std::move(task_to_run)]() {task->execute();}).detach(); } |
抢占式调度在注册时需要指定线程池大小,这时不能在外部线程中调用fiber,因为调用fiber的时候会把该线程添加到fiber调度的线程中,也就调整了fiber的worker线程数量。
以上代码实现了fiber触发器、任务队列、工作线程池等逻辑。
理论上可以创建多个fiber调度组件对象,每个组件根据自己的需要设置资源情况。
但实际应用中,还是建议使用一个全局调度组件,因为当A调度器中的任务依赖B调度器的任务的同时,就会出现阻塞工作线程,影响实际性能。
下面封装一个全局调度器,提供递交任务的接口和结束调度的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | class DefaultPool {private:static auto* Pool() {const static size_t size = std::thread::hardware_concurrency();static fiber::FiberPool pool(size, size*8);return &pool;}public:template<typename Func, typename... Args>static auto SubmitJob(boost::fibers::launch launch_policy, Func &&func, Args &&... args) {return Pool()->Submit(launch_policy, std::forward<Func>(func), std::forward<Args>(args)...);}template<typename Func, typename... Args>static auto SubmitJob(Func &&func, Args &&... args) {return Pool()->Submit(std::forward<Func>(func), std::forward<Args>(args)...);}static void Close() {Pool()->CloseQueue();}private:DefaultPool() = default; }; |
其他组件封装
上面对boost::fiber进行封装,得到一个能投入生产环境的调度器。
但是仅仅是这些是不够的,毕竟对于生产环境中的服务而言,外部服务、中间件的依赖是不能少的。
纤程是具有传染性的,对于外部组件提供的sdk,发送请求并进行同步等待会阻塞纤程对应的工作线程,影响整套机制。
为此,需要对现有的组件进行封装,对于同步接口,需要使用线程池配合fiber::promise;对于异步接口,可以改造成fiber::promise、future机制。下面介绍几种常用组件的fiber封装。
redis客户端封装
同步接口加线程池的方式将同步接口改造成异步接口的方案,存在较大的安全隐患。
线程池的容量不可控,当流量突然增加时,需要大量线程去等待,从而耗尽线程池资源,造成任务大量积压,服务崩溃。
而扩大线程池数量,又消耗了大量的资源。
综上,对于fiber化封装,还是建议采用异步接口。hiredis库支持异步接口,redis_plus_plus库对hiredis进行了c++封装,同时也提供了异步接口,本节将面向这个接口进行改造。
redis提供了挺多的接口,这里只对del、get、set三个接口做个示范:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | template<typename Type> using Promise = boost::fibers::promise<Type>;template<typename Type> using Future = boost::fibers::future<Type>;Future<long long > Del(const StringView &key) {auto promise = std::make_unique<Promise<long long >>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.del(key, [promise =promise.release()](sw::redis::Future<long long > &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future; }Future<OptionalString> Get(const StringView &key) {auto promise = std::make_unique<Promise<OptionalString>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.get(key, [promise = promise.release()](sw::redis::Future<OptionalString> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future; }Future<bool> Set(const StringView &key, const StringView &val) {auto promise = std::make_unique<Promise<bool>>();auto future = promise->get_future();// 在回调函数中对promise赋值redis_.set(key, val, [promise = promise.release()](sw::redis::Future<bool> &&fut) mutable {try {promise->set_value(fut.get());} catch (...) {promise->set_exception(std::current_exception());}delete promise;});return future; } |
注意,redis_plus_plus对每个回调函数通过模板进行判断,因此无法使用mutable+移动捕获promise,只能使用指针赋值的方式实现。redis_plus_plus在1.3.6以后的版本才有回调函数机制,之前的版本不支持。
上面原理是,创建fiber的promise和future,然后让redis的回调函数中捕获promise,并在promise中对数据进行赋值。而外部使用fiber的future进行等待,并不会阻塞工作线程。
grpc客户端封装
跟上面的redis客户端类似,这里也建议对grpc的异步客户端进行改造,支持fiber的promise、future机制。
grpc的异步客户端需要牵扯到grpc::CompletionQueue
,里面实现了一套poll engine,需要绑定一个线程去进行epoll_wait操作。首先定义一个GrpcClient类,包含四个成员变量、两个成员函数,如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | class GrpcClient {public:explicit GrpcClient(const ClientOption& option);~GrpcClient();// 对外提供的接口Future<meta::HelloResponse> Call(const meta::HelloRequest& request);private:// worker线程执行的逻辑void Work();private:std::unique_ptr<grpc::CompletionQueue> completion_queue_;std::thread worker_;std::shared_ptr<grpc::Channel> channel_;gpr_timespec timespec_{}; }; |
异步客户端分为三个部分逻辑,第一个是请求发送(Call函数),第二个是io线程批量处理,第三个是外部等待Future。
为了能够让io线程里给Promise进行赋值,需要Call函数中将Promise及其相关上下文传递到io线程中,这里定义一个上下文结构体:
1 2 3 4 5 6 | struct CallData {grpc::ClientContext context; // grpc上下文Promise<meta::HelloResponse> promise; // Promise对象grpc::Status status; // grpc调用状态meta::HelloResponse response; // 相应包 }; |
Call函数中的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 | // 创建上下文对象 auto data = new CallData; // 设置超时时间 data->context.set_deadline(timespec_); // 创建桩 meta::HelloService::Stub stub(channel_); auto future = data->promise.get_future(); // 异步调用,添加到完成队列中 auto rpc = stub.Asynchello(&data->context, request, completion_queue_.get()); // 绑定response、status,并将上下文对象作为tag传下去 rpc->Finish(&data->response, &data->status, reinterpret_cast<void*>(data)); return future; |
data对象在该函数中创建,在Work函数中释放,不存在内存泄漏问题。
grpc的异步稍微有点麻烦,发送之后,还要绑定数据。
接着是Work线程中的逻辑了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | CallData* data = nullptr; bool ok = false; // 获取状态完毕的数据 while (completion_queue_->Next((void**)&data, &ok)) {// 判断队列是否已经结束if (!ok) {break;}// 如果grpc状态ok,则赋值if (data->status.ok()) {data->promise.set_value(std::move(data->response));} else {// 否则设置异常data->promise.set_exception(std::make_exception_ptr(std::runtime_error(data->status.error_message())));}// 删除数据delete data;data = nullptr; } |
调用完成队列的Next函数会阻塞,如果队列中存在状态达到最终状态的数据,则返回一条。从完成对于中取到的数据的顺序与入队顺序不同。
上面两个函数组合实现了Future获取和Promise赋值的操作,使得grpc客户端能在fiber中使用。
参考
相关文章:

生产环境使用boost::fiber
简介 boost::fiber是一类用户级线程,也就是纤程。其提供的例子与实际生产环境相距较远,本文将对其进行一定的改造,将其能够投入到生产环境。 同时由于纤程是具有传染性的,使用纤程的代码里也全部要用纤程封装,本文将对…...

TSINGSEE青犀AI视频识别技术+危化安全生产智慧监管方案
一、背景分析 石油与化学工业生产过程复杂多样,涉及的物料易燃易爆、有毒有害,生产条件多高温高压、低温负压,现场危险化学品存储量大、危险源集中,重特大安全事故多发。打造基于工业互联网的安全生产新型能力,提高危…...

小程序request请求封装
以上为本人的项目目录 1.首先在utils中创建request.js文件封装request请求,此封装带上了token,每次请求都会自带token,需要你从后端获取后利用wx.setStorageSync(token,返回的token),不使用的话就是空。 直接复制即可,需要改一下…...

Easy Javadoc插件的使用教程
目录 一、安装Easy Javadoc插件 二、配置注释模板 三、配置翻译 一、安装Easy Javadoc插件 在idea的File-Settings-Plugins中搜索Easy Javadoc插件,点击install进行安装,安装完成后需要restart IDE,重启后插件生效。 二、配置注释模板 …...

一篇文章让你弄懂Java中的方法
目录 1. 方法概念及使用 1.1 什么是方法(method) 1.2 方法定义 1.3 方法调用的执行过程 1.4 实参和形参的关系 1.5 没有返回值的方法 2. 方法重载 2.1 为什么需要方法重载 2.2 方法重载概念 2.3 方法签名 1. 方法概念及使用 1.1 什么是方法(method) 方法就是一…...

SAP MM学习笔记39 - MRP(资材所要量计划)
这一章开始,离开请求书,学点儿新知识啦。 MRP ( Material Requirement Planning ) - 资材所要量计划。 它的位置在下面的调达周期图上来看,就是右上角的 所要量决定那块儿。 1,MRP(资材所要量计划) 的概要 MRP 的主要目的就是 确…...

总线类设备驱动——IIC
目录 一、本章目标 二、IIC设备驱动 2.1 I2C协议简介 2.2 LinuxI2C驱动 2.3 I2C 设备驱动实例 一、本章目标 一条总线可以将多个设备连接在一起,提高了系统的可扩展性能。这个互联的系统通常由三部分组成:总线控制器、物理总线(一组信号线) 和设备。总线控制器…...

MES 的价值点之动态调度
随着数字化技术的发展,为制造企业的生产计划提供了更多的便利。但在实际生产管理过程中,企业的生产计划不管做的多么理想,还是可能会因诸多的扰动因素造成执行与计划差异,这时就需要通过一些动态调整方案去适应新的生产要求与环境…...

dfs序及相关例题
常用的三种dfs序 欧拉序 每经过一次该点记录一次的序列。 dfs序 记录入栈和出栈的序列。 dfn序 只记录入栈的序列。 dfs序 DFS 序列是指 DFS 调用过程中访问的节点编号的序列。 如何求dfs序?可以用以下代码来找dfs序。 vector<vector<int>> g(n…...

python入门实战:爬取图片到本地
简单记录一下爬取网站图片保存到本地指定目录过程,希望对刚入门的小伙伴有所帮助! 目标网站就是下图所示页面: 实现步骤: 1.爬取每页的图片地址集合 2.下载图片到本地 3. 获取指定页数的页面路径 以下是实现代码: import bs4 import requests import os # 下…...

day02 矩阵 2023.10.26
1.矩阵 2.矩阵乘法 3.特殊矩阵 4.逆矩阵 5.正交矩阵 6.几何意义 7.齐次坐标 8.平移矩阵 9.旋转矩阵 10.缩放矩阵 11.复合运算...

浪潮信息inMerge超融合 刷新全球vSAN架构虚拟化VMmark最佳成绩
近日,在国际权威的VMmark测试中,浪潮信息inMerge1100超融合产品搭载NF5280M7服务器,满载运行44Tiles取得40.95分的成绩,刷新了vSAN架构(Intel双路最新平台)虚拟化性能测试纪录。该测试结果证明inMerge1100可…...

【【哈希应用】位图/布隆过滤器】
位图/布隆过滤器 位图位图概念位图的使用位图模拟实现 布隆过滤器布隆过滤器概念布隆过滤器的使用布隆过滤器模拟实现 位图/布隆过滤器应用:海量数据处理哈希切分 位图 位图概念 计算机中通常以位bit为数据最小存储单位,只有0、1两种二进制状态&#x…...

OpenCV学习笔记
一、OpenCV基础 (一)图像的读取、显示、创建 https://mp.weixin.qq.com/s?__bizMzA4MTA1NjM5NQ&mid2247485202&idx1&sn05d0b4cd25675a99357910a5f2694508&chksm9f9b80f6a8ec09e03ab2bb518ea6aad83db007c9cdd602c7459ed75c737e380ac9c3…...

idea 一键部署jar包
上传成功...

16、SpringCloud -- 常见的接口防刷限流方式
目录 接口防刷限流方式1:隐藏秒杀地址需求:思路:代码:前端:后端:测试:总结:方式2:图形验证码1、生成图形验证码需求:思路:代码:前端:后端:测试:2、校验验证码需求:思路:代码:...

Typora(morkdown编辑器)的安装包和安装教程
Typora(morkdown编辑器)的安装包和安装教程 下载安装1、覆盖文件2、输入序列号①打开 typora ,点击“输入序列号”:②邮箱一栏中任意填写(但须保证邮箱地址格式正确),输入序列号,点击…...

服务器不稳定对网站有什么影响
世界上最远的距离,不是树枝无法相依,而是相互了望的星星,却没有交汇的轨迹。 现代技术的进步,导致了人与人之间距 离的消除,直播行业的快速发展的影响和渗透进如今的日常生活,为人们在遥远的距离相见与互诉…...

py实现surf特征提取
import cv2def main():# 加载图像image1 cv2.imread(image1.jpg, cv2.IMREAD_GRAYSCALE)image2 cv2.imread(image2.jpg, cv2.IMREAD_GRAYSCALE)# 创建SURF对象surf cv2.xfeatures2d.SURF_create()# 检测特征点和描述符keypoints1, descriptors1 surf.detectAndCompute(imag…...

MS39233三个半桥驱动器可兼容TMC6300
MS39233 是一款低压三个半桥驱动器。可兼容 TMC6300(功能基本一致,管脚不兼容)。它可应用于低电压及电池供电的运动控制场合。并且内置电荷泵来提供内部功率 NMOS 所需的栅驱动电压。 MS39233 可以提供最高 2.8A 的峰值电流,其功率…...

09、SpringCloud -- 利用redis的原子性控制高并发请求访问到service层、本地标识
目录 利用redis的原子性控制请求问题:需求:思路什么是原子性的操作?代码思路:代码:工具类依赖SeckillGoodControllerSeckillOrderInfoController测试:本地标识的分析和实现问题:需求:思路:代码:测试:利用redis的原子性控制请求 利用redis的原子性控制人数请求访问到…...

竞赛选题 深度学习图像修复算法 - opencv python 机器视觉
文章目录 0 前言2 什么是图像内容填充修复3 原理分析3.1 第一步:将图像理解为一个概率分布的样本3.2 补全图像 3.3 快速生成假图像3.4 生成对抗网络(Generative Adversarial Net, GAN) 的架构3.5 使用G(z)生成伪图像 4 在Tensorflow上构建DCGANs最后 0 前言 &#…...

基于深度学习网络的美食检测系统matlab仿真
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 % 图像大小 image_size [224 224 3]; num_classes size(VD,2)-1;% 目标类别数量…...

人工智能基础_机器学习006_有监督机器学习_正规方程的公式推导_最小二乘法_凸函数的判定---人工智能工作笔记0046
我们来看一下公式的推导这部分比较难一些, 首先要记住公式,这个公式,不用自己理解,知道怎么用就行, 比如这个(mA)T 这个转置的关系要知道 然后我们看这个符号就是求X的导数,X导数的转置除以X的导数,就得到单位矩阵, 可以看到下面也是,各种X的导数,然后计算,得到对应的矩阵结…...

【MongoDB】Windows 安装MongoDB 6.0
一、下载安装包 安装包下载地址https://www.mongodb.com/try/download/community这里我选择的是 二、解压并安装 1、解压 这里我将压缩包解压到了D盘,并重命名成了mongodb,解压后的目录如下: 2、创建配置文件 在D:\mongodb下新建conf目录…...

DM8 Dokcer镜像更新后远程无法jdbc连接问题
背景:原来官网下的dm8docker镜像有效期只有两个星期,问他们商务申请了新的dm8镜像,准备简单升级一下镜像再引入原来的database 先说结论:jdbc驱动要更新 官网dm8驱动链接地址 原来的tag镜像 dm8_single:v8.1.2.128_ent_x86_64…...

AI:39-基于深度学习的车牌识别检测
🚀 本文选自专栏:AI领域专栏 从基础到实践,深入了解算法、案例和最新趋势。无论你是初学者还是经验丰富的数据科学家,通过案例和项目实践,掌握核心概念和实用技能。每篇案例都包含代码实例,详细讲解供大家学习。 📌📌📌本专栏包含以下学习方向: 机器学习、深度学…...

软考 系统架构设计师系列知识点之系统架构评估(1)
所属章节: 第8章. 系统质量属性与架构评估 第2节. 系统架构评估 1. 概述 系统架构评估是在对架构分析、评估的基础上,对架构策略的选取进行决策。它利用数学或逻辑分析技术,针对系统的一致性、正确性、质量属性、规划结果等不同方面&#x…...

Spark UI中Shuffle dataSize 和shuffle bytes written 指标区别
背景 本文基于Spark 3.1.1 目前在做一些知识回顾的时候,发现了一些很有意思的事情,就是Spark UI中ShuffleExchangeExec 的dataSize和shuffle bytes written指标是不一样的, 那么在AQE阶段的时候,是以哪个指标来作为每个Task分区大…...

Java——Map.getOrDefault方法详解
Java——Map.getOrDefault方法详解 Map.getOrDefault(Object key, V defaultValue)是Java中Map接口的一个方法,用于获取指定键对应的值,如果键不存在,则返回一个默认值。 该方法的签名如下: V getOrDefault(Object key, V defau…...