Add ch32v208 OTA readme

This commit is contained in:
songyanguang 2025-06-19 14:10:56 +08:00
parent fc82573bbe
commit 84e4f2c3a1
15 changed files with 340 additions and 0 deletions

View File

@ -0,0 +1,86 @@
# 编译
路径 Ubiquitous/XiZi_IIoT 下,执行 make BOARD=ch32v208rbt6 menuconfig打开config 选项:
.config - XiZi_IIoT Project Configuration
> Tool feature > OTA function > Enable support OTA function > Compile bootloader bin or application bin.
![alt text](img/boot_or_app.png)
.config - XiZi_IIoT Project Configuration
> APP_Framework > app lib > lib using MQTT > Enable support MQTT function > xiuos platform mqtt connection parameter configuration.
![alt text](img/mqtt_config.png)
执行 make BOARD=ch32v208rbt6进行编译。
编译后,分别生成 XiZi-ch32v208rbt6-boot.bin 和 XiZi-ch32v208rbt6-app.bin以及 XiZi-ch32v208rbt6-boot.hex 和 XiZi-ch32v208rbt6-app.hex。
从沁恒官网([产品手册 - 南京沁恒微电子股份有限公司](https://www.wch.cn/downloads/category/27.html))下载工具软件 WCH_AssemblingFileTool.exe将 XiZi-ch32v208rbt6-boot.hex 和 XiZi-ch32v208rbt6-app.hex 合并成 XiZi-ch32v208rbt6.bin。
![alt text](img/AssemblingFileTool.png)
# 烧录
从沁恒官网下载工具软件 WchIspStudio.exe选择芯片和串口选择固件。
设备端按boot和reset按键让设备进入烧录模式。点击“解除代码保护”和“下载”进行烧录。
![alt text](img/WchIspStudio.png)
# OTA升级
烧录成功后,先启动 boot 程序,然后跳转到 app 程序 。
![alt text](img/boot_start.png)
![alt text](img/jump_to_app.png)
进入系统后初始化4G模块并开启线程进行mqtt通信可以看到此时的版本是 001.000.000。
![alt text](img/current_version.png)
然后需要进入publisher将要更新的app程序XiZi-ch32v208rbt6-app.bin放到publisher下修改main.py中的version和文件名。
然后,电脑上运行 python3 main_early.py (需pip install paho-mqtt安装依赖库)。
![alt text](img/python_early.png)
设备检测到版本信息后写入OTA状态然后重启重启后boot进入下载状态。
![alt text](img/ota_flag.png)
然后,电脑上运行 python3 main.py。
![alt text](img/python_main.png)
设备检测到版本信息后,开始传输文件。
![alt text](img/download_start.png)
当传输进度为100%后更新app的bin将烧录至APP区之后板子会自动重启。
![alt text](img/download_ok.png)
重启后依然先进入boot然后启动app。查看APP日志以及version打印信息确认升级成功。
# 注意事项
CH32V208的FLASH大小是480KB其中零等待运行区域大小是128KB。FLASH快速写函数以及系统中断等关键函数需要运行在零等待运行区域。
程序分布如下:
|address|Area|Size|
|-|-|-|
|0x08000000 ~ 0x08008000 | Boot_FAST | 32KB |
|0x08008000 ~ 0x08050000 | Application | 288KB |
|0x08050000 ~ 0x08077000 | Boot_SLOW | 156KB |
|0x08077000 ~ 0x08078000 | OTA_FLAG | 4KB |

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 80 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.8 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.5 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 11 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 22 KiB

View File

@ -0,0 +1,127 @@
import paho.mqtt.client as mqtt
import json
import os
import hashlib
# 配置参数(根据实际情况修改)
MQTT_BROKER = "47.115.50.232" # MQTT代理地址
MQTT_PORT = 1883 # 端口
FILE_PATH = "XiZi-ch32v208rbt6-app.bin" # 要发送的固件文件路径
CLIENT_ID = "D001" # 设备客户端ID
VERSION = "001.000.002" # 新固件版本号
FILE_ID = 1 # 文件标识符
def calculate_md5(file_path):
""" 计算文件的MD5哈希值 """
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def compact_json(payload: dict) -> str:
"""生成无空格/换行的紧凑JSON字符串"""
return json.dumps(payload, separators=(',', ':'))
def on_connect(client, userdata, flags, rc):
""" 连接回调函数 """
if rc == 0:
print("连接成功!")
# 订阅文件请求主题
client.subscribe("xiuosiot/ota/files")
# 发送初始更新信息
init_payload = {
"fileSize": userdata["file_size"],
"version": userdata["fw_version"],
"fileId": userdata["file_id"],
"md5": userdata["file_md5"]
}
client.publish(
topic=f"ota/{CLIENT_ID}/update",
payload=compact_json(init_payload),
qos=1
)
print(f"已发送初始化消息到 ota/{CLIENT_ID}/update")
else:
print(f"连接失败,错误码:{rc}")
def on_message(client, userdata, msg):
""" 消息到达回调函数 """
try:
payload = json.loads(msg.payload.decode())
print("收到文件请求:", payload)
# 验证必要字段
required_fields = ["clientId", "fileId", "fileOffset", "size"]
if not all(field in payload for field in required_fields):
print("错误:缺少必要字段")
return
# 验证客户端ID
if payload["clientId"] != CLIENT_ID:
print(f"忽略非目标客户端请求:{payload['clientId']}")
return
# 提取请求参数
offset = payload["fileOffset"]
request_size = payload["size"]
reply_topic = f"ota/{CLIENT_ID}/files"
# 读取文件内容
try:
with open(FILE_PATH, "rb") as f:
f.seek(offset)
file_data = f.read(request_size)
if len(file_data) == 0:
print("错误:读取到空数据或超出文件范围")
return
except Exception as e:
print(f"文件读取失败:{str(e)}")
return
# 发送文件片段
client.publish(reply_topic, file_data, qos=1)
print(f"已发送 {len(file_data)} 字节到 {reply_topic}")
except json.JSONDecodeError:
print("错误无效的JSON格式")
except Exception as e:
print(f"处理消息时出错:{str(e)}")
def main():
# 验证文件存在性
if not os.path.exists(FILE_PATH):
print(f"错误:文件 {FILE_PATH} 不存在")
exit(1)
# 计算文件参数
file_size = os.path.getsize(FILE_PATH)
file_md5 = calculate_md5(FILE_PATH)
# 创建MQTT客户端
client = mqtt.Client(userdata={
"file_size": file_size,
"fw_version": VERSION,
"file_id": FILE_ID,
"file_md5": file_md5
})
# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message
try:
client.username_pw_set("ch32v208", "xiuosiot") # 取消注释并填写凭证
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_forever()
except KeyboardInterrupt:
print("\n程序已终止")
except Exception as e:
print(f"连接错误:{str(e)}")
exit(1)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,127 @@
import paho.mqtt.client as mqtt
import json
import os
import hashlib
# 配置参数(根据实际情况修改)
MQTT_BROKER = "47.115.50.232" # MQTT代理地址
MQTT_PORT = 1883 # 端口
FILE_PATH = "XiZi-ch32v208rbt6-app.bin" # 要发送的固件文件路径
CLIENT_ID = "D001" # 设备客户端ID
VERSION = "001.000.002" # 新固件版本号
FILE_ID = 1 # 文件标识符
def calculate_md5(file_path):
""" 计算文件的MD5哈希值 """
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def compact_json(payload: dict) -> str:
"""生成无空格/换行的紧凑JSON字符串"""
return json.dumps(payload, separators=(',', ':'))
def on_connect(client, userdata, flags, rc):
""" 连接回调函数 """
if rc == 0:
print("连接成功!")
# 订阅文件请求主题
client.subscribe("xiuosiot/ota/files")
# 发送初始更新信息
init_payload = {
"fileSize": userdata["file_size"],
"version": userdata["fw_version"],
"fileId": userdata["file_id"],
"md5": userdata["file_md5"]
}
client.publish(
topic=f"ota/{CLIENT_ID}/update",
payload=compact_json(init_payload),
qos=1
)
print(f"已发送初始化消息到 ota/{CLIENT_ID}/update")
else:
print(f"连接失败,错误码:{rc}")
def on_message(client, userdata, msg):
""" 消息到达回调函数 """
try:
payload = json.loads(msg.payload.decode())
print("收到文件请求:", payload)
# 验证必要字段
required_fields = ["clientId", "fileId", "fileOffset", "size"]
if not all(field in payload for field in required_fields):
print("错误:缺少必要字段")
return
# 验证客户端ID
if payload["clientId"] != CLIENT_ID:
print(f"忽略非目标客户端请求:{payload['clientId']}")
return
# 提取请求参数
offset = payload["fileOffset"]
request_size = payload["size"]
reply_topic = f"ota/{CLIENT_ID}/files"
# 读取文件内容
try:
with open(FILE_PATH, "rb") as f:
f.seek(offset)
file_data = f.read(request_size)
if len(file_data) == 0:
print("错误:读取到空数据或超出文件范围")
return
except Exception as e:
print(f"文件读取失败:{str(e)}")
return
# 发送文件片段
#client.publish(reply_topic, file_data, qos=1)
#print(f"已发送 {len(file_data)} 字节到 {reply_topic}")
except json.JSONDecodeError:
print("错误无效的JSON格式")
except Exception as e:
print(f"处理消息时出错:{str(e)}")
def main():
# 验证文件存在性
if not os.path.exists(FILE_PATH):
print(f"错误:文件 {FILE_PATH} 不存在")
exit(1)
# 计算文件参数
file_size = os.path.getsize(FILE_PATH)
file_md5 = calculate_md5(FILE_PATH)
# 创建MQTT客户端
client = mqtt.Client(userdata={
"file_size": file_size,
"fw_version": VERSION,
"file_id": FILE_ID,
"file_md5": file_md5
})
# 设置回调函数
client.on_connect = on_connect
client.on_message = on_message
try:
client.username_pw_set("ch32v208", "xiuosiot") # 取消注释并填写凭证
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_forever()
except KeyboardInterrupt:
print("\n程序已终止")
except Exception as e:
print(f"连接错误:{str(e)}")
exit(1)
if __name__ == "__main__":
main()