资源简介
该工程实现了MQTT,订阅消息,发布消息的功能。DEMO工程。
代码片段和文件信息
import re
import os
import sys
from socket import *
from threading import Thread
import struct
import time
msgid=-1
def get_my_ip():
s1 = socket(AF_INET SOCK_DGRAM)
s1.connect((“8.8.8.8“ 80))
my_ip = s1.getsockname()[0]
s1.close()
return my_ip
def mqqt_server_sketch(my_ip port):
global msgid
print(“Starting the server on {}“.format(my_ip))
s = None
try:
s=socket(AF_INET SOCK_STREAM)
s.settimeout(60)
s.bind((my_ip port))
s.listen(1)
qaddr=s.accept()
q.settimeout(30)
print(“connection accepted“)
except:
print(“Local server on {}:{} listening/accepting failure: {}“
“Possibly check permissions or firewall settings“
“to accept connections on this address“.format(my_ip port sys.exc_info()[0]))
raise
data = q.recv(1024)
# check if received initial empty message
print(“received from client {}“.format(data))
data = bytearray([0x20 0x02 0x00 0x00])
q.send(data)
# try to receive qos1
data = q.recv(1024)
msgid = struct.unpack(“>H“ data[15:17])[0]
print(“received from client {} msgid: {}“.format(data msgid))
data = bytearray([0x40 0x02 data[15] data[16]])
q.send(data)
time.sleep(5)
s.close()
print(“server closed“)
# this is a test case write with tiny-test-fw.
# to run test cases outside tiny-test-fw
# we need to set environment variable ‘TEST_FW_PATH‘
# then get and insert ‘TEST_FW_PATH‘ to sys path before import FW module
test_fw_path = os.getenv(“TEST_FW_PATH“)
if test_fw_path and test_fw_path not in sys.path:
sys.path.insert(0 test_fw_path)
import TinyFW
import IDF
import DUT
@IDF.idf_example_test(env_tag=“Example_WIFI“)
def test_examples_protocol_mqtt_qos1(env extra_data):
global msgid
“““
steps: (QoS1: Happy flow)
1. start the broker broker (with correctly sending ACK)
2. DUT client connects to a broker and publishes qos1 message
3. Test evaluates that qos1 message is queued and removed from queued after ACK received
4. Test the broker received the same message id evaluated in step 3
“““
dut1 = env.get_dut(“mqtt_tcp“ “examples/protocols/mqtt/tcp“)
# check and log bin size
binary_file = os.path.join(dut1.app.binary_path “mqtt_tcp.bin“)
bin_size = os.path.getsize(binary_file)
IDF.log_performance(“mqtt_tcp_bin_size“ “{}KB“.format(bin_size//1024))
IDF.check_performance(“mqtt_tcp_size“ bin_size//1024)
# 1. start mqtt broker sketch
host_ip = get_my_ip()
thread1 = Thread(target = mqqt_server_sketch args = (host_ip1883))
thread1.start()
# 2. start the dut test and wait till client gets IP address
dut1.start_app()
# waiting for getting the IP address
try:
ip_address = dut1.expect(re.compile(r“ sta ip: ([^]+)“) timeout=30)
print(“Connected to AP with IP: {}“.format(ip_address))
except DUT.ExpectTimeout:
属性 大小 日期 时间 名称
----------- --------- ---------- ----- ----
文件 237 2018-11-13 14:54 CMakeLists.txt
文件 178 2018-11-13 14:54 Makefile
文件 2282 2018-11-13 14:54 README.md
目录 0 2019-05-08 15:24 build\
目录 0 2019-05-08 15:09 build\app_trace\
文件 15982 2019-05-08 15:09 build\app_trace\app_trace.d
文件 1972 2019-05-08 15:09 build\app_trace\app_trace.o
文件 13713 2019-05-08 15:09 build\app_trace\app_trace_util.d
文件 16056 2019-05-08 15:09 build\app_trace\app_trace_util.o
文件 289 2019-05-08 15:08 build\app_trace\component_project_vars.mk
目录 0 2019-05-08 15:09 build\app_trace\gcov\
文件 14591 2019-05-08 15:09 build\app_trace\gcov\gcov_rtio.d
文件 1976 2019-05-08 15:09 build\app_trace\gcov\gcov_rtio.o
文件 13547 2019-05-08 15:09 build\app_trace\host_file_io.d
文件 1976 2019-05-08 15:09 build\app_trace\host_file_io.o
文件 22576 2019-05-08 15:09 build\app_trace\libapp_trace.a
目录 0 2019-05-08 15:09 build\app_update\
文件 294 2019-05-08 15:08 build\app_update\component_project_vars.mk
文件 16824 2019-05-08 15:09 build\app_update\esp_ota_ops.d
文件 52956 2019-05-08 15:09 build\app_update\esp_ota_ops.o
文件 53274 2019-05-08 15:09 build\app_update\libapp_update.a
目录 0 2019-05-08 15:09 build\asio\
目录 0 2019-05-08 14:42 build\asio\asio\
目录 0 2019-05-08 14:42 build\asio\asio\asio\
目录 0 2019-05-08 15:09 build\asio\asio\asio\src\
文件 63699 2019-05-08 15:09 build\asio\asio\asio\src\asio.d
文件 1740932 2019-05-08 15:09 build\asio\asio\asio\src\asio.o
文件 347 2019-05-08 15:08 build\asio\component_project_vars.mk
文件 1779596 2019-05-08 15:09 build\asio\libasio.a
目录 0 2019-05-08 15:09 build\aws_iot\
文件 232 2019-05-08 15:08 build\aws_iot\component_project_vars.mk
............此处省略2248个文件信息
- 上一篇:opengl绘制花瓶演示
- 下一篇:线性系统理论(第二版 郑大钟著 清华大学出版社
相关资源
- MQTT单片机编程小工具(技小新).zi
- MQTT客户端
- Mqtt发布与订阅功能
- STM32CubeMX通过ESP8266 AT指令MQTT上阿里云
- emqttd-windows7-v2.3.0.zip
-
MQTT单片机编程小工具(阿里云li
n - MQTT服务端
- MQTT 服务器和客户端工具及使用说明
- MQTT.fx安装包 windows64位系统
- apollo-mqtt服务器搭建
- 基于MQTT协议的物联网通信系统的研究
- esp32 qt 源码
- esp8266自动获取天气及时钟在oled显示
- MQTT固件使用指导.zip
- 移柯L206 ALIYUN_MQTT开发流程说明
- Quectel_EC2x&EG9x;&EM05;_MQTT_Application_Note
- vert.x结合springboot开发mqtt服务,真实可
- TMS MQTT_v1.1.0.2.rar
- GOOUUU-ESP32原理图
- ESP8266和MQTT协议接收平台下发命令版
- 移动onenet 基于arduino IDE 的mqtt连接
- delphi mqtt
- EC20开发资料,含源码
- mqtt发布和订阅
- mqtt订阅和发送及mqttws31.js
- esp32 sdk编程利用rmt驱动ws2812七彩灯,
- ios手机端集成mqtt接受服务器推送消息
- esp32 Windows环境工具链安装指导,
- 501222zw_mqtt_fc.zip
- thingsboard之MQTT接入TB说明0531.docx
评论
共有 条评论