⚠️上位机代码仓库(要你命3000):https://gitee.com/simonchina_carel_li/killyou3000.git
Tag: 0330

⚠️裸机仓库:https://gitee.com/simonchina_carel_li/mini2440-bare-metal.git
Tag: 21-xmodem2nand

1. 项目简介

从这节开始,我们要以项目的方式去完成剩下的裸机编程章节。

1.1 常见的几种烧录方式

方式 说明
裸机烧录 利用芯片自带的 JTAG 调试接口,通过专用的仿真器(如 J-Link、H-JTAG)将引导程序直接写入 NOR Flash 或 NAND Flash
这种方式一般烧录Bootloader或裸机固件
Bootloader 烧录
(In-System Programming, ISP)
- USB 接口烧录(最高效)
- 串口烧录(速度较慢,但几乎所有嵌入式设备都具备串口)
- 网络烧录(TFTP/NFS)(如果硬件带有以太网控制器)
脱机烧录/离线烧录
(Offline Programming)
- 芯片预烧录:使用专门的烧录器/编程器,在 Flash 芯片焊接到电路板之前,先将固件批量灌入芯片中。这种方式适合极大规模生产
- SD卡/外部存储卡烧录:需要支持从 SD 卡自动引导并运行自动烧录脚本

我们此节实现的烧录方式,严格来说不属于上述任意一种,

我们实现在裸机固件上支持串口烧录,将文件写入Nand(类似于ISP+串口烧录)

1.2 配套上位机

为了配合设备端烧录固件,我们还会实现一个带界面的上位机软件,并支持我们的裸机测串口烧录;

在后面,随着我们实现其他的烧录方式,也会同时升级扩展上位机软件的功能!

上位机定位为一个工具箱,那就叫“要你命3000”好啦!

在这里插入图片描述

2. 方案设计

子项 方案/说明
烧录目标 Nand
传输协议 串口XModem
上位机方案 - 基于Python,PySide6 UI框架
- 支持多文件不同地址发送
传输约定/步骤 1. 上位机发送命令rx <地址>通知设备开始接收
2. 设备周期性发送’C’准备接收
3. 使用XModem-CRC协议传输文件
设备端设计之应用 - 开发cmd-cli程序,以CMD终端的方式接受控制
- 增加两个CMD命令实现:
1. rx <地址>传输文件至Nand的指定地址处(先保存至RAM,然后写入Nand)
2. nandread <地址> <长度>从Nand的指定地址读取指定长度的数据打印出来(用于数据验证)
设备端设计之Nand 原有的Nand功能需要增加任意地址(不要求对齐)的字节数据读取
设备端设计之xmodem传输 需要增加xmodem传输功能,使用CRC协议,128字节块传输;保存至SDRAM(堆)中

3. 增加Nand功能

打开common/nand.c,增加函数

/// @brief 读取指定字节地址的数据(带ECC校验)
/// @param byte_addr 线性字节地址(从 0 起,不用对齐到页)
/// @param buf 数据目标
/// @param size 字节数
/// @return 0 成功,-1 失败(读 ECC 不可纠正)
int nand_read_ecc_bytes(unsigned int byte_addr, unsigned char *buf, unsigned int size)
{
    static unsigned char page_buf[NAND_PAGE_BYTES];

    if (size == 0)
        return 0;
    if (buf == NULL)
        return -1;

    while (size > 0) {
        unsigned int page = byte_addr / NAND_PAGE_BYTES;
        unsigned int off = byte_addr % NAND_PAGE_BYTES;
        unsigned int chunk = NAND_PAGE_BYTES - off;
        if (chunk > size)
            chunk = size;

        if (off == 0 && chunk == NAND_PAGE_BYTES) {
            if (nand_read_ecc(page, buf) != 0)
                return -1;
        } else {
            if (nand_read_ecc(page, page_buf) != 0)
                return -1;
            memcpy(buf, page_buf + off, chunk);
        }

        byte_addr += chunk;
        buf += chunk;
        size -= chunk;
    }
    return 0;
}

我们原有的nand_read_ecc函数,是地址页对齐的页大小读取函数,

新增的nand_read_ecc_bytes是对前者的封装,以实现任意地址任意长度的读取

4. XModem复盘

Xmodem 采用“停-等”(Stop-and-Wait)协议:发送方发送一个数据包,必须等待接收方的确认信号(ACK/NAK)后,才能发送下一个包

4.1 数据包结构

标准 Xmodem 数据包长度固定为 132 字节,格式如下:

字段 长度 说明
SOH 1 字节 控制字符,固定为 0x01(Start Of Header)
包序号 1 字节 0x01 开始,每发一包自增 1,超过 255 后回绕到 0
包序号反码 1 字节 包序号的补码(255 - 包序号),用于校验包序号准确性
数据区 128 字节 实际传输的内容。若文件不足 128 字节,通常填 0x1A (CTRL+Z)
校验和 1 字节 128 字节数据的累加和(Checksum)取模 256

4.2 交互控制字符

Xmodem 依赖几个关键的 ASCII 字符来控制流程:

  • SOH (0x01):数据包起始。

  • ACK (0x06):接收正确,告诉发送方发下一包。

  • NAK (0x15):接收失败或校验错误,请求重发当前包。

  • EOT (0x04):发送结束,发送方告知接收方文件已传完。

  • CAN (0x18):取消传输。

  • C (0x43):用于 Xmodem-CRC 模式,接收方发送 ‘C’ 代替 NAK 来发起请求

4.3 传输流程

步骤 说明
0. 启动(发送侧) 发送方向设备终端发送命令rx 0xXXXXXXXX
1. 启动 接收方每隔几秒发送一个 NAK/‘C’(这里用CRC协议,所以是后者)
告知发送方“我准备好了”
2. 发送 发送方收到 NAK 后,开始发送第 1 个包(SOH + 01 + FE + Data + Checksum)
3. 确认 接收方检查校验和及包序号。若正确则回 ACK,错误则回 NAK
4. 循环 发送方收到 ACK 后发送下一包;收到 NAK 则重发上一包
5. 结束 发送方发完数据后发送 EOT,接收方回 ACK,传输正式关闭

4.4 变种杂谈

  • Xmodem-1K:将数据区扩展为 1024 字节(SOH 变为 STX 0x02),显著提高了大文件传输效率

  • Ymodem / Zmodem:在 Xmodem 基础上支持了文件名传输、多文件传输和断点续传

5. XModem实现

新建common/xmodem-crc.c,

实现接收文件到SDRAM(malloc堆的方式)的功能,

#include "s3c2440a.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>  // 为了使用 malloc 和 free

// --- Xmodem 控制字符定义 ---
#define SOH  0x01
#define EOT  0x04
#define ACK  0x06
#define NAK  0x15
#define CAN  0x18
/* Xmodem-CRC:接收端用该字节发起同步,请求发送方使用 CRC-16(而非校验和) */
#define CRC_REQ ((uint8_t)'C') /* 0x43 */

#define PACKET_SIZE 128
/* SOH(1)+blk(1)+~blk(1)+data(128)+CRC(2) — 下标 0..132 共 133 字节 */
#define XMODEM_SOH_FRAME_BYTES (1 + 1 + 1 + PACKET_SIZE + 2)
#define MAX_RETRYS  10

static void uart_putchar(uint8_t c)
{
    uart0_putc(c);
}
static int uart_getchar_timeout(uint32_t timeout_ms)
{
    return uart0_getc_timeout(timeout_ms);
}

// --- CRC16 计算函数 (CCITT标准) ---
static uint16_t crc16_ccitt(const uint8_t *data, uint16_t length) {
    uint16_t crc = 0;
    while (length--) {
        crc ^= (uint16_t)(*data++) << 8;
        for (int i = 0; i < 8; ++i) {
            if (crc & 0x8000)
                crc = (crc << 1) ^ 0x1021;
            else
                crc <<= 1;
        }
    }
    return crc;
}

// --- 清空串口接收缓冲区 ---
static void flush_uart(void) {
    // while (uart_getchar_timeout(100) >= 0);
}

// --- Xmodem 接收主函数 ---
// 返回值:>0 表示成功接收的文件大小,<=0 表示失败
// 接收到预先分配好的 buffer 中,并保护不超出 max_limit
int32_t xmodem_receive_to_buffer(uint8_t *buffer, uint32_t max_limit) 
{
    uint8_t *dest_ptr = buffer;
    uint32_t total_bytes = 0;
    uint8_t packet_buf[XMODEM_SOH_FRAME_BYTES];
    uint8_t expected_seq = 1;
    int retry_count = 0;
    int state = 0; 

    flush_uart();

    /*
     * 同步阶段 (state==0):接收端周期性向 UART **发送 0x43 ('C')**,
     * 对端 Xmodem-CRC 发送程序应在收到 C 后开始发 SOH 帧。
     * 进入首包后 state=1,不再发 C,只发 ACK/NAK。
     */
    while (retry_count < MAX_RETRYS) {
        if (state == 0) {
            uart_putchar(CRC_REQ); /* 即 0x43,肉眼/逻辑分析仪可见 */
        }
        // printf("---1\r\n");
        int c = uart_getchar_timeout(2000); 
        // printf("---2\r\n");
        if (c < 0) { retry_count++; continue; }

        if (c == CAN) { 
            printf("\r\n[Xmodem] Canceled by sender.\r\n"); 
            return -1; 
        }

        if (c == EOT) { 
            uart_putchar(ACK);
            return total_bytes; 
        }

        if (c == SOH) { 
            state = 1;  
            packet_buf[0] = c;
            
            int i;
            for (i = 1; i < 133; i++) {
                int data_byte = uart_getchar_timeout(1000);
                if (data_byte < 0) break;
                packet_buf[i] = (uint8_t)data_byte;
            }

            if (i < 133) { flush_uart(); uart_putchar(NAK); retry_count++; continue; }

            uint8_t seq = packet_buf[1];
            if (seq != (uint8_t)(~packet_buf[2])) { flush_uart(); uart_putchar(NAK); retry_count++; continue; }

            uint16_t calc_crc = crc16_ccitt(&packet_buf[3], PACKET_SIZE);
            uint16_t recv_crc = (packet_buf[131] << 8) | packet_buf[132];
            if (calc_crc != recv_crc) { flush_uart(); uart_putchar(NAK); retry_count++; continue; }

            if (seq == expected_seq) {
                // 【新增越界保护】检查是否超过了 malloc 的大小
                if (total_bytes + PACKET_SIZE > max_limit) {
                    uart_putchar(CAN);
                    printf("\r\n[Xmodem] Error: File exceeds allocated buffer size (%lu bytes).\r\n", max_limit);
                    return -1;
                }
                
                // 拷贝数据到动态分配的 buffer 中
                memcpy(dest_ptr, &packet_buf[3], PACKET_SIZE);
                dest_ptr += PACKET_SIZE;
                total_bytes += PACKET_SIZE;
                expected_seq++;
            } else if (seq != (uint8_t)(expected_seq - 1)) {
                uart_putchar(CAN);
                printf("\r\n[Xmodem] Sequence error. Aborted.\r\n");
                return -1;
            }

            uart_putchar(ACK);
            retry_count = 0; 
        }
    }
    printf("\r\n[Xmodem] Timeout or too many errors.\r\n");
    return -1;
}


6. 标准库适配(_read

因为后面我们会使用到fgets这样的标准库函数,我们之前对标准库进行适配时,预留了_read函数的存根,

int _read(int file, char *ptr, int len) { ... }

当使用fgets时,最终会调用此函数,并且file传参为0(stdin=0),

因为我们需要实现此函数,

打开common/newlib_stubs.c

/*
 * 串口终端按回车常只发 CR(\\r);fgets/gets 等要等 LF(\\n)才结束一行。
 * 将 \\r 当作 \\n 交给 libc;若终端发 CRLF,则丢弃紧跟在 CR 后面的 LF。
 */
static int stdin_after_cr;

int _read(int file, char *ptr, int len)
{
    if (file != 0 || len <= 0) {
        return -1;
    }

    for (;;) {
        unsigned char c = (unsigned char)uart0_getc();

        if (c == '\n' && stdin_after_cr) {
            stdin_after_cr = 0;
            continue;
        }

        if (c == '\r') {
            stdin_after_cr = 1;
            ptr[0] = '\n';
            return 1;
        }

        stdin_after_cr = 0;
        ptr[0] = (char)c;
        return 1;
    }
}

把标准输入(stdin)映射到 UART 接收,只支持读 file == 0(标准输入)

做了串口换行兼容:把 '\r' 转成 '\n',并用 stdin_after_cr 跳过紧随其后的 '\n'(处理 \r\n

这样上层看到的换行始终是单个 '\n',避免出现双换行

7. cmd-cli终端应用实现

7.1 Nand CLI功能

新建cmd-cli/nand-cli.c

通过对nand功能和上面XModem功能的调用,实现文件传输至Nand指定位置和nand数据读取的功能,

#include "s3c2440a.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>


// 定义单次接收允许的最大固件大小
// 如果你的内核或文件系统大于此值,请调大。受限于 S3C2440 的堆大小。
#define MAX_DOWNLOAD_SIZE (40 * 1024 * 1024)

extern int xmodem_receive_to_buffer(uint8_t *buffer, int max_size);

// --- 预留的 NAND 烧录接口 ---
// nand_addr: NAND 中的起始地址
// buffer: 接收到的文件数据
// file_size: 接收到的文件总大小(注意:Xmodem 可能会用 0x1A 填充尾部对齐 128 字节)
// 返回值:0表示成功,-1表示失败
static int nand_write_firmware(unsigned int nand_addr, const uint8_t *buffer, uint32_t file_size)
{
    const unsigned int block_bytes = NAND_PAGE_BYTES * NAND_PAGES_PER_BLOCK;

    if (buffer == NULL)
        return -1;

    /* 按块对齐长度;尾部补到整块,内容来自 buffer(未接收部分为未定义) */
    file_size = (file_size + block_bytes - 1) / block_bytes * block_bytes;

    unsigned int first_block = nand_addr / block_bytes;

    for (unsigned int i = 0; i < file_size; i += block_bytes) {
        unsigned int block = first_block + i / block_bytes;
        if (nand_erase(block) != 0)
            return -1;
        for (unsigned int j = 0; j < NAND_PAGES_PER_BLOCK; j++) {
            unsigned int page = block * NAND_PAGES_PER_BLOCK + j;
            if (nand_write_ecc(page, &buffer[i + j * NAND_PAGE_BYTES]) != 0)
                return -1;
        }
    }

    return 0;
}

// --- 命令行解析入口 ---
// 命令行工具解析到了 rx 命令,并将后续字符串传入
void do_rx_nand(const char *addr_str) 
{
    unsigned int nand_target_addr;
    
    // 1. 解析目标 NAND 地址
    if (sscanf(addr_str, "0x%x", (unsigned int *)&nand_target_addr) != 1) {
        printf("Error: Invalid format. Usage: rx 0xXXX (NAND Address)\r\n");
        printf("Change to: rx 0x0 (NAND Address)\r\n");
        nand_target_addr = 0;
    }

    // [重要提示] 根据K9F2G08 手册,这块 NAND 是按照 Block (128KB) 擦除的。
    // 建议在这里加一个地址对齐检查,警告用户
    if (nand_target_addr % (NAND_PAGE_BYTES * NAND_PAGES_PER_BLOCK) != 0) {
        printf("Warning: Target address 0x%08x is not %dKB Block aligned!\r\n", nand_target_addr, NAND_PAGE_BYTES * NAND_PAGES_PER_BLOCK / 1024);
        return;
    }

    // 2. 从堆中动态分配 SDRAM 缓存
    printf("Allocating %d MB buffer in SDRAM...\r\n", MAX_DOWNLOAD_SIZE / (1024*1024));
    uint8_t *rx_buffer = (uint8_t *)malloc(MAX_DOWNLOAD_SIZE);
    
    if (rx_buffer == NULL) {
        printf("Error: Malloc failed! Not enough heap memory.\r\n");
        return;
    }

    // 3. 开始接收文件到动态分配的 rx_buffer
    printf("Ready to receive via Xmodem. Dest NAND Addr: 0x%08x\r\n", nand_target_addr);
    printf("Please start Xmodem sender in your terminal...\r\n");

    int file_size = xmodem_receive_to_buffer(rx_buffer, MAX_DOWNLOAD_SIZE);

    // 4. 判断接收结果并烧录 NAND
    if (file_size > 0) {
        printf("Download success! File size: %d bytes. Starting NAND flashing...\r\n", file_size);
        
        // 【注意】如果你开启了 MMU 和 D-Cache,在把缓冲区交给 DMA 或 NAND 控制器前,
        // 建议在这里调用 Cache 清理函数,例如:
        // clean_dcache_range((uint32_t)rx_buffer, file_size);

        // 调用你预留的 NAND 烧录接口
        int ret = nand_write_firmware(nand_target_addr, rx_buffer, file_size);
        
        if (ret == 0) {
            printf("Success: Firmware flashed to NAND offset 0x%08x!\r\n", nand_target_addr);
        } else {
            printf("Error: NAND flashing failed!\r\n");
        }
    } else {
        printf("Firmware download failed, aborting NAND write.\r\n");
    }

    // 5. 释放堆内存,防止内存泄漏
    free(rx_buffer);
    printf("SDRAM buffer freed.\r\n");
}

// nand 读取
int do_nand_read(unsigned int addr, uint8_t *buf, unsigned int size)
{
    return nand_read_ecc_bytes(addr, buf, size);
}

7.2 CMD CLI入口程序

这里实现的功能不只是针对文件传输至nand的,而是一个集成多功能应用的命令调用终端,以后我们实现的其他项目的命令也在这里集成!

新建cmd-cli/main.c

///@file cmd-cli.c
///@brief cmd-cli命令行工具
///@author li
///@date 2026-03-29

#include "s3c2440a.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

extern void do_rx_nand(const char *addr_str);
extern int do_nand_read(unsigned int addr, uint8_t *buf, unsigned int size);

static void helpler(void)
{
    printf("命令: \r\n");
    printf("\thelp - 显示帮助信息\r\n");
    printf("\trx <NAND地址> - 接收文件到NAND\r\n");
    printf("\tnandread <NAND地址> <字节数> - 读取NAND数据\r\n");
}

int main()
{
    nand_init();

    /* 串口交互:避免 stdin/stdout 全缓冲导致行编辑与提示符异常 */
    setvbuf(stdin, NULL, _IONBF, 0);
    setvbuf(stdout, NULL, _IONBF, 0);

    printf("cmd-cli命令行工具\r\n");

    char line[160];

    while (1) {
        printf("cmd-cli> ");

        // 先清空接收和UART缓冲
        memset(line, 0, sizeof(line));
        uart0_getc_timeout(10);

        if (fgets(line, sizeof(line), stdin) == NULL) {
            continue;
        }
        // 回显
        printf("%s", line);

        char cmd[128];
        char arg[128];
        char arg2[128];
        int n = sscanf(line, "%127s %127s %127s", cmd, arg, arg2);

        if (n < 1 || cmd[0] == '\0') {
            continue;
        }

        if (strcmp(cmd, "help") == 0) {
            helpler();
            continue;
        }

        else if (strcmp(cmd, "rx") == 0) {
            if (n < 2) {
                printf("用法: rx <NAND地址>\r\n");
                continue;
            }
            do_rx_nand(arg);
        }

        else if (strcmp(cmd, "nandread") == 0) {
            if (n < 3) {
                printf("用法: nandread <NAND地址> <字节数>\r\n");
                continue;
            }
            char *end = NULL;
            unsigned long a = strtoul(arg, &end, 0);
            if (end == arg || *end != '\0') {
                printf("错误: 无效地址 \"%s\"\r\n", arg);
                continue;
            }
            end = NULL;
            unsigned long sz = strtoul(arg2, &end, 0);
            if (end == arg2 || *end != '\0' || sz == 0) {
                printf("错误: 无效字节数 \"%s\"(须为正整数)\r\n", arg2);
                continue;
            }
            unsigned int addr = (unsigned int)a;
            unsigned int size = (unsigned int)sz;
            uint8_t *buf = (uint8_t *)malloc(size);
            if (buf == NULL) {
                printf("错误: 内存分配失败\r\n");
                continue;
            }
            int ret = do_nand_read(addr, buf, size);
            if (ret == 0) {
                printf("读取成功: %d 字节\r\n", size);
                for (unsigned int i = 0; i < size; i++) {
                    printf("%02X%s", buf[i], i % 16 == 15 ? "\r\n" : " ");
                }
                printf("\r\n");
            } else {
                printf("读取失败\r\n");
            }
            free(buf);
        }
        else {
            printf("未知命令: %s\r\n", cmd);
            helpler();
        }
    }
}

因为我们已经适配了标准库,所以我们可以使用fgetssscanf这类的标准库函数,

程序的主要设计是,
在这里插入图片描述

8. 初步测试

编译make cmd-cli,烧录,运行,
请添加图片描述

我们使用支持XModem-CRC文件传输的终端工具,发送一个文件,

请添加图片描述
可以看到,传输成功完成!
请添加图片描述

因为工具默认发送的接收命令是rx 巴拉巴拉.pdf, 这不符合我们定义的rx 0xXXXX格式,我们固件端对不符合格式的参数默认当作地址0处理,所以这里会把文件默认传输到Nand的开头。

我们使用WSL命令行生成一个内容为U32数组:0x01 0x02 0x03 … 一直到10000 的二进制文件,

python3 -c "import struct; f=open('output.bin', 'wb'); [f.write(struct.pack('<I', i)) for i in range(1, 10001)]; f.close()"

通过上述方法,将此文件烧录至Nand(此时应该烧录至开头)

然后通过串口终端,我们执行nandread命令,

# 我们烧录的文件是10000个元素的U32数组,所以实际长度为40000
nandread 0 40000

得到结果,
请添加图片描述

可以看到,内部排列确实是依次递增,并且最后一个数是10 27 00 00, 0x2710十进制为10000,

nandread功能验证OK!

9. 上位机软件实现(KillYou3000)

现在我们要打造一个瑞士军刀式的PC软件,用于测试XModem文件传输功能,并且后续我们开发其它功能都在此基础上进行扩展!

既然是瑞士军刀式的工具箱软件,那就起名为【**要你命3000】**好了!
请添加图片描述

9.1 方案设计

设计
功能 - 串口连接(可设置波特率)
- 串口终端显示- XModem-CRC协议传输
- 文件多选并且可分别设置地址
- 显示传输进度
开发语言和环境 Python + Cursor(或VSCode)+ venv虚拟环境
使用框架/组件 - UI框架使用PySide6
- 串口通信使用pyserial
- 打包使用pyinstaller
UI开发模式 .ui可视化编辑界面 → uic.exe工具转换成.py文件

9.2 项目环境搭建

新建目录KillYou3000

打开Powershell,执行python -m venv .venv,

自动生成.venv虚拟环境目录,

Vscode打开此目录,

确保已安装Python和QT插件,
请添加图片描述

设置项目的解析器为.venv目录下的,
请添加图片描述

打开终端,此时应该自动进入虚拟环境,
请添加图片描述

终端执行命令安装必要插件,

pip install PySide6 pyserial pyinstaller

安装完成后,
请添加图片描述

下面设置QT插件,

找到Qt-ui设置项,设置UIC工具路径为.venv目录下的,
请添加图片描述

这个是将.ui文件用designer工具打开的路径设置

9.3 UI设计

执行插件命令,
请添加图片描述

创建一个UI文件,
请添加图片描述

点击此ui文件,显示此界面,
请添加图片描述

选择用Designer打开,然后进行界面编辑,请添加图片描述

9.4 ui转py

界面编辑完成之后,

终端执行转换命令,
请添加图片描述

即可得到py文件

9.5 XModem功能实现

直接上代码吧,

新建xmodem_crc.py

# -*- coding: utf-8 -*-
"""
XMODEM-CRC(128 字节分组)发送端实现(基于 pyserial)。

主要用于:串口上向接收端发送 XMODEM-CRC 文件数据,并支持:
1) 等待接收端握手(接收端发出 CRC 模式请求 'C')
2) 每块数据 ACK/NAK 重试
3) 发送过程可取消(threading.Event)
4) 将握手前后的“非协议字节”作为设备日志回调展示(on_rx_log)
"""

from __future__ import annotations

import time
from typing import BinaryIO, Callable, Optional

SOH = 0x01
EOT = 0x04
ACK = 0x06
NAK = 0x15
CAN = 0x18
CRC_READY = 0x43  # 'C' — receiver requests CRC mode

BLOCK_SIZE = 128
PAD_BYTE = 0x1A


class XmodemSendError(Exception):
    """传输失败或被取消时抛出的异常。"""


def send_rx_command(
    stream,
    address: str,
    cancel_event=None,
    line_ending: bytes = b"\r\n",
) -> None:
    """
    向设备发送启动接收指令:``rx <address>``(ASCII)。

    该函数只负责“发命令”,握手(等待 'C')和后续 XMODEM-CRC 发送由调用方完成。

    Args:
        stream: pyserial 的 Serial 对象(提供 write/flush)。
        address: 烧录地址字符串(例如 ``0x20000000``)。
        cancel_event: 取消事件(threading.Event),用于中途退出。
        line_ending: 行结束符,默认 ``\\r\\n``。

    Returns:
        None
    """
    if cancel_event is not None and cancel_event.is_set():
        raise XmodemSendError("已取消")
    addr = address.strip()
    if not addr:
        raise XmodemSendError("烧录地址为空")
    cmd = ("rx " + addr).encode("utf-8") + line_ending
    stream.write(cmd)
    stream.flush()


def crc16_xmodem(data: bytes) -> int:
    """
    计算 XMODEM-CRC 使用的 CRC-16。

    - 多项式:0x1021
    - 初始值:0

    Args:
        data: 需要计算 CRC 的数据。

    Returns:
        CRC-16(0~0xFFFF)
    """
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def _read_protocol_byte(
    stream,
    timeout_s: float,
    cancel_event=None,
    on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> Optional[int]:
    """
    在超时范围内持续读取串口字节,直到读到协议应答之一:
    - ACK / NAK / CAN

    如果读到的不是协议应答字节,则按“设备日志/回显”处理:
    - 若设置了 ``on_rx_log``:回调传递该字节
    - 否则丢弃

    Args:
        stream: pyserial Serial。
        timeout_s: 等待超时时间(秒)。
        cancel_event: 取消事件。
        on_rx_log: 非协议字节回调(bytes,单字节)。

    Returns:
        ACK / NAK / CAN 的字节值;或超时返回 None。
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if cancel_event is not None and cancel_event.is_set():
            raise XmodemSendError("已取消")
        n = stream.in_waiting
        if n:
            b = stream.read(1)
            if not b:
                break
            x = b[0]
            if x in (ACK, NAK, CAN):
                return x
            if on_rx_log:
                on_rx_log(bytes([x]))
        else:
            time.sleep(0.001)
    return None


def wait_for_crc_ready(
    stream,
    timeout_s: float = 120.0,
    cancel_event=None,
    on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
    """
    等待接收端进入 XMODEM-CRC 接收准备状态。

    接收端通常会周期性发出 ASCII ``'C'``(0x43)请求“CRC 模式”。
    但某些设备在握手之前会输出包含字母 ``C`` 的文本(例如 ``CRC``),
    如果简单“看到 0x43 就认为就绪”,可能会误触发导致后续块发送卡住。

    本函数采用更稳健的判定策略:
    - 若连续收到 ``CC``(两个 0x43 且相邻),直接认为就绪。
    - 若先收到单个 ``C``,在约 200ms 宽限期内:
        - 若再次收到 ``C``:认为就绪
        - 若收到其他字节:把 ``C`` + 该字节作为设备日志回调,并继续寻找下一次握手
        - 若宽限期内无更多数据:把单个 ``C`` 视为就绪(兼容“只发送一个 C”的实现)
    """
    deadline = time.monotonic() + timeout_s
    single_c_grace_s = 0.22

    while time.monotonic() < deadline:
        if cancel_event is not None and cancel_event.is_set():
            raise XmodemSendError("已取消")
        n = stream.in_waiting
        if not n:
            time.sleep(0.01)
            continue
        raw = stream.read(1)
        if not raw:
            continue
        b0 = raw[0]
        if b0 != CRC_READY:
            if on_rx_log:
                on_rx_log(raw)
            continue

        # 读到第一个 'C' 后,需要区分:它是握手同步字节,还是日志文本中的字母 C
        inner_deadline = time.monotonic() + single_c_grace_s
        while time.monotonic() < inner_deadline:
            if cancel_event is not None and cancel_event.is_set():
                raise XmodemSendError("已取消")
            if stream.in_waiting:
                b1 = stream.read(1)[0]
                if b1 == CRC_READY:
                    return
                if on_rx_log:
                    on_rx_log(bytes([CRC_READY, b1]))
                break
            time.sleep(0.002)
        else:
            # No second byte within grace period — single C is the sync
            return

    raise XmodemSendError("等待接收端发送 'C' (CRC 就绪) 超时")


def _send_block(
    stream,
    seq_byte: int,
    data128: bytes,
    max_retries: int = 20,
    cancel_event=None,
    on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
    """
    发送单个 XMODEM-CRC 数据块(128 字节)。

    Args:
        stream: pyserial Serial。
        seq_byte: 块序号(0~255 的低 8 位)。
        data128: 必须长度为 128 的分组数据。
        max_retries: 单块最大重试次数。
        cancel_event: 取消事件。
        on_rx_log: 非协议应答字节回调(用于显示设备日志)。

    Returns:
        None(成功发送后返回;失败抛异常)
    """
    if len(data128) != BLOCK_SIZE:
        raise ValueError("block must be 128 bytes")
    seq = seq_byte & 0xFF
    c = crc16_xmodem(data128)
    pkt = bytes([SOH, seq, (~seq) & 0xFF]) + data128 + bytes([c >> 8, c & 0xFF])

    for _ in range(max_retries):
        if cancel_event is not None and cancel_event.is_set():
            raise XmodemSendError("已取消")
        stream.write(pkt)
        stream.flush()
        r = _read_protocol_byte(stream, 3.0, cancel_event, on_rx_log)
        if r == ACK:
            return
        if r == CAN:
            raise XmodemSendError("接收端取消 (CAN)")
        if r == NAK:
            continue
    raise XmodemSendError(f"数据块 seq=0x{seq:02x} 发送失败(NAK/超时次数过多)")


def _send_eot(
    stream,
    max_retries: int = 10,
    cancel_event=None,
    on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
    """
    发送结束符 EOT,并等待 ACK。

    Args:
        stream: pyserial Serial。
        max_retries: EOT 最大重试次数。
        cancel_event: 取消事件。
        on_rx_log: 非协议应答字节回调(用于显示设备日志)。

    Returns:
        None(成功后返回;失败抛异常)
    """
    for _ in range(max_retries):
        if cancel_event is not None and cancel_event.is_set():
            raise XmodemSendError("已取消")
        stream.write(bytes([EOT]))
        stream.flush()
        r = _read_protocol_byte(stream, 3.0, cancel_event, on_rx_log)
        if r == ACK:
            return
        if r == CAN:
            raise XmodemSendError("接收端取消 (CAN)")
    raise XmodemSendError("结束 EOT 未收到 ACK")


def send_file(
    stream,
    file_obj: BinaryIO,
    file_size: int,
    on_progress: Optional[Callable[[int, int], None]] = None,
    cancel_event=None,
    wait_crc: bool = True,
    on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
    """
    使用 XMODEM-CRC 协议发送文件。

    协议行为:
    1) (可选)等待接收端握手(等待 'C')
    2) 以 128 字节为分组发送每块,等待 ACK/NAK
    3) 所有数据发送完成后发送 EOT 并等待 ACK

    Args:
        stream: pyserial Serial 对象(用于读 ACK/NAK/CAN 与写数据)。
        file_obj: 已打开的二进制文件对象(必须从文件开头开始读)。
        file_size: 文件总字节数(用于进度与不足 128 字节的 padding)。
        on_progress: 进度回调:``on_progress(已发送字节数, file_size)``。
        cancel_event: 取消事件(threading.Event)。
        wait_crc: 是否在发送第一块前等待接收端的握手 'C'。
        on_rx_log: 可选的“非协议字节”回调(设备回显/日志),用于 UI 显示。

    Returns:
        None
    """
    if wait_crc:
        wait_for_crc_ready(stream, cancel_event=cancel_event, on_rx_log=on_rx_log)

    block_no = 1
    sent = 0

    while True:
        if cancel_event is not None and cancel_event.is_set():
            raise XmodemSendError("已取消")
        raw = file_obj.read(BLOCK_SIZE)
        if not raw:
            break
        delivered = len(raw)
        if delivered < BLOCK_SIZE:
            chunk = raw + bytes([PAD_BYTE]) * (BLOCK_SIZE - delivered)
        else:
            chunk = raw

        seq_byte = block_no & 0xFF
        _send_block(stream, seq_byte, chunk, cancel_event=cancel_event, on_rx_log=on_rx_log)

        sent += delivered
        if sent > file_size:
            sent = file_size
        if on_progress:
            on_progress(sent, file_size)
        block_no += 1

    _send_eot(stream, cancel_event=cancel_event, on_rx_log=on_rx_log)
    if on_progress:
        on_progress(file_size, file_size)


def send_file_path(
    stream,
    path: str,
    on_progress: Optional[Callable[[int, int], None]] = None,
    cancel_event=None,
    wait_crc: bool = True,
    on_rx_log: Optional[Callable[[bytes], None]] = None,
) -> None:
    """
    通过文件路径发送文件(内部打开文件并调用 ``send_file``)。

    Args:
        stream: pyserial Serial。
        path: 待发送文件路径。
        on_progress: 进度回调。
        cancel_event: 取消事件。
        wait_crc: 是否等待握手 'C'。
        on_rx_log: 设备日志回调。

    Returns:
        None
    """
    import os

    size = os.path.getsize(path)
    with open(path, "rb") as f:
        send_file(stream, f, size, on_progress, cancel_event, wait_crc, on_rx_log)
    # 传输结束后设备可能仍有一行日志
    if on_rx_log:
        time.sleep(0.02)
        try:
            n = stream.in_waiting
            if n:
                on_rx_log(stream.read(n))
        except Exception:
            pass

9.6 主代码实现

新建main.py

# -*- coding: utf-8 -*-
"""
要你命3000 工具箱(当前实现:串口连接 + 多文件顺序发送 + XMODEM-CRC)。

功能要点:
1) 串口连接/断开(pyserial)
2) 通过 UI 选择多个文件与对应烧录地址,并按顺序发送
3) 每个文件发送前先下发 ``rx <address>``,再等待接收端握手('C')后进行 XMODEM-CRC
4) 在 UI 的 ``textBrowser`` 中显示串口接收内容(rx 日志)
5) 支持发送过程取消
"""

import os
import threading
import time

import serial
from serial.tools import list_ports
from PySide6.QtCore import QFileInfo, QSettings, QThread, QTimer, Signal, Qt
from PySide6.QtGui import QFont, QTextCursor
from PySide6.QtWidgets import (
    QApplication,
    QFileDialog,
    QMainWindow,
    QMessageBox,
)

from ui_main import Ui_Form
from xmodem_crc import XmodemSendError, send_file_path, send_rx_command


def _format_size(n: int) -> str:
    if n < 1024:
        return f"{n} B"
    if n < 1024 * 1024:
        return f"{n / 1024:.1f} KB"
    return f"{n / (1024 * 1024):.2f} MB"


_RX_TEXT_MAX_CHARS = 400_000
_RX_TEXT_KEEP_CHARS = 300_000


class XmodemBatchSendThread(QThread):
    """
    多文件顺序发送线程。

    发送策略:
    - 对于每个任务(文件路径 + 地址):
        1) 发送 ``rx <地址>`` 给设备
        2) 等待设备发起 XMODEM-CRC 握手(接收端发出 'C')
        3) 发送文件数据(XMODEM-CRC 128 字节分组)

    线程内会把:
    - 进度:``progress(sent_bytes, total_bytes)``
    - 当前任务状态:``status(index, total, filename, addr)``
    - 发送过程读到的“非协议字节”(设备日志):``rx_log(bytes)``
    - 完成/失败:``finished(ok, message)``
    """

    progress = Signal(int, int)
    status = Signal(int, int, str, str)
    rx_log = Signal(bytes)
    finished = Signal(bool, str)

    def __init__(
        self,
        ser: serial.Serial,
        jobs: list[tuple[str, str]],
        cancel: threading.Event,
        inter_file_delay_s: float,
    ):
        super().__init__()
        self._ser = ser
        self._jobs = jobs
        self._cancel = cancel
        self._inter_file_delay_s = float(inter_file_delay_s)

    def run(self):
        try:
            total = sum(os.path.getsize(p) for p, _ in self._jobs)
            sent_base = 0
            for i, (path, addr) in enumerate(self._jobs):
                if self._cancel.is_set():
                    raise XmodemSendError("已取消")
                self.status.emit(i + 1, len(self._jobs), os.path.basename(path), addr)

                # 进度回调:把“当前文件内已发送字节”映射到“整体任务已发送字节”
                def on_prog(s: int, t: int, base=sent_base):
                    self.progress.emit(base + s, total)

                # 设备回显/日志字节:由协议层(握手/块应答等待)回传
                def on_rx(data: bytes):
                    if data:
                        self.rx_log.emit(data)

                # 多文件之间:
                # 1) 给设备足够时间完成上一轮 XMODEM 接收/初始化
                # 2) 读取尾部日志(避免下一轮 rx 覆盖掉上一轮输出)
                if i > 0:
                    time.sleep(self._inter_file_delay_s)
                    try:
                        n = self._ser.in_waiting
                        if n:
                            on_rx(self._ser.read(n))
                    except Exception:
                        pass
                try:
                    self._ser.reset_input_buffer()
                except Exception:
                    pass
                # 下发启动接收命令(进入对应地址接收态)
                send_rx_command(self._ser, addr, self._cancel)

                # 发送当前文件:内部会等待接收端发 'C'(CRC 握手)
                send_file_path(
                    self._ser,
                    path,
                    on_progress=on_prog,
                    cancel_event=self._cancel,
                    wait_crc=True,
                    on_rx_log=on_rx,
                )
                sent_base += os.path.getsize(path)
            self.finished.emit(True, "全部传输完成")
        except XmodemSendError as e:
            self.finished.emit(False, str(e))
        except Exception as e:
            self.finished.emit(False, f"{type(e).__name__}: {e}")


class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle("要你命3000")
        self.ui = Ui_Form()
        self.ui.setupUi(self)

        self._serial: serial.Serial | None = None
        self._row_paths: list[str | None] = [None, None, None, None]
        self._cancel = threading.Event()
        self._send_thread: XmodemBatchSendThread | None = None
        self._batch_info: tuple[int, int, str, str] | None = None
        self._inter_file_delay_s: float = 4.0

        # 参数/状态记忆(QSettings)
        # 记忆内容:
        # - 串口:端口、波特率
        # - 批量任务:四行的“文件路径 / 地址 / 勾选状态”
        # - 批量发送:文件间隔(秒)
        self._settings = QSettings("KillYou3000", "KillYou3000")

        # 四行任务槽位:每行包含“选择文件按钮 / 地址输入框 / 勾选框”
        self._file_slots: list[tuple] = [
            (self.ui.toolButton, self.ui.lineEdit, self.ui.checkBox),
            (self.ui.toolButton_2, self.ui.lineEdit_2, self.ui.checkBox_2),
            (self.ui.toolButton_3, self.ui.lineEdit_3, self.ui.checkBox_3),
            (self.ui.toolButton_4, self.ui.lineEdit_4, self.ui.checkBox_4),
        ]

        self._rx_timer = QTimer(self)
        self._rx_timer.setInterval(30)
        self._rx_timer.timeout.connect(self._poll_serial_rx)

        self.ui.textBrowser.setOpenExternalLinks(False)
        self.ui.textBrowser.setPlaceholderText("连接串口后在此显示接收数据(UTF-8,非法字节显示为 �)")
        _mono = QFont("Consolas", 9)
        if not _mono.exactMatch():
            _mono = QFont("Courier New", 9)
        self.ui.textBrowser.setFont(_mono)

        self.ui.progressBar.setRange(0, 100)
        self.ui.progressBar.setValue(0)
        self.ui.pushButton_2.setEnabled(False)

        self.ui.pushButton_refresh_ports.clicked.connect(self.refresh_ports)
        self.ui.pushButton_serial_connect.clicked.connect(self.toggle_serial)
        for i, (btn, le, cb) in enumerate(self._file_slots):
            btn.clicked.connect(lambda _=False, r=i: self.pick_file_row(r))
            cb.stateChanged.connect(lambda _=None: self._update_send_enabled())
            le.textChanged.connect(lambda _=None: self._update_send_enabled())

        self.ui.pushButton.clicked.connect(self.start_send)
        self.ui.pushButton_2.clicked.connect(self.stop_send)

        self._load_settings()
        self.refresh_ports()
        self._apply_serial_settings()
        self._apply_batch_settings_to_ui()
        self._update_send_enabled()

    def _rx_monitor_active(self) -> bool:
        return (
            self._serial is not None
            and self._serial.is_open
            and not (self._send_thread and self._send_thread.isRunning())
        )

    def _poll_serial_rx(self):
        if not self._rx_monitor_active():
            return
        try:
            n = self._serial.in_waiting
            if not n:
                return
            data = self._serial.read(n)
        except Exception:
            return
        text = data.decode("utf-8", errors="replace")
        self._append_rx_text(text)

    def _append_rx_from_worker(self, data: bytes):
        """
        接收发送线程回传的“设备日志/回显”并显示到终端窗口。

        该显示路径与主线程定时器轮询互斥使用,避免同一时段重复读串口。
        """
        if not data:
            return
        self._append_rx_text(data.decode("utf-8", errors="replace"))

    def _append_rx_text(self, text: str):
        cursor = self.ui.textBrowser.textCursor()
        cursor.movePosition(QTextCursor.MoveOperation.End)
        cursor.insertText(text)
        self.ui.textBrowser.setTextCursor(cursor)
        self.ui.textBrowser.ensureCursorVisible()
        plain = self.ui.textBrowser.toPlainText()
        if len(plain) > _RX_TEXT_MAX_CHARS:
            self.ui.textBrowser.setPlainText(plain[-_RX_TEXT_KEEP_CHARS:])
            cursor = self.ui.textBrowser.textCursor()
            cursor.movePosition(QTextCursor.MoveOperation.End)
            self.ui.textBrowser.setTextCursor(cursor)

    def refresh_ports(self):
        self.ui.comboBox.clear()
        for p in list_ports.comports():
            self.ui.comboBox.addItem(p.device, p.description)
        if self.ui.comboBox.count() == 0:
            self.ui.comboBox.addItem("(无串口)")
        self.statusBar().showMessage(f"已刷新,共 {self.ui.comboBox.count()} 个端口", 3000)

    def toggle_serial(self):
        if self._send_thread and self._send_thread.isRunning():
            QMessageBox.information(self, "提示", "请先停止传输再断开串口。")
            return
        if self._serial is not None and self._serial.is_open:
            self._close_serial()
        else:
            self._open_serial()

    def _open_serial(self):
        port = self.ui.comboBox.currentText()
        if not port or port.startswith("("):
            QMessageBox.warning(self, "串口", "请选择有效串口。")
            return
        try:
            baud = int(self.ui.comboBox_2.currentText())
        except ValueError:
            QMessageBox.warning(self, "串口", "波特率无效。")
            return
        try:
            self._serial = serial.Serial(
                port,
                baud,
                timeout=0.2,
                write_timeout=5.0,
            )
        except serial.SerialException as e:
            QMessageBox.critical(self, "串口", f"打开失败:\n{e}")
            self._serial = None
            return
        self.ui.pushButton_serial_connect.setText("断开")
        self.statusBar().showMessage(f"已连接 {port} @ {baud}", 5000)
        self._rx_timer.start()
        self._update_send_enabled()

    def _close_serial(self):
        self._rx_timer.stop()
        if self._serial is not None:
            try:
                if self._serial.is_open:
                    self._serial.close()
            except Exception:
                pass
            self._serial = None
        self.ui.pushButton_serial_connect.setText("连接")
        self.statusBar().showMessage("串口已断开", 3000)
        self._update_send_enabled()

    def pick_file_row(self, row: int):
        """为指定行选择文件,并将文件名显示到对应的按钮上。"""
        path, _ = QFileDialog.getOpenFileName(self, "选择要发送的文件", "", "所有文件 (*.*)")
        if path:
            self._row_paths[row] = path
            btn = self._file_slots[row][0]
            btn.setText(QFileInfo(path).fileName())
            btn.setToolTip(path)
            self.statusBar().showMessage(path, 5000)
        self._update_send_enabled()

    def _collect_checked_jobs(self) -> list[tuple[str, str]] | None:
        """
        收集所有已勾选任务。

        Returns:
            若有效任务存在:返回 ``[(path, addr), ...]``;
            若缺少文件或地址:弹窗提示并返回 None。
        """
        jobs: list[tuple[str, str]] = []
        for i, (_, le, cb) in enumerate(self._file_slots):
            if not cb.isChecked():
                continue
            path = self._row_paths[i]
            addr = le.text().strip()
            if not path:
                QMessageBox.warning(self, "传输", f"第 {i + 1} 行已勾选,请先选择文件。")
                return None
            if not addr:
                QMessageBox.warning(self, "传输", f"第 {i + 1} 行已勾选,请填写烧录地址。")
                return None
            jobs.append((path, addr))
        return jobs

    def _update_send_enabled(self):
        """根据勾选状态、已选文件/地址情况,决定“发送”按钮是否可用。"""
        ok = False
        if self._serial is not None and self._serial.is_open:
            for i, (_, le, cb) in enumerate(self._file_slots):
                if cb.isChecked() and self._row_paths[i] and le.text().strip():
                    ok = True
                    break
        ok = ok and not (self._send_thread and self._send_thread.isRunning())
        self.ui.pushButton.setEnabled(ok)

    def start_send(self):
        jobs = self._collect_checked_jobs()
        if not jobs:
            if self._serial is None or not self._serial.is_open:
                return
            QMessageBox.warning(self, "传输", "请至少勾选一行,并选择文件、填写地址。")
            return
        if not self._serial or not self._serial.is_open:
            return
        self._cancel.clear()
        self._batch_info = None
        self.ui.progressBar.setValue(0)
        self.ui.label_6.setText("")
        self.ui.pushButton.setEnabled(False)
        self.ui.pushButton_2.setEnabled(True)
        self.ui.pushButton_serial_connect.setEnabled(False)
        self.ui.pushButton_refresh_ports.setEnabled(False)

        self._send_thread = XmodemBatchSendThread(
            self._serial,
            jobs,
            self._cancel,
            inter_file_delay_s=self._inter_file_delay_s,
        )
        self._send_thread.progress.connect(self._on_send_progress, Qt.ConnectionType.QueuedConnection)
        self._send_thread.status.connect(self._on_batch_status, Qt.ConnectionType.QueuedConnection)
        self._send_thread.rx_log.connect(self._append_rx_from_worker, Qt.ConnectionType.QueuedConnection)
        self._send_thread.finished.connect(self._on_send_finished, Qt.ConnectionType.QueuedConnection)
        self._rx_timer.stop()
        self._send_thread.start()
        self.statusBar().showMessage("传输中:先发 rx,再等待 'C' 与 Xmodem…", 0)

    def _on_batch_status(self, idx: int, total: int, name: str, addr: str):
        self._batch_info = (idx, total, name, addr)
        self.statusBar().showMessage(f"{idx}/{total}  {addr}  {name}", 0)

    def _on_send_progress(self, sent: int, total: int):
        if total > 0:
            pct = int(min(100, round(100.0 * sent / total)))
            self.ui.progressBar.setValue(pct)
            if self._batch_info:
                i, n, name, addr = self._batch_info
                self.ui.label_6.setText(
                    f"[{i}/{n}] {addr}  {name}  {_format_size(sent)} / {_format_size(total)}  ({pct}%)"
                )
            else:
                self.ui.label_6.setText(f"{_format_size(sent)} / {_format_size(total)}  ({pct}%)")

    def _on_send_finished(self, ok: bool, message: str):
        self._batch_info = None
        self.ui.pushButton_2.setEnabled(False)
        self.ui.pushButton_serial_connect.setEnabled(True)
        self.ui.pushButton_refresh_ports.setEnabled(True)
        if self._serial is not None and self._serial.is_open:
            self._rx_timer.start()
        self._update_send_enabled()
        self.statusBar().showMessage(message, 8000)
        if not ok and message != "已取消":
            QMessageBox.warning(self, "传输", message)

    def stop_send(self):
        self._cancel.set()
        self.statusBar().showMessage("正在停止…", 3000)

    def closeEvent(self, event):
        self._rx_timer.stop()
        self._cancel.set()
        self._save_settings()
        if self._send_thread and self._send_thread.isRunning():
            self._send_thread.wait(8000)
        if self._serial is not None and self._serial.is_open:
            try:
                self._serial.close()
            except Exception:
                pass
        event.accept()

    def _load_settings(self) -> None:
        """从 QSettings 恢复参数到内部变量(UI 刷新前使用)。"""
        try:
            self._inter_file_delay_s = float(
                self._settings.value("batch/inter_file_delay_s", 4.0)
            )
        except Exception:
            self._inter_file_delay_s = 4.0

        self._last_serial_port = str(self._settings.value("serial/port", ""))
        self._last_serial_baud = str(self._settings.value("serial/baud", ""))

        # 批量任务槽位状态
        self._batch_slot_paths: list[str | None] = [None, None, None, None]
        self._batch_slot_addrs: list[str] = ["", "", "", ""]
        self._batch_slot_enabled: list[bool] = [False, False, False, False]
        for i in range(4):
            key = f"batch/slot_{i}"
            path = self._settings.value(f"{key}/path", "")
            self._batch_slot_paths[i] = str(path) if path else None
            self._batch_slot_addrs[i] = str(self._settings.value(f"{key}/addr", ""))
            # QSettings value 可直接用 type=bool 读取
            enabled = self._settings.value(f"{key}/enabled", False, type=bool)
            self._batch_slot_enabled[i] = bool(enabled)

    def _apply_serial_settings(self) -> None:
        """在端口列表刷新完成后,把记忆的串口参数应用到 UI。"""
        # 端口:只有在当前系统存在时才设置
        if self._last_serial_port:
            for idx in range(self.ui.comboBox.count()):
                if self.ui.comboBox.itemText(idx) == self._last_serial_port:
                    self.ui.comboBox.setCurrentIndex(idx)
                    break

        # 波特率:直接按字符串匹配 comboBox_2 文本
        if self._last_serial_baud:
            for idx in range(self.ui.comboBox_2.count()):
                if self.ui.comboBox_2.itemText(idx) == self._last_serial_baud:
                    self.ui.comboBox_2.setCurrentIndex(idx)
                    break

    def _apply_batch_settings_to_ui(self) -> None:
        """把记忆的批量槽位信息应用到 UI 组件。"""
        for i, (btn, le, cb) in enumerate(self._file_slots):
            path = self._batch_slot_paths[i]
            if path:
                self._row_paths[i] = path
                btn.setText(QFileInfo(path).fileName())
                btn.setToolTip(path)
            else:
                self._row_paths[i] = None
                btn.setText("...")
                btn.setToolTip("")

            le.setText(self._batch_slot_addrs[i] or "")
            cb.setChecked(self._batch_slot_enabled[i])

    def _save_settings(self) -> None:
        """把当前参数保存到 QSettings。"""
        if self.ui.comboBox.currentText():
            self._settings.setValue("serial/port", self.ui.comboBox.currentText())
        if self.ui.comboBox_2.currentText():
            self._settings.setValue("serial/baud", self.ui.comboBox_2.currentText())

        self._settings.setValue("batch/inter_file_delay_s", self._inter_file_delay_s)
        for i, (_, le, cb) in enumerate(self._file_slots):
            key = f"batch/slot_{i}"
            path = self._row_paths[i] or ""
            self._settings.setValue(f"{key}/path", path)
            self._settings.setValue(f"{key}/addr", le.text().strip())
            self._settings.setValue(f"{key}/enabled", cb.isChecked())


if __name__ == "__main__":
    app = QApplication([])
    window = MainWindow()
    window.show()
    app.exec()

9.7 测试

请添加图片描述

Logo

智能硬件社区聚焦AI智能硬件技术生态,汇聚嵌入式AI、物联网硬件开发者,打造交流分享平台,同步全国赛事资讯、开展 OPC 核心人才招募,助力技术落地与开发者成长。

更多推荐