021.小项目之NAND烧录工具链|千篇笔记实现嵌入式全栈/裸机篇
项目摘要 本项目实现了一种基于XModem-CRC协议的串口烧录方案,可将文件通过串口传输并写入NAND Flash。方案包含设备端裸机程序与Python上位机工具"要你命3000"。设备端新增支持任意地址读取的NAND功能,并实现XModem接收逻辑,上位机提供图形化操作界面。传输过程采用"停-等"协议,通过CRC校验保证数据可靠性。该方案为嵌入式系统开发
⚠️上位机代码仓库(要你命3000):https://gitee.com/simonchina_carel_li/killyou3000.git
Tag:0330
⚠️裸机仓库:https://gitee.com/simonchina_carel_li/mini2440-bare-metal.git
Tag:21-xmodem2nand
1. 项目简介
从这节开始,我们要以项目的方式去完成剩下的裸机编程章节。
1.1 常见的几种烧录方式
| 方式 | 说明 |
|---|---|
| 裸机烧录 | 利用芯片自带的 JTAG 调试接口,通过专用的仿真器(如 J-Link、H-JTAG)将引导程序直接写入 NOR Flash 或 NAND Flash 这种方式一般烧录Bootloader或裸机固件 |
| Bootloader 烧录 (In-System Programming, ISP) |
- USB 接口烧录(最高效) - 串口烧录(速度较慢,但几乎所有嵌入式设备都具备串口) - 网络烧录(TFTP/NFS)(如果硬件带有以太网控制器) |
| 脱机烧录/离线烧录 (Offline Programming) |
- 芯片预烧录:使用专门的烧录器/编程器,在 Flash 芯片焊接到电路板之前,先将固件批量灌入芯片中。这种方式适合极大规模生产 - SD卡/外部存储卡烧录:需要支持从 SD 卡自动引导并运行自动烧录脚本 |
我们此节实现的烧录方式,严格来说不属于上述任意一种,
我们实现在裸机固件上支持串口烧录,将文件写入Nand(类似于ISP+串口烧录)
1.2 配套上位机
为了配合设备端烧录固件,我们还会实现一个带界面的上位机软件,并支持我们的裸机测串口烧录;
在后面,随着我们实现其他的烧录方式,也会同时升级扩展上位机软件的功能!
上位机定位为一个工具箱,那就叫“要你命3000”好啦!

2. 方案设计
| 子项 | 方案/说明 |
|---|---|
| 烧录目标 | Nand |
| 传输协议 | 串口XModem |
| 上位机方案 | - 基于Python,PySide6 UI框架 - 支持多文件不同地址发送 |
| 传输约定/步骤 | 1. 上位机发送命令rx <地址>通知设备开始接收2. 设备周期性发送’C’准备接收 3. 使用XModem-CRC协议传输文件 |
| 设备端设计之应用 | - 开发cmd-cli程序,以CMD终端的方式接受控制- 增加两个CMD命令实现: 1. rx <地址>传输文件至Nand的指定地址处(先保存至RAM,然后写入Nand)2. nandread <地址> <长度>从Nand的指定地址读取指定长度的数据打印出来(用于数据验证) |
| 设备端设计之Nand | 原有的Nand功能需要增加任意地址(不要求对齐)的字节数据读取 |
| 设备端设计之xmodem传输 | 需要增加xmodem传输功能,使用CRC协议,128字节块传输;保存至SDRAM(堆)中 |
3. 增加Nand功能
打开common/nand.c,增加函数
/// @brief 读取指定字节地址的数据(带ECC校验)
/// @param byte_addr 线性字节地址(从 0 起,不用对齐到页)
/// @param buf 数据目标
/// @param size 字节数
/// @return 0 成功,-1 失败(读 ECC 不可纠正)
int nand_read_ecc_bytes(unsigned int byte_addr, unsigned char *buf, unsigned int size)
{
static unsigned char page_buf[NAND_PAGE_BYTES];
if (size == 0)
return 0;
if (buf == NULL)
return -1;
while (size > 0) {
unsigned int page = byte_addr / NAND_PAGE_BYTES;
unsigned int off = byte_addr % NAND_PAGE_BYTES;
unsigned int chunk = NAND_PAGE_BYTES - off;
if (chunk > size)
chunk = size;
if (off == 0 && chunk == NAND_PAGE_BYTES) {
if (nand_read_ecc(page, buf) != 0)
return -1;
} else {
if (nand_read_ecc(page, page_buf) != 0)
return -1;
memcpy(buf, page_buf + off, chunk);
}
byte_addr += chunk;
buf += chunk;
size -= chunk;
}
return 0;
}
我们原有的nand_read_ecc函数,是地址页对齐的页大小读取函数,
新增的nand_read_ecc_bytes是对前者的封装,以实现任意地址任意长度的读取
4. XModem复盘
Xmodem 采用“停-等”(Stop-and-Wait)协议:发送方发送一个数据包,必须等待接收方的确认信号(ACK/NAK)后,才能发送下一个包
4.1 数据包结构
标准 Xmodem 数据包长度固定为 132 字节,格式如下:
| 字段 | 长度 | 说明 |
|---|---|---|
| SOH | 1 字节 | 控制字符,固定为 0x01(Start Of Header) |
| 包序号 | 1 字节 | 从 0x01 开始,每发一包自增 1,超过 255 后回绕到 0 |
| 包序号反码 | 1 字节 | 包序号的补码(255 - 包序号),用于校验包序号准确性 |
| 数据区 | 128 字节 | 实际传输的内容。若文件不足 128 字节,通常填 0x1A (CTRL+Z) |
| 校验和 | 1 字节 | 128 字节数据的累加和(Checksum)取模 256 |
4.2 交互控制字符
Xmodem 依赖几个关键的 ASCII 字符来控制流程:
-
SOH (0x01):数据包起始。
-
ACK (0x06):接收正确,告诉发送方发下一包。
-
NAK (0x15):接收失败或校验错误,请求重发当前包。
-
EOT (0x04):发送结束,发送方告知接收方文件已传完。
-
CAN (0x18):取消传输。
-
C (0x43):用于 Xmodem-CRC 模式,接收方发送 ‘C’ 代替 NAK 来发起请求
4.3 传输流程
| 步骤 | 说明 |
|---|---|
| 0. 启动(发送侧) | 发送方向设备终端发送命令rx 0xXXXXXXXX |
| 1. 启动 | 接收方每隔几秒发送一个 NAK/‘C’(这里用CRC协议,所以是后者) |
| 告知发送方“我准备好了” | |
| 2. 发送 | 发送方收到 NAK 后,开始发送第 1 个包(SOH + 01 + FE + Data + Checksum) |
| 3. 确认 | 接收方检查校验和及包序号。若正确则回 ACK,错误则回 NAK |
| 4. 循环 | 发送方收到 ACK 后发送下一包;收到 NAK 则重发上一包 |
| 5. 结束 | 发送方发完数据后发送 EOT,接收方回 ACK,传输正式关闭 |
4.4 变种杂谈
-
Xmodem-1K:将数据区扩展为 1024 字节(SOH 变为 STX 0x02),显著提高了大文件传输效率
-
Ymodem / Zmodem:在 Xmodem 基础上支持了文件名传输、多文件传输和断点续传
5. XModem实现
新建common/xmodem-crc.c,
实现接收文件到SDRAM(malloc堆的方式)的功能,
#include "s3c2440a.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h> // 为了使用 malloc 和 free
// --- Xmodem 控制字符定义 ---
#define SOH 0x01
#define EOT 0x04
#define ACK 0x06
#define NAK 0x15
#define CAN 0x18
/* Xmodem-CRC:接收端用该字节发起同步,请求发送方使用 CRC-16(而非校验和) */
#define CRC_REQ ((uint8_t)'C') /* 0x43 */
#define PACKET_SIZE 128
/* SOH(1)+blk(1)+~blk(1)+data(128)+CRC(2) — 下标 0..132 共 133 字节 */
#define XMODEM_SOH_FRAME_BYTES (1 + 1 + 1 + PACKET_SIZE + 2)
#define MAX_RETRYS 10
static void uart_putchar(uint8_t c)
{
uart0_putc(c);
}
static int uart_getchar_timeout(uint32_t timeout_ms)
{
return uart0_getc_timeout(timeout_ms);
}
// --- CRC16 计算函数 (CCITT标准) ---
static uint16_t crc16_ccitt(const uint8_t *data, uint16_t length) {
uint16_t crc = 0;
while (length--) {
crc ^= (uint16_t)(*data++) << 8;
for (int i = 0; i < 8; ++i) {
if (crc & 0x8000)
crc = (crc << 1) ^ 0x1021;
else
crc <<= 1;
}
}
return crc;
}
// --- 清空串口接收缓冲区 ---
static void flush_uart(void) {
// while (uart_getchar_timeout(100) >= 0);
}
// --- Xmodem 接收主函数 ---
// 返回值:>0 表示成功接收的文件大小,<=0 表示失败
// 接收到预先分配好的 buffer 中,并保护不超出 max_limit
int32_t xmodem_receive_to_buffer(uint8_t *buffer, uint32_t max_limit)
{
uint8_t *dest_ptr = buffer;
uint32_t total_bytes = 0;
uint8_t packet_buf[XMODEM_SOH_FRAME_BYTES];
uint8_t expected_seq = 1;
int retry_count = 0;
int state = 0;
flush_uart();
/*
* 同步阶段 (state==0):接收端周期性向 UART **发送 0x43 ('C')**,
* 对端 Xmodem-CRC 发送程序应在收到 C 后开始发 SOH 帧。
* 进入首包后 state=1,不再发 C,只发 ACK/NAK。
*/
while (retry_count < MAX_RETRYS) {
if (state == 0) {
uart_putchar(CRC_REQ); /* 即 0x43,肉眼/逻辑分析仪可见 */
}
// printf("---1\r\n");
int c = uart_getchar_timeout(2000);
// printf("---2\r\n");
if (c < 0) { retry_count++; continue; }
if (c == CAN) {
printf("\r\n[Xmodem] Canceled by sender.\r\n");
return -1;
}
if (c == EOT) {
uart_putchar(ACK);
return total_bytes;
}
if (c == SOH) {
state = 1;
packet_buf[0] = c;
int i;
for (i = 1; i < 133; i++) {
int data_byte = uart_getchar_timeout(1000);
if (data_byte < 0) break;
packet_buf[i] = (uint8_t)data_byte;
}
if (i < 133) { flush_uart(); uart_putchar(NAK); retry_count++; continue; }
uint8_t seq = packet_buf[1];
if (seq != (uint8_t)(~packet_buf[2])) { flush_uart(); uart_putchar(NAK); retry_count++; continue; }
uint16_t calc_crc = crc16_ccitt(&packet_buf[3], PACKET_SIZE);
uint16_t recv_crc = (packet_buf[131] << 8) | packet_buf[132];
if (calc_crc != recv_crc) { flush_uart(); uart_putchar(NAK); retry_count++; continue; }
if (seq == expected_seq) {
// 【新增越界保护】检查是否超过了 malloc 的大小
if (total_bytes + PACKET_SIZE > max_limit) {
uart_putchar(CAN);
printf("\r\n[Xmodem] Error: File exceeds allocated buffer size (%lu bytes).\r\n", max_limit);
return -1;
}
// 拷贝数据到动态分配的 buffer 中
memcpy(dest_ptr, &packet_buf[3], PACKET_SIZE);
dest_ptr += PACKET_SIZE;
total_bytes += PACKET_SIZE;
expected_seq++;
} else if (seq != (uint8_t)(expected_seq - 1)) {
uart_putchar(CAN);
printf("\r\n[Xmodem] Sequence error. Aborted.\r\n");
return -1;
}
uart_putchar(ACK);
retry_count = 0;
}
}
printf("\r\n[Xmodem] Timeout or too many errors.\r\n");
return -1;
}
6. 标准库适配(_read)
因为后面我们会使用到fgets这样的标准库函数,我们之前对标准库进行适配时,预留了_read函数的存根,
int _read(int file, char *ptr, int len) { ... }
当使用fgets时,最终会调用此函数,并且file传参为0(stdin=0),
因为我们需要实现此函数,
打开common/newlib_stubs.c,
/*
* 串口终端按回车常只发 CR(\\r);fgets/gets 等要等 LF(\\n)才结束一行。
* 将 \\r 当作 \\n 交给 libc;若终端发 CRLF,则丢弃紧跟在 CR 后面的 LF。
*/
static int stdin_after_cr;
int _read(int file, char *ptr, int len)
{
if (file != 0 || len <= 0) {
return -1;
}
for (;;) {
unsigned char c = (unsigned char)uart0_getc();
if (c == '\n' && stdin_after_cr) {
stdin_after_cr = 0;
continue;
}
if (c == '\r') {
stdin_after_cr = 1;
ptr[0] = '\n';
return 1;
}
stdin_after_cr = 0;
ptr[0] = (char)c;
return 1;
}
}
把标准输入(stdin)映射到 UART 接收,只支持读 file == 0(标准输入)
做了串口换行兼容:把 '\r' 转成 '\n',并用 stdin_after_cr 跳过紧随其后的 '\n'(处理 \r\n)
这样上层看到的换行始终是单个 '\n',避免出现双换行
7. cmd-cli终端应用实现
7.1 Nand CLI功能
新建cmd-cli/nand-cli.c,
通过对nand功能和上面XModem功能的调用,实现文件传输至Nand指定位置和nand数据读取的功能,
#include "s3c2440a.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
// 定义单次接收允许的最大固件大小
// 如果你的内核或文件系统大于此值,请调大。受限于 S3C2440 的堆大小。
#define MAX_DOWNLOAD_SIZE (40 * 1024 * 1024)
extern int xmodem_receive_to_buffer(uint8_t *buffer, int max_size);
// --- 预留的 NAND 烧录接口 ---
// nand_addr: NAND 中的起始地址
// buffer: 接收到的文件数据
// file_size: 接收到的文件总大小(注意:Xmodem 可能会用 0x1A 填充尾部对齐 128 字节)
// 返回值:0表示成功,-1表示失败
static int nand_write_firmware(unsigned int nand_addr, const uint8_t *buffer, uint32_t file_size)
{
const unsigned int block_bytes = NAND_PAGE_BYTES * NAND_PAGES_PER_BLOCK;
if (buffer == NULL)
return -1;
/* 按块对齐长度;尾部补到整块,内容来自 buffer(未接收部分为未定义) */
file_size = (file_size + block_bytes - 1) / block_bytes * block_bytes;
unsigned int first_block = nand_addr / block_bytes;
for (unsigned int i = 0; i < file_size; i += block_bytes) {
unsigned int block = first_block + i / block_bytes;
if (nand_erase(block) != 0)
return -1;
for (unsigned int j = 0; j < NAND_PAGES_PER_BLOCK; j++) {
unsigned int page = block * NAND_PAGES_PER_BLOCK + j;
if (nand_write_ecc(page, &buffer[i + j * NAND_PAGE_BYTES]) != 0)
return -1;
}
}
return 0;
}
// --- 命令行解析入口 ---
// 命令行工具解析到了 rx 命令,并将后续字符串传入
void do_rx_nand(const char *addr_str)
{
unsigned int nand_target_addr;
// 1. 解析目标 NAND 地址
if (sscanf(addr_str, "0x%x", (unsigned int *)&nand_target_addr) != 1) {
printf("Error: Invalid format. Usage: rx 0xXXX (NAND Address)\r\n");
printf("Change to: rx 0x0 (NAND Address)\r\n");
nand_target_addr = 0;
}
// [重要提示] 根据K9F2G08 手册,这块 NAND 是按照 Block (128KB) 擦除的。
// 建议在这里加一个地址对齐检查,警告用户
if (nand_target_addr % (NAND_PAGE_BYTES * NAND_PAGES_PER_BLOCK) != 0) {
printf("Warning: Target address 0x%08x is not %dKB Block aligned!\r\n", nand_target_addr, NAND_PAGE_BYTES * NAND_PAGES_PER_BLOCK / 1024);
return;
}
// 2. 从堆中动态分配 SDRAM 缓存
printf("Allocating %d MB buffer in SDRAM...\r\n", MAX_DOWNLOAD_SIZE / (1024*1024));
uint8_t *rx_buffer = (uint8_t *)malloc(MAX_DOWNLOAD_SIZE);
if (rx_buffer == NULL) {
printf("Error: Malloc failed! Not enough heap memory.\r\n");
return;
}
// 3. 开始接收文件到动态分配的 rx_buffer
printf("Ready to receive via Xmodem. Dest NAND Addr: 0x%08x\r\n", nand_target_addr);
printf("Please start Xmodem sender in your terminal...\r\n");
int file_size = xmodem_receive_to_buffer(rx_buffer, MAX_DOWNLOAD_SIZE);
// 4. 判断接收结果并烧录 NAND
if (file_size > 0) {
printf("Download success! File size: %d bytes. Starting NAND flashing...\r\n", file_size);
// 【注意】如果你开启了 MMU 和 D-Cache,在把缓冲区交给 DMA 或 NAND 控制器前,
// 建议在这里调用 Cache 清理函数,例如:
// clean_dcache_range((uint32_t)rx_buffer, file_size);
// 调用你预留的 NAND 烧录接口
int ret = nand_write_firmware(nand_target_addr, rx_buffer, file_size);
if (ret == 0) {
printf("Success: Firmware flashed to NAND offset 0x%08x!\r\n", nand_target_addr);
} else {
printf("Error: NAND flashing failed!\r\n");
}
} else {
printf("Firmware download failed, aborting NAND write.\r\n");
}
// 5. 释放堆内存,防止内存泄漏
free(rx_buffer);
printf("SDRAM buffer freed.\r\n");
}
// nand 读取
int do_nand_read(unsigned int addr, uint8_t *buf, unsigned int size)
{
return nand_read_ecc_bytes(addr, buf, size);
}
7.2 CMD CLI入口程序
这里实现的功能不只是针对文件传输至nand的,而是一个集成多功能应用的命令调用终端,以后我们实现的其他项目的命令也在这里集成!
新建cmd-cli/main.c,
///@file cmd-cli.c
///@brief cmd-cli命令行工具
///@author li
///@date 2026-03-29
#include "s3c2440a.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
extern void do_rx_nand(const char *addr_str);
extern int do_nand_read(unsigned int addr, uint8_t *buf, unsigned int size);
static void helpler(void)
{
printf("命令: \r\n");
printf("\thelp - 显示帮助信息\r\n");
printf("\trx <NAND地址> - 接收文件到NAND\r\n");
printf("\tnandread <NAND地址> <字节数> - 读取NAND数据\r\n");
}
int main()
{
nand_init();
/* 串口交互:避免 stdin/stdout 全缓冲导致行编辑与提示符异常 */
setvbuf(stdin, NULL, _IONBF, 0);
setvbuf(stdout, NULL, _IONBF, 0);
printf("cmd-cli命令行工具\r\n");
char line[160];
while (1) {
printf("cmd-cli> ");
// 先清空接收和UART缓冲
memset(line, 0, sizeof(line));
uart0_getc_timeout(10);
if (fgets(line, sizeof(line), stdin) == NULL) {
continue;
}
// 回显
printf("%s", line);
char cmd[128];
char arg[128];
char arg2[128];
int n = sscanf(line, "%127s %127s %127s", cmd, arg, arg2);
if (n < 1 || cmd[0] == '\0') {
continue;
}
if (strcmp(cmd, "help") == 0) {
helpler();
continue;
}
else if (strcmp(cmd, "rx") == 0) {
if (n < 2) {
printf("用法: rx <NAND地址>\r\n");
continue;
}
do_rx_nand(arg);
}
else if (strcmp(cmd, "nandread") == 0) {
if (n < 3) {
printf("用法: nandread <NAND地址> <字节数>\r\n");
continue;
}
char *end = NULL;
unsigned long a = strtoul(arg, &end, 0);
if (end == arg || *end != '\0') {
printf("错误: 无效地址 \"%s\"\r\n", arg);
continue;
}
end = NULL;
unsigned long sz = strtoul(arg2, &end, 0);
if (end == arg2 || *end != '\0' || sz == 0) {
printf("错误: 无效字节数 \"%s\"(须为正整数)\r\n", arg2);
continue;
}
unsigned int addr = (unsigned int)a;
unsigned int size = (unsigned int)sz;
uint8_t *buf = (uint8_t *)malloc(size);
if (buf == NULL) {
printf("错误: 内存分配失败\r\n");
continue;
}
int ret = do_nand_read(addr, buf, size);
if (ret == 0) {
printf("读取成功: %d 字节\r\n", size);
for (unsigned int i = 0; i < size; i++) {
printf("%02X%s", buf[i], i % 16 == 15 ? "\r\n" : " ");
}
printf("\r\n");
} else {
printf("读取失败\r\n");
}
free(buf);
}
else {
printf("未知命令: %s\r\n", cmd);
helpler();
}
}
}
因为我们已经适配了标准库,所以我们可以使用fgets、sscanf这类的标准库函数,
程序的主要设计是,
8. 初步测试
编译make cmd-cli,烧录,运行,
我们使用支持XModem-CRC文件传输的终端工具,发送一个文件,

可以看到,传输成功完成!
因为工具默认发送的接收命令是rx 巴拉巴拉.pdf, 这不符合我们定义的rx 0xXXXX格式,我们固件端对不符合格式的参数默认当作地址0处理,所以这里会把文件默认传输到Nand的开头。
我们使用WSL命令行生成一个内容为U32数组:0x01 0x02 0x03 … 一直到10000 的二进制文件,
python3 -c "import struct; f=open('output.bin', 'wb'); [f.write(struct.pack('<I', i)) for i in range(1, 10001)]; f.close()"
通过上述方法,将此文件烧录至Nand(此时应该烧录至开头)
然后通过串口终端,我们执行nandread命令,
# 我们烧录的文件是10000个元素的U32数组,所以实际长度为40000
nandread 0 40000
得到结果,
可以看到,内部排列确实是依次递增,并且最后一个数是10 27 00 00, 0x2710十进制为10000,
nandread功能验证OK!
9. 上位机软件实现(KillYou3000)
现在我们要打造一个瑞士军刀式的PC软件,用于测试XModem文件传输功能,并且后续我们开发其它功能都在此基础上进行扩展!
既然是瑞士军刀式的工具箱软件,那就起名为【**要你命3000】**好了!
9.1 方案设计
| 项 | 设计 |
|---|---|
| 功能 | - 串口连接(可设置波特率) - 串口终端显示- XModem-CRC协议传输 - 文件多选并且可分别设置地址 - 显示传输进度 |
| 开发语言和环境 | Python + Cursor(或VSCode)+ venv虚拟环境 |
| 使用框架/组件 | - UI框架使用PySide6 - 串口通信使用pyserial - 打包使用pyinstaller |
| UI开发模式 | .ui可视化编辑界面 → uic.exe工具转换成.py文件 |
9.2 项目环境搭建
新建目录KillYou3000,
打开Powershell,执行python -m venv .venv,
自动生成.venv虚拟环境目录,
Vscode打开此目录,
确保已安装Python和QT插件,
设置项目的解析器为.venv目录下的,
打开终端,此时应该自动进入虚拟环境,
终端执行命令安装必要插件,
pip install PySide6 pyserial pyinstaller
安装完成后,
下面设置QT插件,
找到Qt-ui设置项,设置UIC工具路径为.venv目录下的,
这个是将.ui文件用designer工具打开的路径设置
9.3 UI设计
执行插件命令,
创建一个UI文件,
点击此ui文件,显示此界面,
选择用Designer打开,然后进行界面编辑,
9.4 ui转py
界面编辑完成之后,
终端执行转换命令,
即可得到py文件
9.5 XModem功能实现
直接上代码吧,
新建xmodem_crc.py,
# -*- coding: utf-8 -*-
"""
XMODEM-CRC(128 字节分组)发送端实现(基于 pyserial)。
主要用于:串口上向接收端发送 XMODEM-CRC 文件数据,并支持:
1) 等待接收端握手(接收端发出 CRC 模式请求 'C')
2) 每块数据 ACK/NAK 重试
3) 发送过程可取消(threading.Event)
4) 将握手前后的“非协议字节”作为设备日志回调展示(on_rx_log)
"""
from __future__ import annotations
import time
from typing import BinaryIO, Callable, Optional
SOH = 0x01
EOT = 0x04
ACK = 0x06
NAK = 0x15
CAN = 0x18
CRC_READY = 0x43 # 'C' — receiver requests CRC mode
BLOCK_SIZE = 128
PAD_BYTE = 0x1A
class XmodemSendError(Exception):
"""传输失败或被取消时抛出的异常。"""
def send_rx_command(
stream,
address: str,
cancel_event=None,
line_ending: bytes = b"\r\n",
) -> None:
"""
向设备发送启动接收指令:``rx <address>``(ASCII)。
该函数只负责“发命令”,握手(等待 'C')和后续 XMODEM-CRC 发送由调用方完成。
Args:
stream: pyserial 的 Serial 对象(提供 write/flush)。
address: 烧录地址字符串(例如 ``0x20000000``)。
cancel_event: 取消事件(threading.Event),用于中途退出。
line_ending: 行结束符,默认 ``\\r\\n``。
Returns:
None
"""
if cancel_event is not None and cancel_event.is_set():
raise XmodemSendError("已取消")
addr = address.strip()
if not addr:
raise XmodemSendError("烧录地址为空")
cmd = ("rx " + addr).encode("utf-8") + line_ending
stream.write(cmd)
stream.flush()
def crc16_xmodem(data: bytes) -> int:
"""
计算 XMODEM-CRC 使用的 CRC-16。
- 多项式:0x1021
- 初始值:0
Args:
data: 需要计算 CRC 的数据。
Returns:
CRC-16(0~0xFFFF)
"""
crc = 0
for byte in data:
crc ^= byte << 8
for _ in range(8):
if crc & 0x8000:
crc = ((crc << 1) ^ 0x1021) & 0xFFFF
else:
crc = (crc << 1) & 0xFFFF
return crc
def _read_protocol_byte(
stream,
timeout_s: float,
cancel_event=None,
on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> Optional[int]:
"""
在超时范围内持续读取串口字节,直到读到协议应答之一:
- ACK / NAK / CAN
如果读到的不是协议应答字节,则按“设备日志/回显”处理:
- 若设置了 ``on_rx_log``:回调传递该字节
- 否则丢弃
Args:
stream: pyserial Serial。
timeout_s: 等待超时时间(秒)。
cancel_event: 取消事件。
on_rx_log: 非协议字节回调(bytes,单字节)。
Returns:
ACK / NAK / CAN 的字节值;或超时返回 None。
"""
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
if cancel_event is not None and cancel_event.is_set():
raise XmodemSendError("已取消")
n = stream.in_waiting
if n:
b = stream.read(1)
if not b:
break
x = b[0]
if x in (ACK, NAK, CAN):
return x
if on_rx_log:
on_rx_log(bytes([x]))
else:
time.sleep(0.001)
return None
def wait_for_crc_ready(
stream,
timeout_s: float = 120.0,
cancel_event=None,
on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
"""
等待接收端进入 XMODEM-CRC 接收准备状态。
接收端通常会周期性发出 ASCII ``'C'``(0x43)请求“CRC 模式”。
但某些设备在握手之前会输出包含字母 ``C`` 的文本(例如 ``CRC``),
如果简单“看到 0x43 就认为就绪”,可能会误触发导致后续块发送卡住。
本函数采用更稳健的判定策略:
- 若连续收到 ``CC``(两个 0x43 且相邻),直接认为就绪。
- 若先收到单个 ``C``,在约 200ms 宽限期内:
- 若再次收到 ``C``:认为就绪
- 若收到其他字节:把 ``C`` + 该字节作为设备日志回调,并继续寻找下一次握手
- 若宽限期内无更多数据:把单个 ``C`` 视为就绪(兼容“只发送一个 C”的实现)
"""
deadline = time.monotonic() + timeout_s
single_c_grace_s = 0.22
while time.monotonic() < deadline:
if cancel_event is not None and cancel_event.is_set():
raise XmodemSendError("已取消")
n = stream.in_waiting
if not n:
time.sleep(0.01)
continue
raw = stream.read(1)
if not raw:
continue
b0 = raw[0]
if b0 != CRC_READY:
if on_rx_log:
on_rx_log(raw)
continue
# 读到第一个 'C' 后,需要区分:它是握手同步字节,还是日志文本中的字母 C
inner_deadline = time.monotonic() + single_c_grace_s
while time.monotonic() < inner_deadline:
if cancel_event is not None and cancel_event.is_set():
raise XmodemSendError("已取消")
if stream.in_waiting:
b1 = stream.read(1)[0]
if b1 == CRC_READY:
return
if on_rx_log:
on_rx_log(bytes([CRC_READY, b1]))
break
time.sleep(0.002)
else:
# No second byte within grace period — single C is the sync
return
raise XmodemSendError("等待接收端发送 'C' (CRC 就绪) 超时")
def _send_block(
stream,
seq_byte: int,
data128: bytes,
max_retries: int = 20,
cancel_event=None,
on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
"""
发送单个 XMODEM-CRC 数据块(128 字节)。
Args:
stream: pyserial Serial。
seq_byte: 块序号(0~255 的低 8 位)。
data128: 必须长度为 128 的分组数据。
max_retries: 单块最大重试次数。
cancel_event: 取消事件。
on_rx_log: 非协议应答字节回调(用于显示设备日志)。
Returns:
None(成功发送后返回;失败抛异常)
"""
if len(data128) != BLOCK_SIZE:
raise ValueError("block must be 128 bytes")
seq = seq_byte & 0xFF
c = crc16_xmodem(data128)
pkt = bytes([SOH, seq, (~seq) & 0xFF]) + data128 + bytes([c >> 8, c & 0xFF])
for _ in range(max_retries):
if cancel_event is not None and cancel_event.is_set():
raise XmodemSendError("已取消")
stream.write(pkt)
stream.flush()
r = _read_protocol_byte(stream, 3.0, cancel_event, on_rx_log)
if r == ACK:
return
if r == CAN:
raise XmodemSendError("接收端取消 (CAN)")
if r == NAK:
continue
raise XmodemSendError(f"数据块 seq=0x{seq:02x} 发送失败(NAK/超时次数过多)")
def _send_eot(
stream,
max_retries: int = 10,
cancel_event=None,
on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
"""
发送结束符 EOT,并等待 ACK。
Args:
stream: pyserial Serial。
max_retries: EOT 最大重试次数。
cancel_event: 取消事件。
on_rx_log: 非协议应答字节回调(用于显示设备日志)。
Returns:
None(成功后返回;失败抛异常)
"""
for _ in range(max_retries):
if cancel_event is not None and cancel_event.is_set():
raise XmodemSendError("已取消")
stream.write(bytes([EOT]))
stream.flush()
r = _read_protocol_byte(stream, 3.0, cancel_event, on_rx_log)
if r == ACK:
return
if r == CAN:
raise XmodemSendError("接收端取消 (CAN)")
raise XmodemSendError("结束 EOT 未收到 ACK")
def send_file(
stream,
file_obj: BinaryIO,
file_size: int,
on_progress: Optional[Callable[[int, int], None]] = None,
cancel_event=None,
wait_crc: bool = True,
on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
"""
使用 XMODEM-CRC 协议发送文件。
协议行为:
1) (可选)等待接收端握手(等待 'C')
2) 以 128 字节为分组发送每块,等待 ACK/NAK
3) 所有数据发送完成后发送 EOT 并等待 ACK
Args:
stream: pyserial Serial 对象(用于读 ACK/NAK/CAN 与写数据)。
file_obj: 已打开的二进制文件对象(必须从文件开头开始读)。
file_size: 文件总字节数(用于进度与不足 128 字节的 padding)。
on_progress: 进度回调:``on_progress(已发送字节数, file_size)``。
cancel_event: 取消事件(threading.Event)。
wait_crc: 是否在发送第一块前等待接收端的握手 'C'。
on_rx_log: 可选的“非协议字节”回调(设备回显/日志),用于 UI 显示。
Returns:
None
"""
if wait_crc:
wait_for_crc_ready(stream, cancel_event=cancel_event, on_rx_log=on_rx_log)
block_no = 1
sent = 0
while True:
if cancel_event is not None and cancel_event.is_set():
raise XmodemSendError("已取消")
raw = file_obj.read(BLOCK_SIZE)
if not raw:
break
delivered = len(raw)
if delivered < BLOCK_SIZE:
chunk = raw + bytes([PAD_BYTE]) * (BLOCK_SIZE - delivered)
else:
chunk = raw
seq_byte = block_no & 0xFF
_send_block(stream, seq_byte, chunk, cancel_event=cancel_event, on_rx_log=on_rx_log)
sent += delivered
if sent > file_size:
sent = file_size
if on_progress:
on_progress(sent, file_size)
block_no += 1
_send_eot(stream, cancel_event=cancel_event, on_rx_log=on_rx_log)
if on_progress:
on_progress(file_size, file_size)
def send_file_path(
stream,
path: str,
on_progress: Optional[Callable[[int, int], None]] = None,
cancel_event=None,
wait_crc: bool = True,
on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
"""
通过文件路径发送文件(内部打开文件并调用 ``send_file``)。
Args:
stream: pyserial Serial。
path: 待发送文件路径。
on_progress: 进度回调。
cancel_event: 取消事件。
wait_crc: 是否等待握手 'C'。
on_rx_log: 设备日志回调。
Returns:
None
"""
import os
size = os.path.getsize(path)
with open(path, "rb") as f:
send_file(stream, f, size, on_progress, cancel_event, wait_crc, on_rx_log)
# 传输结束后设备可能仍有一行日志
if on_rx_log:
time.sleep(0.02)
try:
n = stream.in_waiting
if n:
on_rx_log(stream.read(n))
except Exception:
pass
9.6 主代码实现
新建main.py,
# -*- coding: utf-8 -*-
"""
要你命3000 工具箱(当前实现:串口连接 + 多文件顺序发送 + XMODEM-CRC)。
功能要点:
1) 串口连接/断开(pyserial)
2) 通过 UI 选择多个文件与对应烧录地址,并按顺序发送
3) 每个文件发送前先下发 ``rx <address>``,再等待接收端握手('C')后进行 XMODEM-CRC
4) 在 UI 的 ``textBrowser`` 中显示串口接收内容(rx 日志)
5) 支持发送过程取消
"""
import os
import threading
import time
import serial
from serial.tools import list_ports
from PySide6.QtCore import QFileInfo, QSettings, QThread, QTimer, Signal, Qt
from PySide6.QtGui import QFont, QTextCursor
from PySide6.QtWidgets import (
QApplication,
QFileDialog,
QMainWindow,
QMessageBox,
)
from ui_main import Ui_Form
from xmodem_crc import XmodemSendError, send_file_path, send_rx_command
def _format_size(n: int) -> str:
if n < 1024:
return f"{n} B"
if n < 1024 * 1024:
return f"{n / 1024:.1f} KB"
return f"{n / (1024 * 1024):.2f} MB"
_RX_TEXT_MAX_CHARS = 400_000
_RX_TEXT_KEEP_CHARS = 300_000
class XmodemBatchSendThread(QThread):
"""
多文件顺序发送线程。
发送策略:
- 对于每个任务(文件路径 + 地址):
1) 发送 ``rx <地址>`` 给设备
2) 等待设备发起 XMODEM-CRC 握手(接收端发出 'C')
3) 发送文件数据(XMODEM-CRC 128 字节分组)
线程内会把:
- 进度:``progress(sent_bytes, total_bytes)``
- 当前任务状态:``status(index, total, filename, addr)``
- 发送过程读到的“非协议字节”(设备日志):``rx_log(bytes)``
- 完成/失败:``finished(ok, message)``
"""
progress = Signal(int, int)
status = Signal(int, int, str, str)
rx_log = Signal(bytes)
finished = Signal(bool, str)
def __init__(
self,
ser: serial.Serial,
jobs: list[tuple[str, str]],
cancel: threading.Event,
inter_file_delay_s: float,
):
super().__init__()
self._ser = ser
self._jobs = jobs
self._cancel = cancel
self._inter_file_delay_s = float(inter_file_delay_s)
def run(self):
try:
total = sum(os.path.getsize(p) for p, _ in self._jobs)
sent_base = 0
for i, (path, addr) in enumerate(self._jobs):
if self._cancel.is_set():
raise XmodemSendError("已取消")
self.status.emit(i + 1, len(self._jobs), os.path.basename(path), addr)
# 进度回调:把“当前文件内已发送字节”映射到“整体任务已发送字节”
def on_prog(s: int, t: int, base=sent_base):
self.progress.emit(base + s, total)
# 设备回显/日志字节:由协议层(握手/块应答等待)回传
def on_rx(data: bytes):
if data:
self.rx_log.emit(data)
# 多文件之间:
# 1) 给设备足够时间完成上一轮 XMODEM 接收/初始化
# 2) 读取尾部日志(避免下一轮 rx 覆盖掉上一轮输出)
if i > 0:
time.sleep(self._inter_file_delay_s)
try:
n = self._ser.in_waiting
if n:
on_rx(self._ser.read(n))
except Exception:
pass
try:
self._ser.reset_input_buffer()
except Exception:
pass
# 下发启动接收命令(进入对应地址接收态)
send_rx_command(self._ser, addr, self._cancel)
# 发送当前文件:内部会等待接收端发 'C'(CRC 握手)
send_file_path(
self._ser,
path,
on_progress=on_prog,
cancel_event=self._cancel,
wait_crc=True,
on_rx_log=on_rx,
)
sent_base += os.path.getsize(path)
self.finished.emit(True, "全部传输完成")
except XmodemSendError as e:
self.finished.emit(False, str(e))
except Exception as e:
self.finished.emit(False, f"{type(e).__name__}: {e}")
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("要你命3000")
self.ui = Ui_Form()
self.ui.setupUi(self)
self._serial: serial.Serial | None = None
self._row_paths: list[str | None] = [None, None, None, None]
self._cancel = threading.Event()
self._send_thread: XmodemBatchSendThread | None = None
self._batch_info: tuple[int, int, str, str] | None = None
self._inter_file_delay_s: float = 4.0
# 参数/状态记忆(QSettings)
# 记忆内容:
# - 串口:端口、波特率
# - 批量任务:四行的“文件路径 / 地址 / 勾选状态”
# - 批量发送:文件间隔(秒)
self._settings = QSettings("KillYou3000", "KillYou3000")
# 四行任务槽位:每行包含“选择文件按钮 / 地址输入框 / 勾选框”
self._file_slots: list[tuple] = [
(self.ui.toolButton, self.ui.lineEdit, self.ui.checkBox),
(self.ui.toolButton_2, self.ui.lineEdit_2, self.ui.checkBox_2),
(self.ui.toolButton_3, self.ui.lineEdit_3, self.ui.checkBox_3),
(self.ui.toolButton_4, self.ui.lineEdit_4, self.ui.checkBox_4),
]
self._rx_timer = QTimer(self)
self._rx_timer.setInterval(30)
self._rx_timer.timeout.connect(self._poll_serial_rx)
self.ui.textBrowser.setOpenExternalLinks(False)
self.ui.textBrowser.setPlaceholderText("连接串口后在此显示接收数据(UTF-8,非法字节显示为 �)")
_mono = QFont("Consolas", 9)
if not _mono.exactMatch():
_mono = QFont("Courier New", 9)
self.ui.textBrowser.setFont(_mono)
self.ui.progressBar.setRange(0, 100)
self.ui.progressBar.setValue(0)
self.ui.pushButton_2.setEnabled(False)
self.ui.pushButton_refresh_ports.clicked.connect(self.refresh_ports)
self.ui.pushButton_serial_connect.clicked.connect(self.toggle_serial)
for i, (btn, le, cb) in enumerate(self._file_slots):
btn.clicked.connect(lambda _=False, r=i: self.pick_file_row(r))
cb.stateChanged.connect(lambda _=None: self._update_send_enabled())
le.textChanged.connect(lambda _=None: self._update_send_enabled())
self.ui.pushButton.clicked.connect(self.start_send)
self.ui.pushButton_2.clicked.connect(self.stop_send)
self._load_settings()
self.refresh_ports()
self._apply_serial_settings()
self._apply_batch_settings_to_ui()
self._update_send_enabled()
def _rx_monitor_active(self) -> bool:
return (
self._serial is not None
and self._serial.is_open
and not (self._send_thread and self._send_thread.isRunning())
)
def _poll_serial_rx(self):
if not self._rx_monitor_active():
return
try:
n = self._serial.in_waiting
if not n:
return
data = self._serial.read(n)
except Exception:
return
text = data.decode("utf-8", errors="replace")
self._append_rx_text(text)
def _append_rx_from_worker(self, data: bytes):
"""
接收发送线程回传的“设备日志/回显”并显示到终端窗口。
该显示路径与主线程定时器轮询互斥使用,避免同一时段重复读串口。
"""
if not data:
return
self._append_rx_text(data.decode("utf-8", errors="replace"))
def _append_rx_text(self, text: str):
cursor = self.ui.textBrowser.textCursor()
cursor.movePosition(QTextCursor.MoveOperation.End)
cursor.insertText(text)
self.ui.textBrowser.setTextCursor(cursor)
self.ui.textBrowser.ensureCursorVisible()
plain = self.ui.textBrowser.toPlainText()
if len(plain) > _RX_TEXT_MAX_CHARS:
self.ui.textBrowser.setPlainText(plain[-_RX_TEXT_KEEP_CHARS:])
cursor = self.ui.textBrowser.textCursor()
cursor.movePosition(QTextCursor.MoveOperation.End)
self.ui.textBrowser.setTextCursor(cursor)
def refresh_ports(self):
self.ui.comboBox.clear()
for p in list_ports.comports():
self.ui.comboBox.addItem(p.device, p.description)
if self.ui.comboBox.count() == 0:
self.ui.comboBox.addItem("(无串口)")
self.statusBar().showMessage(f"已刷新,共 {self.ui.comboBox.count()} 个端口", 3000)
def toggle_serial(self):
if self._send_thread and self._send_thread.isRunning():
QMessageBox.information(self, "提示", "请先停止传输再断开串口。")
return
if self._serial is not None and self._serial.is_open:
self._close_serial()
else:
self._open_serial()
def _open_serial(self):
port = self.ui.comboBox.currentText()
if not port or port.startswith("("):
QMessageBox.warning(self, "串口", "请选择有效串口。")
return
try:
baud = int(self.ui.comboBox_2.currentText())
except ValueError:
QMessageBox.warning(self, "串口", "波特率无效。")
return
try:
self._serial = serial.Serial(
port,
baud,
timeout=0.2,
write_timeout=5.0,
)
except serial.SerialException as e:
QMessageBox.critical(self, "串口", f"打开失败:\n{e}")
self._serial = None
return
self.ui.pushButton_serial_connect.setText("断开")
self.statusBar().showMessage(f"已连接 {port} @ {baud}", 5000)
self._rx_timer.start()
self._update_send_enabled()
def _close_serial(self):
self._rx_timer.stop()
if self._serial is not None:
try:
if self._serial.is_open:
self._serial.close()
except Exception:
pass
self._serial = None
self.ui.pushButton_serial_connect.setText("连接")
self.statusBar().showMessage("串口已断开", 3000)
self._update_send_enabled()
def pick_file_row(self, row: int):
"""为指定行选择文件,并将文件名显示到对应的按钮上。"""
path, _ = QFileDialog.getOpenFileName(self, "选择要发送的文件", "", "所有文件 (*.*)")
if path:
self._row_paths[row] = path
btn = self._file_slots[row][0]
btn.setText(QFileInfo(path).fileName())
btn.setToolTip(path)
self.statusBar().showMessage(path, 5000)
self._update_send_enabled()
def _collect_checked_jobs(self) -> list[tuple[str, str]] | None:
"""
收集所有已勾选任务。
Returns:
若有效任务存在:返回 ``[(path, addr), ...]``;
若缺少文件或地址:弹窗提示并返回 None。
"""
jobs: list[tuple[str, str]] = []
for i, (_, le, cb) in enumerate(self._file_slots):
if not cb.isChecked():
continue
path = self._row_paths[i]
addr = le.text().strip()
if not path:
QMessageBox.warning(self, "传输", f"第 {i + 1} 行已勾选,请先选择文件。")
return None
if not addr:
QMessageBox.warning(self, "传输", f"第 {i + 1} 行已勾选,请填写烧录地址。")
return None
jobs.append((path, addr))
return jobs
def _update_send_enabled(self):
"""根据勾选状态、已选文件/地址情况,决定“发送”按钮是否可用。"""
ok = False
if self._serial is not None and self._serial.is_open:
for i, (_, le, cb) in enumerate(self._file_slots):
if cb.isChecked() and self._row_paths[i] and le.text().strip():
ok = True
break
ok = ok and not (self._send_thread and self._send_thread.isRunning())
self.ui.pushButton.setEnabled(ok)
def start_send(self):
jobs = self._collect_checked_jobs()
if not jobs:
if self._serial is None or not self._serial.is_open:
return
QMessageBox.warning(self, "传输", "请至少勾选一行,并选择文件、填写地址。")
return
if not self._serial or not self._serial.is_open:
return
self._cancel.clear()
self._batch_info = None
self.ui.progressBar.setValue(0)
self.ui.label_6.setText("")
self.ui.pushButton.setEnabled(False)
self.ui.pushButton_2.setEnabled(True)
self.ui.pushButton_serial_connect.setEnabled(False)
self.ui.pushButton_refresh_ports.setEnabled(False)
self._send_thread = XmodemBatchSendThread(
self._serial,
jobs,
self._cancel,
inter_file_delay_s=self._inter_file_delay_s,
)
self._send_thread.progress.connect(self._on_send_progress, Qt.ConnectionType.QueuedConnection)
self._send_thread.status.connect(self._on_batch_status, Qt.ConnectionType.QueuedConnection)
self._send_thread.rx_log.connect(self._append_rx_from_worker, Qt.ConnectionType.QueuedConnection)
self._send_thread.finished.connect(self._on_send_finished, Qt.ConnectionType.QueuedConnection)
self._rx_timer.stop()
self._send_thread.start()
self.statusBar().showMessage("传输中:先发 rx,再等待 'C' 与 Xmodem…", 0)
def _on_batch_status(self, idx: int, total: int, name: str, addr: str):
self._batch_info = (idx, total, name, addr)
self.statusBar().showMessage(f"{idx}/{total} {addr} {name}", 0)
def _on_send_progress(self, sent: int, total: int):
if total > 0:
pct = int(min(100, round(100.0 * sent / total)))
self.ui.progressBar.setValue(pct)
if self._batch_info:
i, n, name, addr = self._batch_info
self.ui.label_6.setText(
f"[{i}/{n}] {addr} {name} {_format_size(sent)} / {_format_size(total)} ({pct}%)"
)
else:
self.ui.label_6.setText(f"{_format_size(sent)} / {_format_size(total)} ({pct}%)")
def _on_send_finished(self, ok: bool, message: str):
self._batch_info = None
self.ui.pushButton_2.setEnabled(False)
self.ui.pushButton_serial_connect.setEnabled(True)
self.ui.pushButton_refresh_ports.setEnabled(True)
if self._serial is not None and self._serial.is_open:
self._rx_timer.start()
self._update_send_enabled()
self.statusBar().showMessage(message, 8000)
if not ok and message != "已取消":
QMessageBox.warning(self, "传输", message)
def stop_send(self):
self._cancel.set()
self.statusBar().showMessage("正在停止…", 3000)
def closeEvent(self, event):
self._rx_timer.stop()
self._cancel.set()
self._save_settings()
if self._send_thread and self._send_thread.isRunning():
self._send_thread.wait(8000)
if self._serial is not None and self._serial.is_open:
try:
self._serial.close()
except Exception:
pass
event.accept()
def _load_settings(self) -> None:
"""从 QSettings 恢复参数到内部变量(UI 刷新前使用)。"""
try:
self._inter_file_delay_s = float(
self._settings.value("batch/inter_file_delay_s", 4.0)
)
except Exception:
self._inter_file_delay_s = 4.0
self._last_serial_port = str(self._settings.value("serial/port", ""))
self._last_serial_baud = str(self._settings.value("serial/baud", ""))
# 批量任务槽位状态
self._batch_slot_paths: list[str | None] = [None, None, None, None]
self._batch_slot_addrs: list[str] = ["", "", "", ""]
self._batch_slot_enabled: list[bool] = [False, False, False, False]
for i in range(4):
key = f"batch/slot_{i}"
path = self._settings.value(f"{key}/path", "")
self._batch_slot_paths[i] = str(path) if path else None
self._batch_slot_addrs[i] = str(self._settings.value(f"{key}/addr", ""))
# QSettings value 可直接用 type=bool 读取
enabled = self._settings.value(f"{key}/enabled", False, type=bool)
self._batch_slot_enabled[i] = bool(enabled)
def _apply_serial_settings(self) -> None:
"""在端口列表刷新完成后,把记忆的串口参数应用到 UI。"""
# 端口:只有在当前系统存在时才设置
if self._last_serial_port:
for idx in range(self.ui.comboBox.count()):
if self.ui.comboBox.itemText(idx) == self._last_serial_port:
self.ui.comboBox.setCurrentIndex(idx)
break
# 波特率:直接按字符串匹配 comboBox_2 文本
if self._last_serial_baud:
for idx in range(self.ui.comboBox_2.count()):
if self.ui.comboBox_2.itemText(idx) == self._last_serial_baud:
self.ui.comboBox_2.setCurrentIndex(idx)
break
def _apply_batch_settings_to_ui(self) -> None:
"""把记忆的批量槽位信息应用到 UI 组件。"""
for i, (btn, le, cb) in enumerate(self._file_slots):
path = self._batch_slot_paths[i]
if path:
self._row_paths[i] = path
btn.setText(QFileInfo(path).fileName())
btn.setToolTip(path)
else:
self._row_paths[i] = None
btn.setText("...")
btn.setToolTip("")
le.setText(self._batch_slot_addrs[i] or "")
cb.setChecked(self._batch_slot_enabled[i])
def _save_settings(self) -> None:
"""把当前参数保存到 QSettings。"""
if self.ui.comboBox.currentText():
self._settings.setValue("serial/port", self.ui.comboBox.currentText())
if self.ui.comboBox_2.currentText():
self._settings.setValue("serial/baud", self.ui.comboBox_2.currentText())
self._settings.setValue("batch/inter_file_delay_s", self._inter_file_delay_s)
for i, (_, le, cb) in enumerate(self._file_slots):
key = f"batch/slot_{i}"
path = self._row_paths[i] or ""
self._settings.setValue(f"{key}/path", path)
self._settings.setValue(f"{key}/addr", le.text().strip())
self._settings.setValue(f"{key}/enabled", cb.isChecked())
if __name__ == "__main__":
app = QApplication([])
window = MainWindow()
window.show()
app.exec()
9.7 测试

更多推荐



所有评论(0)