当前位置: 首页 > news >正文

Python 全栈系列256 异步任务与队列消息控制(填坑)

说明

每个创新都会伴随着一系列的改变。

在使用celery进行异步任务后,产生的一个问题恰好也是因为异步产生的。

内容

1 问题描述

我有一个队列 stream1, 对应的worker1需要周期性的获取数据,对输入的数据进行模式识别后分流。worker1我设施为10秒运行一次。然后我就发现输出队列的数据大约是6~7倍于原始数据。

2 分析

在同步执行的状态下,前面 一个任务没有结束,后面的任务即使到了执行时间也会错过。这个在APS任务里是非常明确的。但由于Celery执行的Worker是异步的,这意味着即使前一个任务没有完成,后一个任务还是会如期启动,另开一个线头。

Worker1之前的模式是采用xrange方式获取数据,在处理完成后才将消息删除。

由于模式识别的过程比较复杂,层层过滤,所以单个worker执行的时间超过了60秒。这样在这批消息删除之前,每次启动的worker都取到了相同的数据,处理后也会输出到结果队列。

3 解决办法

理论上,每次worker的取数应该是采用xfetch比较合理,但是对应的,xfetch会因为worker的中断导致消息残留。所以就要有另一些worker来进行残余消息的检测和处理。结果就是 xfetch worker + residual worker配合,显得麻烦。

过去在同步状态下,我就偷懒,只用一个worker进行xrange,这样只有消息被真实消费才会删除。

xfetch是支持多个worker并行的,而xrange则智能支持单个worker。

所以,本次要做的事就是把xfetch + residual 模式搞一下,以后该用什么模式就什么模式。

4 实践

为每个worker提供一种获取残余消息(residual)的办法,每个小时执行一次即可。普通的worker(fetch)一般是秒级,或者分钟级执行的。

当前的QManager是架在RedisAgent服务上封装的对象,这个对象极大简化了平时的操作。不过之前,并没有完全将QManager与RedisAgent的参数对接,采用了较为简单的方式。

本次需要做的是先使用RedisAgent完成对应的任务,然后将QManager进行升级。

构造测试队列

test_list = [{'doc_id':1, 'content':'first'}, {'doc_id':2, 'content':'ss'}]
qm.ensure_group('test.test.test')
qm.parrallel_write_msg('test.test.test', test_list){'status': True, 'msg': 'ok,add 2 of 2  messages'}

获取消息

qm.xfetch('test.test.test', count=1)
{'data': [{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'}],'status': True,'msg': 'ok'}
  • 1 判断是否有延误消息

两个关键参数,一个是队列名称,一个是延误时间。如果不写延误时间,就是看所有的延误。

resp = req.post('http://172.17.0.1:24118/get_pending_msg/',json = {'stream_name':'test.test.test' , 'idle_seconds':20}).json()resp = req.post('http://172.17.0.1:24118/get_pending_msg/',json = {'stream_name':'test.test.test','idle_seconds':None }).json(){'status': True,'msg': 'ok','data': [['1718984345178-0', 'consumer1', 36675032, 1]]}

延误时间的最大作用是避免获取短时间内超时的任务(如果任务本身就需要很长时间)

如果data字段长度不为0,那么就会有延误消息,获取最小和最大的id即可。

  • 2 根据起止id获取数据
delay_data = resp['data'] 
start_id = delay_data[0][0]
end_id = delay_data[-1][0]resp = req.post('http://172.17.0.1:24118/xrange/',json = {'stream_name':'test.test.test' , 'start_id': start_id,'end_id':end_id}).json()
{'status': True,'msg': 'ok','data': [{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'},{'_msg_id': '1718984345178-1', 'doc_id': '2', 'content': 'ss'}]}

所以相应低,修改QMananger(version1.3)的xrange方法,并增加xpending方法

xrange

...# 批量获取数据def xrange(self, stream_name, count = None, start_id = '-' , end_id ='+'):cur_count = count or self.batch_size recs_resp = req.post(self.redis_agent_host + 'xrange/',json ={'connection_hash':self.redis_connection_hash, 'stream_name':stream_name,'count':cur_count,'start_id':start_id,'end_id':end_id}).json()return recs_resp

xpending。原来的接口似乎有点小bug:如果队列没有延误,接口查询会失败

...def xpending(self, stream_name,count = None, idle_seconds = 3600):cur_count = count or self.batch_size # 1 确认是否有延误消息:没有延误消息的情况接口会报错try:resp = req.post(self.redis_agent_host + 'get_pending_msg/',json ={'stream_name': stream_name, 'idle_seconds': idle_seconds}).json()# 如果没有数据,直接返回(标准格式)if len(resp['data']) == 0:print('No Pending')return resp except:return {'status':True, 'msg':'query pending fail', 'data':[]}# 2 获取被延误的消息min_id = resp['data'][0][0]max_id = resp['data'][-1][0]return self.xrange(stream_name, count = cur_count, start_id = min_id, end_id = max_id)

Note: 我们对正常执行的任务,感知/容忍的周期为分钟;对延误执行(补漏)的任务,感知/容忍的周期为小时。

来看改造后的QM

#  xfetch,但是此时已经无数据可取
qm.xfetch('test.test.test' )
{'status': True, 'msg': 'ok', 'data': []}
# xpending 此时有两条延误较长时间的消息
qm.xpending('test.test.test' , idle_seconds=3600)
{'status': True,'msg': 'ok','data': [{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'},{'_msg_id': '1718984345178-1', 'doc_id': '2', 'content': 'ss'}]}
# 用xrange取出,处理
data_list = qm.xpending('test.test.test' , idle_seconds=3600)['data']
[{'_msg_id': '1718984345178-0', 'doc_id': '1', 'content': 'first'},{'_msg_id': '1718984345178-1', 'doc_id': '2', 'content': 'ss'}]# 假设处理完,准备删除消息
data_msg_list = qm.extract_msg_id(data_list)
['1718984345178-0', '1718984345178-1']
qm.xdel('test.test.test', data_msg_list)
{'data': 2, 'status': True, 'msg': 'ok'}# 再次使用xpending
qm.xpending('test.test.test' , idle_seconds=3600)
{'status': True, 'msg': 'no source data', 'data': []}

另外,xpending中,即使是把pending的消息处理掉了,仍然可以读到pending信息,所以每次会调用一下xrange查询一个不存在的区间,稍微有点浪费。不过考虑到这是补救型的操作,一个小时才运行一次,就没有关系了。

相关文章:

Python 全栈系列256 异步任务与队列消息控制(填坑)

说明 每个创新都会伴随着一系列的改变。 在使用celery进行异步任务后,产生的一个问题恰好也是因为异步产生的。 内容 1 问题描述 我有一个队列 stream1, 对应的worker1需要周期性的获取数据,对输入的数据进行模式识别后分流。worker1我设施为10秒运行…...

从零开始的Ollama指南:部署私域大模型

大模型相关目录 大模型,包括部署微调prompt/Agent应用开发、知识库增强、数据库增强、知识图谱增强、自然语言处理、多模态等大模型应用开发内容 从0起步,扬帆起航。 大模型应用向开发路径:AI代理工作流大模型应用开发实用开源项目汇总大模…...

C++类和对象总结

目录 总结 一、引言 二、类的定义 三、对象的创建与初始化 四、访问控制 五、封装 六、继承 七、多态 八、其他特性 九、总结 C类的定义 C对象的创建和初始化 C类的访问控制 总结 一、引言 C是一种面向对象的编程语言,其核心概念是类和对象。类是对现…...

基于PHP的民宿管理系统

有需要请加文章底部Q哦 可远程调试 基于PHP的民宿管理系统 一 介绍 此民宿管理系统基于原生PHP开发,数据库mysql,前端jquery.js和echarts.js。系统角色分为用户和管理员。用户可以在线浏览和预订民宿,管理员登录后台进行相关管理等。(在系统…...

ROS中C++、Python完整的目录结构

文章目录 在ROS中,一个典型的C软件包目录结构通常包括以下几个主要目录: include:该目录包含C头文件(.hpp或者.h文件),用于声明类、函数、变量等。通常,这些头文件定义了ROS节点、消息类型、服务…...

Boosting原理代码实现

1.提升方法是将弱学习算法提升为强学习算法的统计学习方法。在分类学习中,提升方法通过反复修改训练数据的权值分布,构建一系列基本分类器(弱分类器),并将这些基本分类器线性组合,构成一个强分类…...

【Qt基础教程】事件

文章目录 前言事件简介事件示例总结 前言 在开发复杂的图形用户界面(GUI)应用程序时,理解和掌握事件处理是至关重要的。Qt,作为一个强大的跨平台应用程序开发框架,提供了一套完整的事件处理系统。本教程旨在介绍Qt事件处理的基础知识&#x…...

外星人Alienware m15R7 原厂Windows11系统

装后恢复到您开箱的体验界面,包括所有原机所有驱动AWCC、Mydell、office、mcafee等所有预装软件。 最适合您电脑的系统,经厂家手调试最佳状态,性能与功耗直接拉满,体验最原汁原味的系统。 原厂系统下载网址:http://w…...

stata17中java installation not found或java not recognozed的问题

此问题在于stata不知道去哪里找java,因此需要手动的告诉他 方法1: 1.你得保证已经安装并配置好java环境 2.在stata中输入以下内容并重启stata即可 set java_home "D:\Develope\JDk17" 其中java_home后面的""里面的内容是你的jdk安装路径 我的…...

Harbor本地仓库搭建003_Harbor常见错误解决_以及各功能使用介绍_镜像推送和拉取---分布式云原生部署架构搭建003

首先我们去登录一下harbor,但是可以看到,用户名密码没有错,但是登录不上去 是因为,我们用了负债均衡,nginx会把,负载均衡进行,随机分配,访问的 是harbora,还是harborb机器. loadbalancer中 解决方案,去loadbalance那个机器中,然后 这里就是25机器,我们登录25机器 然后去配置…...

怎样搭建serveru ftp个人服务器

首先说说什么是ftp? FTP协议是专门针对在两个系统之间传输大的文件这种应用开发出来的,它是TCP/IP协议的一部分。FTP的意思就是文件传输协议,用来管理TCP/IP网络上大型文件的快速传输。FTP早也是在Unix上开发出来的,并且很长一段…...

SEO是什么?SEO相关发展历史

一、SEO是什么意思? SEO(Search Engine Optimization),翻译成中文就是“搜索引擎优化”。简单来讲,seo是指自然搜索结果下获得的网站流量的技术,是可以不用花钱就可以让自己的网站有好的排名,也…...

android之WindowManager悬浮框

文章目录 阐述悬浮框的实现AndroidManifest配置使用方法 阐述 Window的类型大致分为三种: Application Window 应用程序窗口、Sub Window 子窗口、System Window 系统窗口 窗口类型图层值(type)Application Window1~99Sub Windo…...

注解详解系列 - @Scope:定义Bean的作用范围

注解简介 在今天的注解详解系列中,我们将探讨Scope注解。Scope是Spring框架中的一个重要注解,用于定义bean的作用范围。通过Scope注解,可以控制Spring容器中bean的生命周期和实例化方式。 注解定义 Scope注解用于定义Spring bean的作用范围…...

仿中波本振电路的LC振荡器电路实验

手里正好有一套中波收音机套件的中周。用它来测试一下LC振荡器,电路如下: 用的是两只中频放大的中周,初步测试是用的中周自带的瓷管电容,他们应该都是谐振在465k附近。后续测试再更换电容测试。 静态电流,0.5到1mA。下…...

Java 面试题:谈谈 final、finally、 finalize 有什么不同?

在 Java 编程中,final、finally 和 finalize 是三个看似相似但用途截然不同的关键字和方法。理解它们的区别对于编写高质量和健壮的代码至关重要。 final 关键字可用于声明常量、方法和类。用在变量上表示变量不可变,用在方法上表示方法不能被重写&#…...

45、基于深度学习的螃蟹性别分类(matlab)

1、基于深度学习的螃蟹性别分类原理及流程 基于深度学习的螃蟹性别分类原理是利用深度学习模型对螃蟹的图像进行训练和识别,从而实现对螃蟹性别的自动分类。整个流程可以分为数据准备、模型构建、模型训练和性别分类四个步骤。 数据准备: 首先需要收集包…...

mongodb嵌套聚合

db.order.aggregate([{$match: {// 下单时间"createTime": {$gte: ISODate("2024-05-01T00:00:00Z"),$lte: ISODate("2024-05-31T23:59:59Z")}// 商品名称,"goods.productName": /美国皓齿/,//订单状态 2:待发货 3:已发货 4:交易成功…...

在 KubeSphere 上快速安装和使用 KDP 云原生数据平台

作者简介:金津,智领云高级研发经理,华中科技大学计算机系硕士。加入智领云 8 余年,长期从事云原生、容器化编排领域研发工作,主导了智领云自研的 BDOS 应用云平台、云原生大数据平台 KDP 等产品的开发,并在…...

Dev Eco Studio设置中文界面

Settings-Plugins-installed-搜索Chinese...

vscode作为markdown LaTeX编辑器

1、安装插件 Markdown All in One 2、下载并安装 prince:Prince - Latest builds Deepin 20.9 对应 debian 10,下载 debian 10 的deb包安装即可 (安装后命令在 /usr/bin 下) 3、安装插件 Markdown Preview Enhanced&#xff…...

Java中的图形用户界面开发

Java中的图形用户界面开发 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 在当今软件开发的世界中,图形用户界面(Graphical User Inte…...

android常用知识

透明activity样式: android:theme"android:style/Theme.Translucent.NoTitleBar.Fullscreen"这句代码,当你是建的empty activity project时,默认继承的是AppCompat这个类。所以在AndroidMifext.xml文件中用上述代码会导致程序错误&…...

centos中安装并设置vsftpd

vsftpd是一个可安装在linux上的ftp服务器软件。 一、安装 安装前保证服务器能上互联网。如果不能上网,看看能不能设法利用局域网代理上网。 sudo yum -y install vsftpd二、配置 1、修改配置文件 cd /etc/vsftpd #修改之前记得备份!!&am…...

C语言入门系列:指针入门(超详细)

文章目录 一,什么是指针1,内存2,指针是什么? 二,指针的声明1,声明指针类型变量2,二级指针 三,指针的计算1,两个指针运算符1.1 *运算符1.2 & 运算符1.3 &运算符与…...

打印水仙花数

题目:打印出所有的“水仙花数”,所谓“水仙花数”是指一个三位数,其各位数字立方和等于该数本身。 例如:153是一个“水仙花数”,因为153 1的三次方 +5的三次方+3的三次方。 程序分析&#xff…...

【SCAU数据挖掘】数据挖掘期末总复习题库简答题及解析——下

1.从某超市顾客中随机抽取5名,他们的购物篮数据的二元0/1表示如下: 顾客号 面包 牛奶 尿布 啤酒 鸡蛋 可乐 1 1 1 0 0 0 0 2 1 0 1 1 1 0 3 0 1 1 1 0 1 4 1 1 1 1 0 0 5 1 1 1 0 0 1 某学生依据这些数据做…...

PyQt学习之简介

1.Python图形界面称为程序的用户交互界面,英文称之为 UI (user interface) Tkinter 基于Tk的Python库,Python官方采用的标准库,优点是作为Python标准库、稳定、发布程序较小,缺点是控件相对较少。 wxPython 基于wxWidgets的Py…...

深入理解前端缓存

前端缓存是所有前端程序员在成长历程中必须要面临的问题,它会让我们的项目得到非常大的优化提升,同样也会带来一些其它方面的困扰。大部分前端程序员也了解一些缓存相关的知识,比如:强缓存、协商缓存、cookie等,但是我…...

K-means聚类算法详解与实战

一、引言 K-means聚类算法是一种无监督学习算法,旨在将数据点划分为K个不同的聚类或群组,使得同一聚类内的数据点尽可能相似,而不同聚类间的数据点尽可能不同。在图像处理、数据挖掘、客户细分等领域有着广泛的应用。本文将通过图文结合的方…...

python数据分析-糖尿病数据集数据分析预测

一、研究背景和意义 糖尿病是美国最普遍的慢性病之一,每年影响数百万美国人,并对经济造成重大的经济负担。糖尿病是一种严重的慢性疾病,其中个体失去有效调节血液中葡萄糖水平的能力,并可能导致生活质量和预期寿命下降。。。。 …...

【前端】 nvm安装管理多版本node、 npm install失败解决方式

【问题】If you believe this might be a permissions issue, please double-check the npm ERR! permissio或者Error: EPERM: operation not permitted, VScode中npm install或cnpm install报错 简单总结,我们运行npm install 无法安装吧包,提示权限问题…...

第11天:API开发与REST framework

第11天:API开发与REST framework 目标 使用Django REST framework构建RESTful API。 任务概览 学习序列化器的概念和使用方法。创建API视图和路由。实现API的权限和认证。 详细步骤 1. 学习序列化器 序列化器是Django REST framework中用于数据转换的组件&am…...

2024 年解锁 Android 手机的 7 种简便方法

您是否忘记了 Android 手机的 Android 锁屏密码,并且您的手机已被锁定?您需要使用锁屏解锁 Android 手机?别担心,您不是唯一一个忘记密码的人。我将向您展示如何解锁 Android 手机的锁屏。 密码 PIN 可保护您的 Android 手机和 G…...

利用机器学习重构视频中的人脸

引言 中国与英国的研究团队携手合作,开创了一种创新的视频面孔重塑技术。这项技术能够以极高的一致性对视频中的面部结构进行逼真的放大和缩小,且避免了常见伪影的产生。 从研究人员选取的YouTube视频样例中可见,经过处理后,女演…...

2021数学建模C题目– 生产企业原材料的订购与运输

C 题——生产企业原材料的订购与运输 思路:该题主要是通过对供应商的供货能力和运送商的运货能力进行估计,给出合适的材料订购方案 程序获取 第一题问题思路与结果: 对 402 家供应商的供货特征进行量化分析,建立反映保障企业生…...

C# OpenCvSharp 图像复制-clone、copyTo

在C#中使用OpenCvSharp库处理图像时,clone和copyTo是两个非常常用的函数。理解和合理使用这些函数可以帮助你在图像处理项目中更高效地操作图像数据。本文将详细介绍这两个函数的使用方法,并通过具体的示例来说明它们的实际应用。 1. clone 函数 定义 …...

中国投入到终止遗传性疾病的战斗

中国投入到终止遗传性疾病的战斗 编译 李升伟 于2006年6月在澳大利亚的墨尔本会议上启动的人类变异组计划(Human Variome Project,简称HVP),旨在全球范围内广泛收集所有基因和蛋白质序列变异和多态性的数据,采用全基…...

PCL common中常见的基础功能函数

文章目录 一、common模块中的头文件二、common模块中的基本函数1、angles.h2、centroid.h1)计算给定一群点的3D中心点,并且返回一个三维向量2)计算给定的三维点云的协方差矩阵。3)计算正则化的3*3的协方差矩阵以及给定点云数据的中心点4)利用一组点的指数对其进行一般的、…...

力扣每日一题 6/22 字符串/贪心

博客主页:誓则盟约系列专栏:IT竞赛 专栏关注博主,后期持续更新系列文章如果有错误感谢请大家批评指出,及时修改感谢大家点赞👍收藏⭐评论✍ 2663.字典序最小的美丽字符串【困难】 题目: 如果一个字符串满…...

MCT Self-Refine:创新集成蒙特卡洛树搜索 (MCTS)提高复杂数学推理任务的性能,超GPT4,使用 LLaMa-3 8B 进行自我优化

📜 文献卡 题目: Accessing GPT-4 level Mathematical Olympiad Solutions via Monte Carlo Tree Self-refine with LLaMa-3 8B作者: Di Zhang; Xiaoshui Huang; Dongzhan Zhou; Yuqiang Li; Wanli OuyangDOI: 10.48550/arXiv.2406.07394摘要: This pape…...

自制HTML5游戏《开心消消乐》

1. 引言 游戏介绍 《开心消消乐》是一款基于HTML5技术开发的网页游戏,以其简单的操作方式、轻松的游戏体验和高度的互动性,迅速在社交平台上获得了广泛的关注和传播。玩家通过消除相同类型的元素来获得分数,游戏设计巧妙,易于上手…...

【C++】平衡二叉树(AVL树)的实现

目录 一、AVL树的概念二、AVL树的实现1、AVL树的定义2. 平衡二叉树的插入2.1 按照二叉排序树的方式插入并更新平衡因子2.2 AVL树的旋转2.2.1 新节点插入较高左子树的左侧(LL平衡旋转)2.2.2 新节点插入较高右子树的右侧(RR平衡旋转&#xff09…...

第一百一十八节 Java面向对象设计 - Java接口

Java面向对象设计 - Java接口 什么是接口? Java中的接口定义了一个引用类型来创建抽象概念。接口由类实现以提供概念的实现。 在Java 8之前,一个接口只能包含抽象方法。 Java 8允许接口具有实现的静态和默认方法。 接口通过抽象概念定义不相关类之间…...

Flink nc -l -p 监听端口测试

1、9999端口未占用 netstat -apn|grep 99992、消息发送端 nc -l -k -p 9999 {"user":"ming","url":"www.baidu1.com", "timestamp":1200L, "score":1} {"user":"xiaohu","url":…...

在IntelliJ IDEA中使用Spring Boot:快速配置

使用IntelliJ IDEA开发Spring Boot应用程序可以极大地提高开发效率,因为IDEA提供了许多便捷的功能,比如自动补全、代码分析、热部署等。以下是一篇可能的CSDN博客文章草稿,介绍如何在IntelliJ IDEA中使用Spring Boot: 在IntelliJ …...

django filter 批量修改

django filter 批量修改 在Django中,如果你想要批量修改记录,可以使用update()方法。这个方法允许你在一个查询集上执行批量更新,而不需要为每条记录生成单独的数据库事务。 以下是一个使用update()方法批量修改记录的例子: fro…...

maven:中央仓库验证方式改变:401 Content access is protected by token

前几天向maven中央仓库发布版本,执行上传命令mvn release:perform时报错了: [ERROR] Failed to execute goal org.sonatype.plugins:nexus-staging-maven-plugin:1.6.13:deploy (injected-nexus-deploy) on project xxxxx: Failed to deploy artifacts: …...

【面试】http

一、定义 HTTP(超文本传输协议),是一种用于分布式、协作式、超媒体信息系统的应用层协议,它是万维网数据通信的基础。主要特点是无状态(服务器不会保存之前请求的状态)、无连接(服务器处理完请…...

获取泛型,泛型擦除,TypeReference 原理分析

说明 author blog.jellyfishmix.com / JellyfishMIX - githubLICENSE GPL-2.0 获取泛型,泛型擦除 下图中示例代码是一个工具类用于生成 csv 文件,需要拿到数据的类型,使用反射感知数据类型的字段,来填充表字段名。可以看到泛型…...