wordpress登陆logo修改/seo网络推广专员招聘
一、背景简介
业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。
目前我们只是先简单的进行了一个性能场景的测试,就是评估目前服务是否能够支持,预期的最大同时在线车辆上传数据。经过评估,在线车辆数据按照预期的10倍来进行的,并且后面增加持续运行12h查看服务链路的稳定性。
本篇并不是一个严谨的性能测试过程结果分享,主要是分享下关于mqtt协议服务的压测脚本的编写。因为之前我也没接触过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记录一下,仅供参考。
捋一下链路就知道需要生成哪些数据(因为服务还未上线使用,所以产生的压测数据后面可以直接清理掉即可。):
- 一些前置数据:比如数据库、缓存里涉及到的车辆数据,通信秘钥数据等等,这些可以之前写脚本一次性生成即可。
- 车辆上报的数据:车辆上报到云端的数据,是经过一系列加密转码,期间还要设计到解密等,这个经过评估,可以简化其中的某些环境,所以所有的车可以直接发送相同的数据即可。
- 车辆数据:最后就是生成对应的车辆数据,同时在线,按照评估的频率发送数据。
其中第1、2的数据在之前针对性的分别生成即可,第3步的车辆发送数据就是压测脚本要干的事情了。
二、技术选型
这个倒是很快,搜索引擎大概搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要麻烦的多。
所以就继续编码好了,仍然首选python,想到了locust
库,后来看官方文档的时候,看到locust
也针对mqtt
协议拓展了一些内容。但是我尝试下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开始编写了。
搜索中又发现Python
中用于mqtt
协议的库叫paho.mqtt
,支持连接代理,消息的订阅、收发等等,于是最后确定使用:locust
+paho.mqtt
的组合来实现本次的负载脚本。
三、代码编写
1. 脚本代码
暂时没做代码分层,目前场景简单,就直接都放一个模块里了,有点长,先贴上来,后面部分会对脚本的重点内容进行拆解。
脚本目前做了这些事情:
- 从db中查询有效可用的所有测试车辆信息数据
- 根据命令行的输入参数,指定启动的车辆数,以及与broker代理建立连接的频率
- 建立连接成功的车辆,就可以根据脚本里指定的频次,来像broker发送数据
- 脚本统计连接数、请求数、响应时间等信息写到报表中
- 调试遇到车辆会批量断开连接的情况,增加了当车辆断开连接时,把断开时间、车辆信息写到本地csv中,方便第二天来查看分析。
import csv
import datetime
import queue
import os
import sys
import time
import sslfrom paho.mqtt import client as mqtt_client# 根据不同系统进行路径适配
if os.name == "nt":path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))sys.path.insert(0, path)from GB_test.utils.mysql_operating import DB
elif os.name == "posix":sys.path.append("/app/qa_test_app/")from GB_test.utils.mysql_operating import DBfrom locust import User, TaskSet, events, task, between, run_single_userBROKER_ADDRESS = "broker服务地址"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000 # 超时时间
TEST_TOPIC = "test_topic"TEST_VALUE = [16, 3, -26, 4, 0, 36,.......] # 用来publish的测试数据,仅示意BYTES_DATA = bytes(i % 256 for i in TEST_VALUE) # 业务需要转换成 byte 类型后再发送# 创建队列
client_queue = queue.Queue()# 连接DB,读取车辆数据
db = DB("db_vmd")
select_sql = "select xxxx"
client_list = db.fetch_all(select_sql)
print("车辆数据查询完毕,数据量:{}".format(len(client_list)))
for t in client_list:# 把可用的车辆信息存到队列中去client_queue.put(t)def fire_success(**kwargs):"""请求成功时调用"""events.request.fire(**kwargs)def calculate_resp_time(t1, t2):"""计算响应时间"""return int((t2 - t1) * 1000)class MQTTMessage:"""已发送的消息实体类"""def __init__(self, _type, qos, topic, payload, start_time, timeout):self.type = _type,self.qos = qos,self.topic = topicself.payload = payloadself.start_time = start_timeself.timeout = timeout# 统计总共发送成功的消息数量
total_published = 0
disconnect_record_list = [] # 定义存放连接断开的记录的列表容器class PublishTask(TaskSet):@taskdef task_publish(self):self.client.loop_start()topic = TEST_TOPICpayload = BYTES_DATA# 记录发送的开始时间start_time = time.time()mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)published_mid = mqtt_msg_info.mid# 将发送成功的消息内容,放入client实例的 published_message 字段self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,0,topic,payload,start_time,PUBLISH_TIMEOUT)# 发送成功回调self.client.on_publish = self.on_publish# 断开连接回调self.client.on_disconnect = self.on_disconnect@staticmethoddef on_disconnect(client, userdata, rc):""" broker连接断开,放入列表容器"""disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]disconnect_record_list.append(disconnected_info)print("rc状态:{} - -".format(rc), "{}-broker连接已断开".format(str(client._client_id)))@staticmethoddef on_publish(client, userdata, mid):if mid:# 记录消息发送成功的时间end_time = time.time()# 从已发送的消息容器中,取出消息message = client.published_message.pop(mid, None)# 计算开始发送到发送成功的耗时publish_resp_time = calculate_resp_time(message.start_time, end_time)fire_success(request_type="p_success",name="client_id: " + str(client._client_id),response_time=publish_resp_time,response_length=len(message.payload),exception=None,context=None)global total_published# 成功发送累加1total_published += 1class MQTTLocustUser(User):tasks = [PublishTask]wait_time = between(2, 2)def __init__(self, *args, **kwargs):super().__init__(*args, **kwargs)# 从队列中获取客户端 username 和 client_idcurrent_client = client_queue.get()self.client = mqtt_client.Client(current_client[1])self.client.username_pw_set(current_client[0], PASSWORD)# self.client.username_pw_set(current_client[0] + "1", PASSWORD) # 模拟client连接报错# 定义一个容器,存放已发送的消息self.client.published_message = {}def on_start(self):# 设置tlscontext = ssl.SSLContext(ssl.PROTOCOL_TLS)self.client.tls_set_context(context)self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)self.client.on_connect = self.on_connectdef on_stop(self):print("publish 成功, 当前已成功发送数量:{}".format(total_published))if len(disconnect_record_list) == 0:print("无断开连接的client")else:# 把断开记录里的信息写入csvwith open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:writer = csv.writer(csvfile)writer.writerow(['client_id', 'rc_status', 'disconnected_time'])for i in disconnect_record_list:writer.writerow(i)print("断开连接的client信息已写入csv文件")@staticmethoddef on_connect(client, userdata, flags, rc, props=None):if rc == 0:print("rc状态:{} - -".format(rc), "{}-连接broker成功".format(str(client._client_id)))fire_success(request_type="c_success",name='count_connected',response_time=0,response_length=0,exception=None,context=None)else:print("rc状态:{} - -".format(rc), "{}-连接broker失败".format(str(client._client_id)))fire_success(request_type="c_fail",name="client_id: " + str(client._client_id),response_time=0,response_length=0,exception=None,context=None)if __name__ == '__main__':run_single_user(MQTTLocustUser)
2. 代码分析-locust库部分
并发请求能力还是使用的locust
库的能力。官方只提供了http
协议接口的相关类,没直接提供mqtt
协议的,但是我们可以按照官方的规范,自定义相关的类,只要继承User
和TaskSet
即可。
User类
首先是先定义User
类,这里就是用来生成我要用来测试的车辆。
类初始化的时候,黄色框里,会去队列里取出车辆信息,用来做一些相关的设置。client
来源于from paho.mqtt import client as mqtt_client
提供的能力,固定用法,按照人家的文档使用就行。
红色框里,是User
类的2个重要熟悉属性:
tasks
: 这里定义了生成的用户需要去干哪些事情,也就是对应脚本里的PublishTask
类下面定义的内容。wait_time
: 用户在执行task时间隔停留的时间,可以是个区间,在里面随机。我这里意思是每2s发送一次数据到broker。
绿色框里,定义了一个字典容器,用来存放当前用户已发送成功的消息内容,因为后面我要取出来把里面相关的数据写到生成的报表中去。
蓝色框里有2个方法,也是locust
提供的能力:
-
on_start
:当用户开始运行时调用,这里我做了车辆连接broker代理的处理,注意这里需要设置tls,因为服务连接需要。 -
on_stop
:当用户结束运行时调用,这里我做了一些其他的处理,比如把运行期间断开连接的车辆信息写到本地csv中。
TaskSet类
定义好User
类,就需要来定义TaskSet
类,你得告诉产生出来的用户,要干点啥。
我这根据业务需要,就是让车辆不停的像broker发送数据即可。
红色部分,同样是paho.mqtt
提供的能力,会启动新的线程去执行你定义的事情。
黄色部分,就是做发送数据的操作,并且我可以拿到一些返回,查看源码就可以知道返回的是MQTTMessageInfo
类。
注意返回的2个属性:
mid
: 返回这个消息发送的顺序rc
: 表示发送的响应状态,0 就是成功
绿色部分,还记得我在上面的User
类中定义了一个容器,在这里就把发送的消息相关信息放到容器中去,留着后面使用。
2. 代码分析-paho.mqtt库部分
上面的代码已经用到了不少paho.mqtt
的能力,这里再进行整体梳理下。
- client.Client():声明一个client
- client.username_pw_set(): 设置客户端的用户名,密码
- client.tls_set_context: 设置ssl模式
- client.connect(): 连接代理
- client.publish:向代理推送消息
还用到了一些回调函数:
- on_connect:连接操作成功时回调
- on_publish:发布成功时回调
- on_disconnect:客户端与代理断开连接时回调
另外还用到了一个事件函数events.request
。
当客户端发送请求时会调用,不管是请求成功还是请求失败;当我需要自定义我的报告内容时,就需要用到这个event
。
查看源码,知道里面要传哪些参数,那我们在调用时候就需要传入对应的参数。
比如我在发送回调函数里调用了该方法。
所以最后在控制台显示的报告里就有我定义的内容了。
由于后来在使用中发现,不知道会在什么时候出现批量断开的情况,于是在on_disconnect
回调函数里增加了对应处理,把相关的断开信息记录下来,运行结束的时候写到本地文件里去。
后来我主动尝试客户端断开的情况测试了下文件的写入结果,功能正常。
三、小结
后面就开始运行了,在运行过程中,开发关注链路服务的各项指标,这里就不展开了,业务缠身就并没有过多的去做这个事情,况且也不专业。确实也发现了不少问题,后面逐步优化,再继续测试。
现在稳定运行12h,服务正常,暂时就先告一段落了。后面还有会相关其他性能测试场景,届时就可以针对性的展开分享下了。
另外,这个脚本分享也只是仅供参考,现在我这是使用简单,本着能用就行,可能存在一些不合理需要优化的地方,有需要的朋友还请自行查阅相关文档。
相关文章:

基于Locust实现MQTT协议服务的压测脚本
一、背景简介 业务背景大概介绍一下,就是按照国标规定,车辆需要上传一些指定的数据到ZF的指定平台,同时车辆也会把数据传到企业云端服务上,于是乎就产生了一些性能需求。 目前我们只是先简单的进行了一个性能场景的测试…...

AURIX TC3XX Cached PFLASH与Non-Cached PFLASH的区别
Cached ? Non-Cached? 在阅读TC3XX的用户手册时,在内存映射表中,有两个segment都是Program Flash,而且大小都一样是3M,一个是segment 8 另一个是segment10 这难免让人产生疑惑,二者区别在哪? …...

uniapp开发小程序-显示左滑删除效果
一、效果图: 二、代码实现: <template><view class"container"><view class"myorderList"><uni-swipe-action><uni-swipe-action-item class"swipe-action-item" :right-options"option…...

FPGA 的数字信号处理:Verilog 实现简单的 FIR 滤波器
该项目介绍了如何使用 Verilog 实现具有预生成系数的简单 FIR 滤波器。 绪论 不起眼的 FIR 滤波器是 FPGA 数字信号处理中最基本的模块之一,因此了解如何将具有给定抽头数及其相应系数值的基本模块组合在一起非常重要。因此,在这个关于 FPGA 上 DSP 基础…...

使用粒子群优化算法(PSO)辨识锂电池二阶RC模型参数(附MATLAB代码)
目录 一、原理部分 二、代码详解部分 三、结果及分析 一、原理部分 PSO算法由美国学者于 1995 年提出,因其算法简单、效果良好,而在很多领域得到了广泛应用。该算法的起源是模拟鸟群的觅食过程,形成一种群体智能搜索算法。 其核心是&#…...

如何利用地面控制点实现倾斜摄影三维模型数据的几何坐标变换和纠正?
如何利用地面控制点实现倾斜摄影三维模型数据的几何坐标变换和纠正? 倾斜摄影是一种在空中拍摄地表物体的技术,可以获得高分辨率、高精度的三维模型数据,广泛应用于城市规划、建筑设计、土地管理等领域。然而,由于航拍时无法避免姿…...

设计规则之里氏替换原则
tip: 作为程序员一定学习编程之道,一定要对代码的编写有追求,不能实现就完事了。我们应该让自己写的代码更加优雅,即使这会费时费力。 相关规则: 推荐:体系化学习Java(Java面试专题) 1.6大设…...

【叠高高】叠蛋糕游戏的微信小程序开发流程详解
记得小时候玩过的搭积木游戏吗,和叠高高游戏原理差不多的,与之类似的还有盖高楼游戏,就是看谁盖的(叠的)最高,这里讲一下比较基础的叠高高游戏小程序实现过程,对编程感兴趣的同学可以参考学习一…...

收集关键词的方法有哪些?(如何查找精准的行业流量关键词)
关键词的收集通常可以通过以下几种方法: 关键词收集方法 1.根据市场价值、搜索词竞争性和企业实际产品特征进行筛选:确定您的关键词列表之前,建议先进行市场分析,了解您的竞争对手、行业状况和目标受众等信息,以更好地了解所需的特…...

【GreenDao】RxQuery查询并修改GreenDao数据库,完成后更新UI
GreenDao是一个轻量级的ORM(对象关系映射)数据库,而RxJava是一个响应式编程库,可以帮助我们更轻松地处理异步事件。在 Android 应用程序中,您可以使用这两个库一起处理数据库查询和更新,并使用观察者模式来…...

Modifier ‘public‘ is redundant for interface methods错误
java中接口的方法默认是 public abstract 的 所以放心的删掉public即可,如果改为protected 或者 private还会报错 接口的方法及变量的默认修饰符 1.接口中每一个方法也是隐式抽象的,接口中的方法会被隐式的指定为 public abstract (只能是 public abst…...

Redis缓存击穿及解决问题
缓存击穿的意思是对于设置了过期时间的key,缓存在某个时间点过期的时候,恰好这时间点对这个 Key有大量的并发请求过来,这些请求发现缓存过期- -般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把DB压垮。 解决方案有两种…...

环境感知算法——2.CenterNet基于KITTI数据集训练
1. CenterNet简介 CenterNet采用了一种新的检测思路,即以目标中心点为基础,直接回归出目标的位置和大小。而传统的目标检测算法通常会先产生大量候选框(Anchor),再通过分类器进行筛选,这种方法比较复杂。C…...

JUC 高并发编程基础篇
JUC 高并发编程基础篇 • 1、什么是 JUC • 2、Lock 接口 • 3、线程间通信 • 4、集合的线程安全 • 5、多线程锁 • 6、Callable 接口 • 7、JUC 三大辅助类: CountDownLatch CyclicBarrier Semaphore • 8、读写锁: ReentrantReadWriteLock • 9、阻塞队列 • 10、ThreadPo…...

【十二】设计模式~~~行为型模式~~~命令模式(Java)
命令模式-Command Pattern【学习难度:★★★☆☆,使用频率:★★★★☆】 1.1. 模式动机 在软件设计中,我们经常需要向某些对象发送请求,但是并不知道请求的接收者是谁,也不知道被请求的操作是哪个…...

可再生能源的不确定性和储能系统的时间耦合的鲁棒性和非预期性区域微电网的运行可行性研究(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...

Revit中如何使创建的族文件内存变小
族文件的大小直接影响到项目文件的大小和软件运行速度,如何将族文件做的最小并且满足项目需求呢? 方法一:清除未使用项 1. 族制作完成可以把族文件中未用到的外部载入族或其他多余数据删掉,点击“管理”选项卡下拉的“清除未使用项”命令; 2…...

ClassLoader源码
介绍 ClassLoader 顾名思义就是类加载器 ClassLoader 是一个抽象类 没有父类 作用 1.负责将 Class 加载到 JVM 中 2.审查每个类由谁加载(父优先的等级加载机制) 3.将 Class 字节码重新解析成 JVM 统一要求的对象格式 常量&变量 //注册本地方法…...

Kafka分区消息积压排查指南
针对某个TOPIC只有几个分区积压的场景,可以采用以下方法进行排查: 消息生产是否指定key? 如果指定了消息key,那么消息会指定生产到hash(key)的分区中。如果指定了key,那么有下列几种可能: 生产该key的消息体…...

数据库 期末复习(4) 概念数据库的设计
第一部分 为啥要引入概念数据库 感觉只有一个重点 实体联系模型----ER模型 第二部分-----实体联系模型 这个例子可以全看完之后再来看 举个例子:根据COMPANY数据库的需求来构造数据库模式:The company is organized into DEPARTMENTs. Each department has a name, number …...

WuThreat身份安全云-TVD每日漏洞情报-2023-05-26
漏洞名称:Barracuda Email Security Gateway TAR文件命令注入 漏洞级别:严重 漏洞编号:CVE-2023-2868,CNNVD-202305-2128 相关涉及:Barracuda Email Security Gateway 5.1.3.001 漏洞状态:在野 参考链接:https://tvd.wuthreat.com/#/listDetail?TVD_IDTVD-2023-12949 漏洞名称…...

关于Idea的一些常用设置项
1. 输出中文不乱码 设置工程项目编码 file -> settings -> Editor -> File Encodings-> 如下图通通UTF-8 2. 创建文件自动设置本文模板 File–>settings–>Editor–>File and Code Templates–>Includes -> 输入类注释模板 /*** Classname ${N…...

Python使用WMI模块获取Windows系统的硬件信息,并使用pyinstaller库编译打包成exe的可执行文件
引言 今天给大家分享一篇有关Python和Windows系统的文章。你有没有想过如何获取Windows系统的硬件信息?或者你是否曾经尝试过将Python脚本编译打包成可执行文件?如果你对这些问题感兴趣,那么这篇文章一定适合你。 背景 由于公司现阶段大多…...

JavaScript语句(七)
JavaScript语句 1、条件语句2、循环语句3、break 和 continue 语句4、异常处理语句4.1、抛出异常4.2、捕获异常4.3、处理异步代码块中的异常4.3.1、Promise4.3.2、async/await try-catch 4.4、处理未捕获的异常4.5、总结 1、条件语句 名称描述if当指定条件为 true 时…...

孪生诱捕网络在欺骗防御领域的应用
信息化在给人们带来便利的同时,网络信息安全问题也日益凸显。尤其是随着网络攻击由传统的盲目、直接、粗暴的方式转变为目前的精确化、持久化、隐匿式的恶意攻击,攻击过程中只需发现并利用一个未被修复的漏洞或不安全配置即可击破边界防御,试图将攻击者拒之门外的安全防护方…...

【性能测试】Jenkins+Ant+Jmeter自动化框架的搭建思路
前言 前面讲了Jmeter在性能测试中的应用及扩展。随着测试的深入,我们发现在性能测试中也会遇到不少的重复工作。 比如某新兴业务处于上升阶段,需要在每个版本中,对某些新增接口进行性能测试,有时还需要在一天中的不同时段分别进行…...

ARM体系结构与异常处理
目录 一、ARM体系架构 1、ARM公司概述 ARM的含义 ARM公司 2.ARM产品系列 3.指令、指令集 指令 指令集 ARM指令集 ARM指令集 Thumb指令集 (属于ARM指令集) 4.编译原理 5.ARM数据类型 字节序 大端对齐 小端对齐 …...

招聘网站—Hive数据分析
招聘网站—Hive数据分析 第1关:统计最热门的十种职业(招聘人数最多) #进入hive hive#在hive中创建数据库 mydb create database mydb;#使用数据库 mydb use mydb;#创建表 recruitcleaned 并使用"/t"分割字段 create table re…...

双指针滑动窗口整理1——长度最小的子数组、水果成篮
209. 长度最小的子数组 这篇文章主要是想针对这题 209. 长度最小的子数组,总结一下双指针或是滑动窗口的小细节。对于暴力算法,我们就不再阐释了。 算法原理: 滑动窗口主要是通过控制循环终止节点j,并移动i来缩放窗口。具体而言…...

textarea之换行、replace、\n、br、innerHTML
文章目录 前言换行符介绍JavaScript部分html部分 前言 textarea标签本身不识别换行功能,回车换行用的是\n换行符,输入时的确有换行的效果,但是渲染时就只是一个空格了。这时就需要利用换行符\n和br标签的转换进行处理。 换行符介绍 表格 序…...