基于mqtt协议的locust的性能压测脚本

本文介绍如何使用Locust进行基于MQTT协议的性能压测,提供了两种不同的实现方式,旨在帮助那些想要自定义客户端压测脚本但缺乏明确指导的读者。官方文档和网络资源可能不够清晰或已过时。文章还邀请读者加入自动化测试交流群以进行更深入的学习和讨论。

基于mqtt协议的locust的性能压测脚本

很多小伙伴想自定义客户端的性能压测脚本,又不知道怎么下手,官方文档的例子有很模糊,网上的资料都是千篇一律,大多数都是互相copy的,使用的也不是最新版本的locust,我写了两个例子,希望对大家有所帮助
欢迎加入测试交流群:自动化测试-夜行者(816489363)进行交流学习QAQ
两种实现方式:
第一种:

#!/user/bin/env python
# -*- coding: utf-8 -*-

"""
------------------------------------
@Project : locust_demo
@Time    : 2020/8/12 15:32
@Auth    : chineseluo
@Email   : 848257135@qq.com
@File    : locust_mqtt_demo1.py
@IDE     : PyCharm
------------------------------------
"""
import os
import time
from locust import User, task, between, events
from paho.mqtt.client import Client


class MQTTPubClient(Client):

    def on_connect_rewrite(self, client, userdata, flags, rc):
        print("Connected with result code: " + str(rc))

    def on_message_rewrite(self, client, userdata, msg):
        print(msg.topic + " " + str(msg.payload))

    Client.on_connect = on_connect_rewrite
    Client.on_message = on_message_rewrite

    def __getattribute__(self, name):
        func = Client.__getattribute__(self, name)
        print("func:{}".format(func))

        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                print('*' * 100, '\n', result, '$' * 100)
            except Exception as e:
                total_time = int((time.time() - start_time) * 1000)
                events.request_failure.fire(request_type="mqtt_pub", name=name,
                                            response_time=total_time, exception=e,
                                            response_length=0
评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值