200字
mqtt学习
2025-10-24
2025-10-24

简单了解一下mqtt,具体的流量格式就先不看了(

简介

MQTT(Message Queuing Telemetry Transport)是 OASIS 标准的「轻量级发布/订阅消息协议」,专为 IoT、移动、受限网络设计,报文头最小仅 2 字节,支持百万级设备并发。

MQTT中有三个角色:

  • Publisher(发布者)

  • Subscriber(订阅者)

  • Broker(消息代理,负责路由与持久化)

主题(Topic)与通配符
层级用“/”分割,支持单层“+”与多层“#”通配符,只能用于订阅,发布还是需要具体的
例:
sensor/+/temp 可匹配 sensor/01/tempsensor/02/temp
home/# 可匹配 home/living/temphome/garage/light 等任意深度

QoS 等级(Quality of Service)

  • 0:最多一次(at most once)

  • 1:至少一次(at least once)

  • 2:只有一次(exactly once,四步握手)

其他机制

  • Keep-Alive & 遗嘱消息(Last Will):检测异常掉线并自动通知

  • Retain 消息:新订阅者立刻拿到最新一条保留消息

  • TLS/SSL、用户名密码、ClientID、JWT、证书等多级安全认证

demo搭建

市面上有很多种Broker,可以看这篇文章:https://www.modb.pro/db/1744920590174724096

这是24年的数据

比较常用的是emqx和mosquitto

emqx支持高并发且有ui界面,安装使用可以直接参考官网的docker安装方式:https://www.emqx.com/zh/downloads-and-install/enterprise

emqx和mosquitto的具体区别如下:

Broker

EMQX

Mosquitto

架构

原生分布式、无主节点集群

单线程 C 程序,无集群

并发能力

单节点 400 万连接,23 节点集群 1 亿连接

单机 ≈ 10 万级连接

吞吐 (QoS0)

单节点 200 万 msg/s

单节点 ≈ 12 万 msg/s

延迟

毫秒级以内

高负载时可达秒级

资源占用

空载 200-300 MB RAM

嵌入式 10 MB 级

规则引擎

内置 SQL 流式处理,零代码过滤转储 Kafka/TSDB

无,需自己写订阅脚本

可观测性

Web Dashboard、HTTP API、Prometheus、Slow-sub 统计

$SYS 主题与日志

云原生

提供 K8s Operator、Terraform、Serverless 云服务

Docker 可用,无 Operator

协议扩展

MQTT 5.0/3.x、MQTT-SN、CoAP、MQTT over QUIC

MQTT 5.0/3.x、WebSocket

安全认证

JWT、OAuth2、LDAP、X.509、PSK、多粒度 ACL

仅用户名/密码、证书 TLS

这里就用mosquitto来进行demo的搭建,官方网站:https://mosquitto.org/

下载Windows的版本,如下

启动mosquitto.exe即可,端口默认开在1883

这里我们用python来编写发布端和订阅端,需要安装如下依赖

pip install paho-mqtt

Publisher端编写

import paho.mqtt.client as mqtt, time, random
import subprocess
client = mqtt.Client()
client.connect("localhost", 1883, 60)
while True:
    t = random.uniform(20, 30)
    proc = subprocess.run(f"echo clown", shell=True, capture_output=True)
    client.publish("demo/01/temp", f"{proc.stdout.decode()}", qos=1)
    time.sleep(5)

这里的流程就是new一个client,然后连接到broker,然后发布到对应主题

Subscriber端编写

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    print("Connected with result code", rc)
    client.subscribe("demo/+/temp")          # 通配符订阅

def on_message(client, userdata, msg):
    print(f"{msg.topic}  {msg.payload.decode()}")

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()

这里的流程就是新建一个client,然后实现on_connect和on_message方法即可,message方法就是用来接收订阅频道消息的

参考

http://www.ba1100n.tech/iot_security/%E5%85%A5%E9%97%A8mqtt%E6%BC%8F%E6%8C%96%EF%BC%9A%E4%BB%8E%E6%A6%82%E5%BF%B5%EF%BC%8C%E5%BA%95%E5%B1%82%E5%8E%9F%E7%90%86%E5%88%B0rce/

https://www.emqx.com/zh/downloads-and-install/enterprise?os=Docker

https://www.yuque.com/hxfqg9/iot/pqfymw

评论