本帖最后由 lovzx 于 2024-9-14 12:01 编辑
【Ai-WB2高级篇】MQTT 协议连接
MQTT
MQTT(Message Queuing Telemetry Transport)是一种轻量级适用于资源受限的设备下使用,特别是嵌入式领域
MQTT特点
- 轻量级:MQTT开销低、报文小非常适合资源受限的设备
- 可靠:MQTT支持多种QoS等级、会话感知和持久连接
- 安全通信: 支持TLS和SSL加密功能,还可以通过用户名/密码方式进行认证
- 双向通信: MQTT基于**发布-订阅**模式为设备之间提供了无缝双向通信,既可以发布消息也可以订阅消息,无须设备之间相互耦合,易于扩展
- 大规模物联网设备支持:MQTT轻量级、低带宽消耗,通过发布-订阅模式解耦设备与设备之间的关系,从而有效的网络流量和资源的使用
- 语言支持: 编程语言支持多
MQTT基本组件
发布订阅模式: 为了解决物联网设备之间的解耦问题,MQTT采用了发布-订阅模式,与传统的客户端-服务端的模式不同,MQTT发布者和订阅者无须知道对方的存在,无须建立直接的连接,通过MQTT Broker来负责消息的路由和分发
MQTT设备将温度数据发布到指定的Temperature主题,MQTT Broker收到消息后会将改数据转发给该主题的订阅者
主题topic
MQTT通过topic对发布和订阅的客户端进行管理,类似于分组,发布者可以发送订阅topic的所有组内成员,topic通过/来分割,支持+和#两种通配符
- +: 表示单层匹配,例如:/a/+,匹配/a/b,和a/c
- #: 表示多层匹配,例如:a/#,匹配/a/x,/a/b/c
需要注意的是通配符只能用于订阅,不能用于发布
服务质量 QoS
为了适应不同的网络环境,MQTT提供了三种服务质量等级
- QoS 0: 消息最多传送一次。如果客户端不可用,将丢失这条消息
- QoS 1:消息至少传送一次,保证可以收到消息,但可能会重复
- QoS 2: 消息只传递一次,保证即不丢失也不重复
QoS等级从低到高消息可靠性增高,传输复杂度也提升
MQTT工作流程
客户端通过TCP/IP协议与MQTT Broker建立连接,可以选择TLS/SSL加密来实现,建立连接后既可以发布指定的主题也可以定于指定的主题,客户端发布时消息需要指定topic和QoS,Broker收到消息后会根据不同的QoS等级按照不通过的策略将消息分发给该主题的订阅者们
MQTT客户端
MQTTX是一款免费开源的全平台客户端,完全兼容MQTT协议MQTT官网
免费公共MQTT服务器
MQTT免费服务器
服务器信息
- 地址:broker.emqx.io
- mqtt端口:1883
- ws端口:8083
- mqtts端口:8883
- wss端口:8084
证书文件可以从官方手动下载
连接成功后可以设置订阅内容或者发送消息
代码编写
MQTT需要用到lwip和网络相关的组件,需要在Makefile配置文件中添加相关的依赖
- COMPONENTS_BLSYS := bltime blfdt blmtd bl_os_adapter rfparam_adapter_tmp bl602_os_adapter bl602 bl602_std
- COMPONENTS_LOOPS := bloop loopadc looport yloop loopset looprt
- COMPONENTS_VFS := romfs
- COMPONENTS_NETWORK := sntp dns_server lwip lwip_dhcpd
- COMPONENTS_MQTT := axk_common http-parser axk_tls axk_mqtt wpa_supplicant tcp_transport blcrypto_suite
- COMPONENTS_WIIF := wifi wifi_manager wifi_hosal
- INCLUDE_COMPONENTS += freertos_riscv_ram newlibc
- INCLUDE_COMPONENTS += mbedtls_lts vfs hosal coredump blog
- INCLUDE_COMPONENTS += utils cli cjson
- INCLUDE_COMPONENTS += $(COMPONENTS_LOOPS)
- INCLUDE_COMPONENTS += $(COMPONENTS_BLSYS)
- INCLUDE_COMPONENTS += $(COMPONENTS_VFS)
- INCLUDE_COMPONENTS += $(COMPONENTS_NETWORK)
- INCLUDE_COMPONENTS += $(COMPONENTS_WIIF)
- INCLUDE_COMPONENTS += $(COMPONENTS_MQTT)
- INCLUDE_COMPONENTS += $(PROJECT_NAME)
复制代码
在获取到IP地址的地方开始mqtt的连接,使用的是sdk里面提供的axk_mqtt组件,其配置连接代码如下
- axk_mqtt_client_config_t mqtt_cfg = {
- .uri = "ws://broker.emqx.io:8083/mqtt",
- .event_handle = event_cb,
- //指定客户端名称
- .client_id = "axk mqtt",
- //设置服务器证书文件
- // .cert_pem = CA_CRT,
- // .cert_len = strlen(CA_CRT) + 1,
- // .username = "wb2",
- // .password = "wb2",
-
- //客户端证书
- // .client_cert_pem = CLIENT_CA,
- // .client_key_pem = CLIENT_KEY,
-
- };
- axk_mqtt_client_handle_t client = axk_mqtt_client_init(&mqtt_cfg);
- axk_mqtt_client_start(client);
复制代码
可以通过event->event_id来判断消息的类型,常用的的类型有一下几种
- MQTT_EVENT_CONNECTED:连接到mqtt服务器
- MQTT_EVENT_DISCONNECTED:断开连接
- MQTT_EVENT_SUBSCRIBED:订阅成功
- MQTT_EVENT_PUBLISHED:发送成功
- MQTT_EVENT_DATA:接收到消息
MQTT_EVENT_DATA用来接收消息,通过event->topic来判断是什么类型的消息,从而做出对应的操作,event->data是消息的内容
常用的函数
- //初始化mqtt配置,并返回一个client对象
- axk_mqtt_client_handle_t axk_mqtt_client_init(const axk_mqtt_client_config_t *config);
- //重新连接
- axk_err_t axk_mqtt_client_reconnect(axk_mqtt_client_handle_t client);
- //断开连接
- axk_err_t axk_mqtt_client_disconnect(axk_mqtt_client_handle_t client);
- //设置client连接地址
- axk_err_t axk_mqtt_client_set_uri(axk_mqtt_client_handle_t client, const char *uri);
- //开始启动mqtt client
- axk_err_t axk_mqtt_client_start(axk_mqtt_client_handle_t client);
- //设置client配置
- axk_err_t axk_mqtt_set_config(axk_mqtt_client_handle_t client, const axk_mqtt_client_config_t *config);
- //订阅主题
- int axk_mqtt_client_subscribe(axk_mqtt_client_handle_t client, const char *topic, int qos);
- //取消订阅主题
- int axk_mqtt_client_unsubscribe(axk_mqtt_client_handle_t client, const char *topic);
- //发送主题消息
- int axk_mqtt_client_publish(axk_mqtt_client_handle_t client, const char *topic, const char *data, int len, int qos, int retain);
- //注册回调事件
- axk_err_t axk_mqtt_client_register_event(axk_mqtt_client_handle_t client, axk_mqtt_event_id_t event, axk_event_handler_t event_handler, void *event_handler_arg);
复制代码
案例
通过mqtt协议来控制wb2板载led灯,首先创建wb2_led.h,代码如下
- #ifndef _WB2_LED_H
- #define _WB2_LED_H
- #include <stdint.h>
- #define PIN_R 14
- #define PIN_G 17
- #define PIN_B 3
- #ifdef __cplusplus
- extern "C" {
- #endif
- /**
- * 初始化led
- * @param pin
- */
- void wb2_led_init(uint8_t pin);
- /**
- * 打开指定颜色的灯
- * @param led r、g、b
- */
- void wb2_led_open(const char* led);
- /**
- * 关闭指定颜色的灯
- * @param led r、g、b
- */
- void wb2_led_close(const char* led);
- /**
- * 获取指定灯的状态
- * @param led r、g、b
- * @return int 0 close,1open
- */
- uint8_t wb2_led_status(const char* led);
- #ifdef __cplusplus
- }
- #endif
- #endif
复制代码创建wb2_led.c,代码实现如下
- #include "wb2_led.h"
- #include <bl_gpio.h>
- #include <string.h>
- #include <stdint.h>
- #include <bl602.h>
- #include <glb_reg.h>
- #include <stdio.h>
- /**
- * bit0: b
- * bit1: g
- * bit2: r
- */
- static uint8_t rgb = 0;
- void update_status(uint8_t pin, uint8_t data)
- {
- int mask = 0;
- switch (pin)
- {
- case PIN_R:
- mask = 2;
- break;
- case PIN_G:
- mask = 1;
- break;
- case PIN_B:
- mask = 0;
- break;
- }
- if (data) {
- rgb |= 1 << mask;
- }
- else {
- rgb &= ~(1 << mask);
- }
- }
- static int get_pin(const char* led)
- {
- int pin = -1;
- if (strcmp(led, "r") == 0) {
- pin = PIN_R;
- }
- else if (strcmp(led, "g") == 0) {
- pin = PIN_G;
- }
- else if (strcmp(led, "b") == 0) {
- pin = PIN_B;
- }
- return pin;
- }
- /**
- * 打开指定颜色的灯
- * @param led r、g、b
- */
- void wb2_led_open(const char* led)
- {
- if (get_pin(led) != -1) {
- update_status(get_pin(led), 1);
- bl_gpio_output_set(get_pin(led), 1);
- }
- }
- /**
- * 关闭指定颜色的灯
- * @param led r、g、b
- */
- void wb2_led_close(const char* led)
- {
- if (get_pin(led) != -1) {
- update_status(get_pin(led), 0);
- bl_gpio_output_set(get_pin(led), 0);
- }
- }
- /**
- * 获取指定灯的状态
- * @param led r、g、b
- * @return int 0 close,1open
- */
- uint8_t wb2_led_status(const char* led)
- {
- int mask = 0;
- switch (get_pin(led))
- {
- case PIN_R:
- mask = 2;
- break;
- case PIN_G:
- mask = 1;
- break;
- case PIN_B:
- mask = 0;
- break;
- }
- return (rgb >> mask) & 0x1;
- }
- void wb2_led_init(uint8_t pin)
- {
- bl_gpio_enable_output(pin, 0, 1);
- bl_gpio_output_set(pin, 0);
- }
复制代码
main函数中调用wb2_led_init初始化led,并在mqtt连接后订阅/on/led/wb2、/off/led/wb2、以及/get/led/wb2 三个主题,然后再收到消息后去处理消息的内容,消息内容
例如:
发送主题/on/led/wb2的内容为r表示打开wb2板载的r led灯
发送主题/off/led/wb2的内容为r表示关闭wb2板载的r led灯
发送主题/get/led/wb2的内容为r表示获取wb2板载的r led灯的状态,并发送到/status/led/wb2主题,1为打开,0为关闭
main中代码如下
- #define TOPIC_ON "/on/led/wb2"
- #define TOPIC_OFF "/off/led/wb2"
- #define TOPIC_GET "/get/led/wb2"
- #define TOPIC_STATUS "/status/led/wb2"
- /**
- * 根据不同的tpioc处理事件
- * @param data
- * @param topic
- */
- void resolve_led_message(const char* data, const char* topic, axk_mqtt_client_handle_t client) {
- //打开灯
- if (strcmp(topic, TOPIC_ON) == 0) {
- wb2_led_open(data);
- }
- //关闭灯
- else if (strcmp(TOPIC_OFF, topic) == 0) {
- wb2_led_close(data);
- }
- //发送灯的状态
- else if (strcmp(TOPIC_GET, topic) == 0) {
- axk_mqtt_client_publish(client, wb2_led_status(data), TOPIC_STATUS, 1, 0, 0);
- }
- }
复制代码mqtt_data_task中创建一个queue队列接收消息并处理
- /**
- * 循环队列接收mqtt数据
- * @param argv
- */
- void mqtt_data_task(void* argv)
- {
- //创建queue
- mqtt_data_queue = xQueueCreate(10, sizeof(mqtt_data_t));
- mqtt_data_t data;
- while (pdTRUE == xQueueReceive(mqtt_data_queue, &data, portMAX_DELAY))
- {
- //接收处理消息
- resolve_led_message(data.data, data.topic, client);
- }
- }
复制代码
mqtt订阅接收消息
- switch ((axk_mqtt_event_id_t)event_id) {
- case MQTT_EVENT_CONNECTED:
- //订阅ON事件
- msg_id = axk_mqtt_client_subscribe(client, TOPIC_ON, 0);
- blog_info("sent subscribe successful, msg_id=%d", msg_id);
- //订阅OFF事件
- msg_id = axk_mqtt_client_subscribe(client, TOPIC_OFF, 0);
- blog_info("sent subscribe successful, msg_id=%d", msg_id);
- //订阅GET事件
- msg_id = axk_mqtt_client_subscribe(client, TOPIC_GET, 0);
- blog_info("sent subscribe successful, msg_id=%d", msg_id);
- break;
- case MQTT_EVENT_DATA:
- //event->data后面没有'\0'分割,所以需要手动处理
- char* data = pvPortMalloc(sizeof(char) * event->data_len + 1);
- char* topic = pvPortMalloc(sizeof(char) * event->topic_len + 1);
- mqtt_data_t* mqdata = (mqtt_data_t*)pvPortMalloc(sizeof(mqtt_data_t));
- mqdata->data = data;
- mqdata->topic = topic;
- //复制data和topic
- memset(data, '\0', event->data_len + 1);
- memset(topic, '\0', event->topic_len + 1);
- memcpy(data, event->data, event->data_len);
- memcpy(topic, event->topic, event->topic_len);
- //向queue中发送消息
- xQueueSend(mqtt_data_queue, mqdata, 0);
-
- break;
- }
复制代码
- 通过mqttx客户端设置并连接到mqtt服务器
- 添加/status/led/wb2订阅
- 发送/on/led/wb2主题消息,内容为g,可以看到开发板绿灯亮了
- 发送/get/led/wb2主题消息,内容为g,可以看到客户端收到了数据1
- 发送/off/led/wb2主题消息,内容为g,可以看到开发板绿灯灭了
- 发送/get/led/wb2主题消息,内容为g,可以看到客户端收到了数据0
限于本人能力有限难免有错误或遗漏的地方请多多包涵
具体的代码上传到gitee代码仓库
|
|