ThreadX 同步与通信机制(队列、信号量、事件标志位)
·
文章声明
本文档为个人学习笔记,内容由 AI 辅助整理、总结与完善,仅供学习交流、技术参考使用。
任何直接用于生产环境的代码与逻辑,请结合实际硬件、需求与官方手册进行验证与调试。
一、核心组件总览
RTOS 同步通信的核心是「数据缓冲 + 事件通知 + 资源保护」,对应 ThreadX 三大组件:
- 队列:负责数据缓存,解决生产 / 消费速度不匹配
- 信号量:负责事件同步或资源保护
- 事件标志组:负责多事件组合判断
二、队列(Queue)—— 数据缓冲核心
1. 核心作用
- 缓存数据,防止丢失(如串口 / CAN 接收的数据)
- 解耦「数据生产」(中断 / 任务)和「数据消费」(处理线程)
- 只存数据,不负责唤醒任务(需搭配信号量 / 事件)
2. 关键 API & 代码示例
(1)队列初始化
#include "tx_api.h"
// 定义队列句柄和缓冲区(CAN数据项8字节,缓存10个)
#define QUEUE_ITEM_SIZE 8
#define QUEUE_LENGTH 10
TX_QUEUE xy_data_queue;
uint8_t queue_buf[QUEUE_LENGTH * QUEUE_ITEM_SIZE];
// 初始化函数(系统启动时调用)
void queue_init(void) {
UINT status = tx_queue_create(
&xy_data_queue, // 队列句柄
"XY_CAN_QUEUE", // 队列名称
QUEUE_ITEM_SIZE, // 单个数据项大小
queue_buf, // 队列缓冲区
sizeof(queue_buf) // 缓冲区总大小
);
if (status != TX_SUCCESS) {
printf("队列创建失败:0x%02X\r\n", status);
}
}
(2)任务中发送数据(非阻塞)
// 向队列写入CAN数据
uint8_t can_data[8] = {0x01,0x02,0x03,0x04,0x05,0x06,0x07,0x08};
UINT status = tx_queue_send(
&xy_data_queue, // 目标队列
can_data, // 待发送数据指针
TX_NO_WAIT // 非阻塞:队列满则立即返回
);
if (status == TX_QUEUE_FULL) {
printf("队列满,数据写入失败!\r\n");
}
(3)中断中发送数据(ISR 安全版)
// 串口3中断中写入数据
void USART3_IRQHandler(void) {
BaseType_t xHigherPriorityTaskWoken = TX_FALSE;
uint8_t recv_byte = USART3->DR;
// 中断中必须用_from_isr版本
tx_queue_send_from_isr(
&Fd_Uart3_queue,
&recv_byte,
&xHigherPriorityTaskWoken
);
USART3->SR &= ~USART_SR_RXNE; // 清除中断标志
}
(4)任务中接收数据
// 从队列读取数据(非阻塞)
uint8_t read_data[8];
UINT status = tx_queue_receive(
&xy_data_queue,
read_data,
TX_NO_WAIT
);
if (status == TX_SUCCESS) {
printf("读取到队列数据:%02X\r\n", read_data[0]);
}
3. 核心特点
- 数据拷贝:ThreadX 会拷贝数据到队列缓冲区,原数据可复用
- 非阻塞 / 阻塞:
TX_NO_WAIT(立即返回)、TX_WAIT_FOREVER(永久阻塞)、数值(阻塞指定节拍) - 中断安全:必须用
_from_isr后缀接口
三、信号量(Semaphore)—— 事件同步 / 资源保护
信号量分三类:二值信号量、互斥信号量、计数信号量,核心差异在「初始值」和「所有权」。
1. 二值信号量(Binary Semaphore)—— 单事件同步
(1)核心作用
- 同步单一事件(如 “串口有数据”“按键按下”)
- 无所有权,中断可释放,任务仅获取
(2)关键 API & 代码示例
初始化(初始值 = 0,无事件)
TX_SEMAPHORE uart_get_semaphore; // 串口数据就绪信号量
void sem_binary_init(void) {
tx_semaphore_create(
&uart_get_semaphore,
"UART_DATA_SEM",
0 // 初始值=0:无事件,任务一开始阻塞
);
}
任务中获取(阻塞等待)
// 串口数据处理线程
void uart_process_thread(ULONG input) {
UINT status;
uint8_t recv_data;
for(;;) {
// 永久阻塞等待信号量(串口有数据才唤醒)
status = tx_semaphore_get(&uart_get_semaphore, TX_WAIT_FOREVER);
if (status != TX_SUCCESS) {
tx_thread_sleep(100);
continue;
}
// 获取成功,读取队列数据
tx_queue_receive(&Fd_Uart3_queue, &recv_data, TX_NO_WAIT);
process_uart_data(recv_data); // 处理数据
}
}
中断中释放(触发事件)
void USART3_IRQHandler(void) {
BaseType_t xHigherPriorityTaskWoken = TX_FALSE;
uint8_t recv_byte = USART3->DR;
// 1. 写入队列
tx_queue_send_from_isr(&Fd_Uart3_queue, &recv_byte, &xHigherPriorityTaskWoken);
// 2. 释放信号量(唤醒任务)
tx_semaphore_put_from_isr(&uart_get_semaphore, &xHigherPriorityTaskWoken);
}
(3)核心特点
- 初始值:0(无事件)
- 计数值:仅 0/1
- 所有权:无(谁都能释放,中断也可)
- 释放方:生产者(中断 / 任务),获取方:消费者(任务)
2. 互斥信号量(Mutex)—— 临界资源保护
(1)核心作用
- 保护共享资源(如 CAN 外设、串口打印、全局变量)
- 带优先级继承,解决 “优先级翻转” 问题
- 有所有权:谁获取,谁释放
- 设计初衷:支持多线程安全访问同一资源,单线程场景无意义
(2)关键特性
| 核心特性 | 具体说明 |
|---|---|
| 所有权 | 谁获取,谁释放(ThreadX 记录获取线程 ID) |
| 初始值 | 1(资源空闲) |
| 计数值 | 仅 0/1(0 = 占用,1 = 空闲) |
| 中断兼容性 | 绝对禁用(无线程上下文,无法判定所有权) |
| 优先级继承 | 低优先级线程占用锁时,临时提升至等待线程的优先级,防止优先级翻转 |
| 多线程支持 | 多个线程共用同一把锁,同一时间仅一个线程可操作资源 |
(3)关键 API & 代码示例
初始化(初始值 = 1,资源空闲)
TX_SEMAPHORE can_mutex_sem; // CAN外设互斥信号量
void sem_mutex_init(void) {
tx_semaphore_create(
&can_mutex_sem,
"CAN_MUTEX_SEM",
1 // 初始值=1:资源空闲
);
}
单线程使用(基础示例)
// XY模块发送CAN报文线程
void xy_can_send_thread(ULONG input) {
TCanFrame send_msg;
UINT status;
for(;;) {
// 等待事件触发(原有逻辑)
tx_event_flags_get(&xy_data_get_flags, SEND_FLAG, TX_OR | TX_CLEAR, &xy_send_events, TX_WAIT_FOREVER);
// 1. 获取互斥信号量(占用CAN外设,超时1s)
status = tx_semaphore_get(&can_mutex_sem, 10000);
if (status != TX_SUCCESS) {
printf("获取CAN互斥锁失败:0x%02X\r\n", status);
continue;
}
// 2. 临界区:操作CAN外设(同一时间仅一个线程执行)
memset(&send_msg, 0, sizeof(TCanFrame));
send_msg.Id = WRJ_MODULE << 24 | COMM_MODULE << 16 | WRJ_SELF_MSG << 8 | 0x00;
send_msg.Dlc = 8;
memcpy(send_msg.Data, wrj_tb, 8);
can_sendMessage(&send_msg);
// 3. 释放互斥信号量(归还资源,必须手动!)
tx_semaphore_put(&can_mutex_sem);
}
}
多线程共用互斥锁(核心场景)
// 全局互斥锁(所有线程共用这一把)
TX_SEMAPHORE can_mutex_sem;
// 线程A:XY Self数据发送
void thread_A(ULONG input) {
TCanFrame msgA;
for(;;) {
// 1. 申请锁(所有线程共用同一把锁)
tx_semaphore_get(&can_mutex_sem, TX_WAIT_FOREVER);
// 2. 临界区:操作CAN(同一时间仅一个线程执行)
msgA.Id = 0x123;
can_sendMessage(&msgA);
printf("线程A发送CAN:ID=0x123\r\n");
// 3. 释放锁(必须成对!)
tx_semaphore_put(&can_mutex_sem);
tx_thread_sleep(1000);
}
}
// 线程B:故障上报发送
void thread_B(ULONG input) {
TCanFrame msgB;
for(;;) {
// 申请同一把锁
tx_semaphore_get(&can_mutex_sem, TX_WAIT_FOREVER);
// 临界区:操作CAN
msgB.Id = 0x456;
can_sendMessage(&msgB);
printf("线程B发送CAN:ID=0x456\r\n");
tx_semaphore_put(&can_mutex_sem);
tx_thread_sleep(500);
}
}
// 线程C:串口触发Sta发送
void thread_C(ULONG input) {
TCanFrame msgC;
for(;;) {
// 申请同一把锁
tx_semaphore_get(&can_mutex_sem, TX_WAIT_FOREVER);
// 临界区:操作CAN
msgC.Id = 0x789;
can_sendMessage(&msgC);
printf("线程C发送CAN:ID=0x789\r\n");
tx_semaphore_put(&can_mutex_sem);
tx_thread_sleep(800);
}
}
(4)互斥锁 “锁生效” 三要素(缺一不可)
- 锁的唯一性:一个资源(如 CAN)对应一把专属锁,所有线程共用这把锁;
- 操作的排他性:所有操作资源的代码,必须放在
tx_semaphore_get/tx_semaphore_put之间; - 规则的一致性:所有线程遵守 “先拿锁、再操作、最后释放”,无例外线程。
(5)多线程执行流程(核心逻辑)
表格
| 时间点 | 线程 A 状态 | 线程 B 状态 | 线程 C 状态 | 互斥锁计数值 | 执行结果 |
|---|---|---|---|---|---|
| T0 | 就绪 | 就绪 | 就绪 | 1(空闲) | 无 |
| T1 | 抢到 CPU → 申请锁成功 | 就绪 | 就绪 | 0(被占用) | 线程 A 执行can_sendMessage |
| T2 | 释放锁 → 休眠 | 抢到 CPU → 申请锁成功 | 就绪 | 0(被占用) | 线程 B 执行can_sendMessage |
| T3 | 就绪 | 释放锁 → 休眠 | 抢到 CPU → 申请锁成功 | 0(被占用) | 线程 C 执行can_sendMessage |
| T4 | 就绪 | 就绪 | 释放锁 → 休眠 | 1(空闲) | 回到初始状态,循环排队 |
(6)常见误区 & 避坑指南
表格
| 误区 | 后果 | 正确做法 |
|---|---|---|
| 每个线程创建一把锁 | 锁失效,多线程同时操作资源 | 一个资源对应一把全局锁 |
中断中调用tx_semaphore_put |
系统崩溃 / 断言失败 | 中断用二值信号量通知任务,任务中用互斥锁 |
| 忘记释放锁(分支 return) | 资源死锁,其他线程永久阻塞 | 分支退出前必须释放锁 |
| 锁范围过大(包含休眠) | 线程阻塞时间过长,效率降低 | 仅锁 “操作资源的核心代码” |
| 锁范围过小(仅锁部分操作) | 资源操作不完整,数据错乱 | 锁覆盖所有资源相关操作 |
3. 计数信号量(Counting Semaphore)—— 资源计数
(1)核心作用
- 标记 “可用资源数量”(如 10 个缓存区、5 个连接数)
(2)代码示例
TX_SEMAPHORE buf_pool_sem; // 缓存池计数信号量
// 初始化:10个缓存可用
void sem_count_init(void) {
tx_semaphore_create(&buf_pool_sem, "BUF_POOL_SEM", 10);
}
// 任务中获取缓存(计数值-1)
void get_buf_thread(ULONG input) {
tx_semaphore_get(&buf_pool_sem, TX_WAIT_FOREVER);
// 使用缓存...
// 释放缓存(计数值+1)
tx_semaphore_put(&buf_pool_sem);
}
4. 信号量三类对比表
| 类型 | 初始值 | 计数值范围 | 核心用途 | 所有权 | 中断可用 | 释放方 | 多线程支持 |
|---|---|---|---|---|---|---|---|
| 二值信号量 | 0 | 0/1 | 单事件同步 | 无 | 是(释放) | 生产者(中断 / 任务) | - |
| 互斥信号量 | 1 | 0/1 | 临界资源保护 | 有 | 否 | 获取的线程(手动) | ✅ 核心场景 |
| 计数信号量 | ≥1 | 0~ 最大值 | 资源数量计数 | 无 | 是(释放) | 释放资源的任务 | ✅ 支持 |
四、事件标志组(Event Flags)—— 多事件组合
(1)核心作用
- 管理多个独立事件(如 “Self 数据发送”“Sta 数据发送”“故障上报”)
- 支持 “任意事件触发”(TX_OR)或 “所有事件触发”(TX_AND)
(2)关键 API & 代码示例
初始化
TX_EVENT_FLAGS_GROUP xy_data_get_flags; // CAN发送事件组
#define SEND_Self_flag (1UL << 0) // 第0位:Self数据发送
#define SEND_Sta_flag (1UL << 1) // 第1位:Sta数据发送
#define SEND_FLAG (SEND_Self_flag | SEND_Sta_flag)
void event_init(void) {
tx_event_flags_create(&xy_data_get_flags, "XY_EVENT_GROUP");
}
任务中等待事件
// CAN发送线程等待事件
void can_event_wait_thread(ULONG input) {
ULONG xy_send_events;
UINT status;
for(;;) {
// 永久阻塞,等待任意事件,触发后清除标志
status = tx_event_flags_get(
&xy_data_get_flags,
SEND_FLAG,
TX_OR | TX_CLEAR, // 核心:任意事件+清除标志
&xy_send_events,
TX_WAIT_FOREVER
);
if (status != TX_SUCCESS) continue;
// 判断具体触发的事件
if (xy_send_events & SEND_Self_flag) {
printf("Self数据发送事件触发\r\n");
}
if (xy_send_events & SEND_Sta_flag) {
printf("Sta数据发送事件触发\r\n");
}
}
}
任务 / 中断中设置事件
// 任务中设置Self事件
tx_event_flags_set(&xy_data_get_flags, SEND_Self_flag, TX_OR);
// 中断中设置Sta事件(ISR安全版)
void CAN_IRQHandler(void) {
BaseType_t xHigherPriorityTaskWoken = TX_FALSE;
tx_event_flags_set_from_isr(&xy_data_get_flags, SEND_Sta_flag, TX_OR, &xHigherPriorityTaskWoken);
}
(3)核心特点
- 事件位:32 位,每一位对应一个事件
- 触发规则:TX_OR(任意)、TX_AND(全部)
- 清除规则:TX_CLEAR(获取后清除,避免重复触发)
五、核心组件对比总表
| 组件 | 核心能力 | 典型使用场景 | 关键注意事项 |
|---|---|---|---|
| 队列 | 数据缓存 | 串口 / CAN 数据暂存 | 中断用_from_isr,检查队列满 |
| 二值信号量 | 单事件同步 | 串口数据就绪通知 | 中断可释放,任务仅获取 |
| 互斥信号量 | 临界资源保护 | CAN / 串口外设独占 | 必须手动释放,中断禁用 |
| 计数信号量 | 资源数量计数 | 缓存池 / 连接数管理 | 计数值≥0 |
| 事件标志组 | 多事件组合判断 | CAN 多类型数据发送触发 | 搭配 TX_CLEAR 避免重复触发 |
六、组合使用方法 & 实战案例
1. 组合原则
- 队列 + 二值信号量:单事件 + 数据缓冲(串口数据处理)
- 队列 + 事件标志组:多事件 + 数据缓冲(CAN 多类型发送)
- 互斥信号量 + 外设操作:保护共享外设(多线程发 CAN)
2. 案例 1:串口数据处理(队列 + 二值信号量)
流程
- 串口中断接收字节 → 写入队列
- 中断释放二值信号量 → 唤醒处理线程
- 线程获取信号量 → 从队列读数据 → 解析处理
完整代码
// 1. 初始化(系统启动时)
TX_QUEUE Fd_Uart3_queue;
TX_SEMAPHORE uart_get_semaphore;
uint8_t uart_queue_buf[10*1]; // 1字节/项,10项
void uart_sync_init(void) {
// 队列初始化
tx_queue_create(&Fd_Uart3_queue, "UART3_QUEUE", 1, uart_queue_buf, sizeof(uart_queue_buf));
// 二值信号量初始化
tx_semaphore_create(&uart_get_semaphore, "UART3_SEM", 0);
}
// 2. 串口3中断(生产数据)
void USART3_IRQHandler(void) {
BaseType_t xHigherPriorityTaskWoken = TX_FALSE;
uint8_t recv_byte = USART3->DR;
// 写入队列
tx_queue_send_from_isr(&Fd_Uart3_queue, &recv_byte, &xHigherPriorityTaskWoken);
// 释放信号量
tx_semaphore_put_from_isr(&uart_get_semaphore, &xHigherPriorityTaskWoken);
USART3->SR &= ~USART_SR_RXNE;
}
// 3. 串口数据处理线程(消费数据)
void uart_process_thread(ULONG input) {
uint8_t recv_data;
UINT status;
for(;;) {
// 获取信号量(阻塞等待)
status = tx_semaphore_get(&uart_get_semaphore, TX_WAIT_FOREVER);
if (status != TX_SUCCESS) continue;
// 读取队列数据
tx_queue_receive(&Fd_Uart3_queue, &recv_data, TX_NO_WAIT);
// 解析数据(业务逻辑)
parse_uart_data(recv_data);
}
}
3. 案例 2:CAN 多事件发送(队列 + 事件标志组 + 互斥信号量)
流程
- 数据解析任务生成 Self/Sta 数据 → 写入队列
- 设置对应事件位 → 唤醒 CAN 发送线程
- 线程获取事件 → 从队列读数据 → 用互斥信号量保护 CAN 发送
完整代码
// 1. 初始化
TX_QUEUE xy_data_queue;
TX_EVENT_FLAGS_GROUP xy_data_get_flags;
TX_SEMAPHORE can_mutex_sem;
uint8_t can_queue_buf[10*8];
void can_sync_init(void) {
// 队列初始化
tx_queue_create(&xy_data_queue, "CAN_QUEUE", 8, can_queue_buf, sizeof(can_queue_buf));
// 事件标志组初始化
tx_event_flags_create(&xy_data_get_flags, "CAN_EVENT_GROUP");
// 互斥信号量初始化
tx_semaphore_create(&can_mutex_sem, "CAN_MUTEX", 1);
}
// 2. 数据生产任务(生成Self数据)
void self_data_produce_thread(ULONG input) {
uint8_t self_data[8] = {0x11,0x22,0x33,0x44,0x55,0x66,0x77,0x88};
for(;;) {
// 写入队列
tx_queue_send(&xy_data_queue, self_data, TX_NO_WAIT);
// 设置Self事件
tx_event_flags_set(&xy_data_get_flags, SEND_Self_flag, TX_OR);
tx_thread_sleep(1000); // 1s生产一次
}
}
// 3. CAN发送线程(消费数据)
void can_send_thread(ULONG input) {
TCanFrame send_msg;
uint8_t read_data[8];
ULONG xy_send_events;
UINT status;
memset(&send_msg, 0, sizeof(TCanFrame));
for(;;) {
// 等待事件(任意Self/Sta)
status = tx_event_flags_get(&xy_data_get_flags, SEND_FLAG, TX_OR | TX_CLEAR, &xy_send_events, TX_WAIT_FOREVER);
if (status != TX_SUCCESS) continue;
// 读取队列数据
tx_queue_receive(&xy_data_queue, read_data, TX_NO_WAIT);
// 获取互斥信号量(保护CAN)
tx_semaphore_get(&can_mutex_sem, TX_WAIT_FOREVER);
// 填充CAN报文
send_msg.Dlc = 8;
send_msg.Ext = 1;
memcpy(send_msg.Data, read_data, 8);
if (xy_send_events & SEND_Self_flag) {
send_msg.Id = WRJ_MODULE << 24 | COMM_MODULE << 16 | WRJ_SELF_MSG << 8 | 0x00;
} else {
send_msg.Id = WRJ_MODULE << 24 | COMM_MODULE << 16 | WRJ_STATE_MSG << 8 | 0x00;
}
// 发送CAN
can_sendMessage(&send_msg);
// 释放互斥信号量
tx_semaphore_put(&can_mutex_sem);
}
}
4. 案例 3:多线程共用互斥锁保护 CAN(核心实战)
完整代码
// 1. 全局定义
TX_SEMAPHORE can_mutex_sem;
typedef struct {
uint32_t Id;
uint8_t Dlc;
uint8_t Ext;
uint8_t Data[8];
} TCanFrame;
// 2. CAN底层发送函数
UINT can_sendMessage(TCanFrame *msg) {
if (msg == NULL || msg->Dlc > 8) return TX_PTR_ERROR;
// 硬件底层发送逻辑(适配STM32/其他MCU)
return TX_SUCCESS;
}
// 3. 互斥锁初始化
void can_mutex_init(void) {
tx_semaphore_create(&can_mutex_sem, "CAN_MUTEX", 1);
}
// 4. 线程A:Self数据发送
void thread_A(ULONG input) {
TCanFrame msgA = {.Id=0x123, .Dlc=8, .Ext=1, .Data={0x01,0x02,0x03,0x04,0x05,0x06,0x07,0x08}};
for(;;) {
tx_semaphore_get(&can_mutex_sem, TX_WAIT_FOREVER);
can_sendMessage(&msgA);
printf("线程A发送CAN:ID=0x123\r\n");
tx_semaphore_put(&can_mutex_sem);
tx_thread_sleep(1000);
}
}
// 5. 线程B:故障数据发送
void thread_B(ULONG input) {
TCanFrame msgB = {.Id=0x456, .Dlc=8, .Ext=1, .Data={0xE0,0x01,0x00,0x00,0x00,0x00,0x00,0x00}};
for(;;) {
tx_semaphore_get(&can_mutex_sem, TX_WAIT_FOREVER);
can_sendMessage(&msgB);
printf("线程B发送CAN:ID=0x456\r\n");
tx_semaphore_put(&can_mutex_sem);
tx_thread_sleep(500);
}
}
// 6. 线程创建
void create_can_threads(void) {
uint8_t stackA[1024], stackB[1024];
TX_THREAD threadA, threadB;
tx_thread_create(&threadA, "THREAD_A", thread_A, 0, stackA, 1024, 10, 10, TX_NO_TIME_SLICE, TX_AUTO_START);
tx_thread_create(&threadB, "THREAD_B", thread_B, 0, stackB, 1024, 9, 9, TX_NO_TIME_SLICE, TX_AUTO_START);
}
七、避坑指南(必看)
- 中断中必须用 ISR 版本 API:队列 / 信号量 / 事件的
_from_isr后缀接口,否则系统崩溃 - 互斥信号量必须成对释放:分支(continue/break/return)前先释放,否则死锁
- 事件标志组必加 TX_CLEAR:避免事件位残留导致重复触发
- 队列操作检查返回值:处理
TX_QUEUE_FULL(队列满)、TX_PTR_ERROR(空指针) - 信号量获取加超时:避免永久阻塞(如
10000节拍 = 1s)
八、核心记忆口诀
- 队列存数据,信号量发通知,互斥锁保资源,事件组管多事
- 二值信号量:中断放,任务拿;互斥信号量:任务拿,任务还
- 一个事件用信号量,多个事件用事件组
- 队列 + 信号量 = 单事件数据缓冲,队列 + 事件组 = 多事件数据缓冲
文章声明
本文档为个人学习笔记,内容由 AI 辅助整理、总结与完善,仅供学习交流、技术参考使用。
任何直接用于生产环境的代码与逻辑,请结合实际硬件、需求与官方手册进行验证与调试。
更多推荐

所有评论(0)