Modbus RTU(CRC16)协议实践-水下超声波传感器上位机开发
本文将详细介绍一个水下超声波测距传感器上位机应用的开发过程。该项目使用Python和PySide6开发,通过Modbus RTU协议与水下超声波传感器通信,实时获取距离和温度数据。上位机支持主动模式和被动模式的切换,提供了直观的用户界面以显示传感器数据和连接状态
目录
1.项目介绍
本文将详细介绍一个水下超声波测距传感器上位机应用的开发过程。该项目使用Python和PySide6开发,通过Modbus RTU协议与水下超声波传感器通信,实时获取距离和温度数据。上位机支持主动模式和被动模式的切换,提供了直观的用户界面以显示传感器数据和连接状态



2.项目结构
├── backend.py # 传感器通信和数据处理逻辑
├── main.py # 程序入口和初始化
└── ui/
└── main.qml # 用户界面定义
3.Modbus RTU协议介绍
3.1 Modbus协议基础
Modbus是一种工业通信协议,广泛应用于工业自动化和设备通信领域。Modbus RTU是Modbus协议的一种变体,特点是:
• 使用串行通信(RS-232/RS-485)
• 采用二进制传输,传输效率高
• 使用CRC校验确保数据完整性
• 主从通信架构,主设备(上位机)向从设备(传感器)发送请求
3.2 Modbus RTU数据帧结构
一个标准的Modbus RTU数据帧包括:
|从机地址 | 功能码 | 数据 | CRC校验 |
|---------|--------|------|----------|
| 1字节 | 1字节 | N字节 | 2字节 |
• 从机地址: 标识目标设备,范围0-247
• 功能码: 定义操作类型,如03(读保持寄存器)、06(写单寄存器)
• 数据: 根据功能码不同而变化,包含寄存器地址、数据长度等
4.项目代码解读
4.1 backend.py - 传感器通信核心
(1)CRC校验计算函数
def calculate_crc(data):
"""计算ModbusCRC16校验码"""
crc = 0xFFFF
for pos in range(len(data)):
crc ^= data[pos]
for i in range(8):
if (crc & 0x0001) != 0:
crc = (crc >> 1) ^ 0xA001
else:
crc = crc >> 1
# 返回低字节和高字节
return crc & 0xFF, (crc >> 8) & 0xFF
这是标准的Modbus CRC16校验算法实现:
• 初始化CRC值为0xFFFF
• 对每个字节进行异或运算并右移位处理
• 返回低字节和高字节,在Modbus中低字节先发送
(2)SensorBackend类定义与初始化
class SensorBackend(QObject):
# 定义信号
modeChanged = Signal(bool) # True为主动模式,False为被动模式
dataUpdated = Signal(float, float) # 距离值和温度值
connectionChanged = Signal(bool) # 连接状态变化信号
def __init__(self):
super().__init__()
self.serial_port = None
self._is_active_mode = False
self.distance = 0.0
self.temperature = 0.0
self._is_connected = False
self.last_request_time = 0 # 添加上次请求时间记录
# 用于主动模式下的定时器
self.timer = QTimer()
self.timer.timeout.connect(self.request_data)
# 连接状态检测定时器
self.connection_check_timer = QTimer()
self.connection_check_timer.timeout.connect(self.check_connection)
self.connection_check_timer.start(5000) # 每5秒检查一次连接状态
关键点:
• 继承QObject,使类可以使用Qt的信号槽机制
• 定义三种信号用于向UI通知状态变化
• 两个定时器:一个用于主动模式下定期请求数据,一个用于监控连接状态
• 属性存储当前距离、温度、模式和连接状态
(3)属性访问器定义
@Property(bool)
def is_connected(self):
return self._is_connected
@is_connected.setter
def is_connected(self, value):
self._is_connected = value
@Property(bool)
def is_active_mode(self):
return self._is_active_mode
@is_active_mode.setter
def is_active_mode(self, value):
self._is_active_mode = value
使用Qt的Property装饰器,使这些属性可以在QML中直接访问:
• 定义getter和setter方法
• QML可以通过backend.is_connected直接读取这些属性
(4)连接状态检查
def check_connection(self):
"""检查与传感器的连接状态"""
if self.serial_port is None or not self.serial_port.is_open:
if self._is_connected:
self._is_connected = False
self.connectionChanged.emit(False)
return
# 如果已经在连接状态,不需要频繁检查
if self._is_connected and time.time() - self.last_request_time < 5:
return
try:
# 记录最后一次请求时间
self.last_request_time = time.time()
# 发送请求并检查响应
if self._is_active_mode or not self._is_connected:
# 清空缓冲区
self.serial_port.reset_input_buffer()
# 使用读取距离指令验证连接
command = bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A])
print(f"发送连接检查命令: {' '.join([f'{b:02X}' for b in command])}")
self.serial_port.write(command)
# 等待响应和检查逻辑...
这个方法用于检查传感器连接状态:
• 检查串口是否打开
• 使用优化策略避免频繁检查已连接设备
• 发送Modbus读取命令(0x03)验证连接
• 实现重试机制提高可靠性
(5)传感器连接实现
def connect_to_sensor(self, port="COM3", baudrate=115200):
"""连接到传感器,默认连接COM3,波特率115200"""
try:
# 如果已经有连接,先关闭
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
# 建立新连接
self.serial_port = serial.Serial(port, baudrate, timeout=1)
# 尝试发送一个简单的命令验证连接
try:
# 清空缓冲区
self.serial_port.reset_input_buffer()
self.serial_port.reset_output_buffer()
# 发送读取距离命令
command = bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A])
self.serial_port.write(command)
# 等待响应和验证逻辑...
这个方法负责建立与传感器的串口连接:
• 设置默认参数(COM3, 115200)
• 自动关闭现有连接以避免资源冲突
• 发送测试命令验证连接
• 分析响应数据确认通信正常
• 连接成功后还会进行温度补偿配置
(6)模式切换实现
@Slot()
def toggleMode(self):
"""切换传感器模式"""
# 切换模式状态
self._is_active_mode = not self._is_active_mode
# 如果已连接,则发送命令到传感器
if self.serial_port is not None and self.serial_port.is_open:
# 发送指令切换模式
if self._is_active_mode:
# 准备设置为主动模式的指令(寄存器地址0x5C, 值为0x02)
cmd_data = bytearray([0x01, 0x06, 0x00, 0x5c, 0x00, 0x02])
# 计算CRC
low_byte, high_byte = calculate_crc(cmd_data)
# 添加CRC到指令中
command = cmd_data + bytearray([low_byte, high_byte])
print(f"切换到主动模式,发送命令: {' '.join([f'{b:02X}' for b in command])}")
# 清空接收缓冲区,准备接收主动发送的数据
self.serial_port.reset_input_buffer()
self.timer.start(100) # 100ms检查一次主动发送的数据
else:
# 准备设置为被动模式的指令
cmd_data = bytearray([0x01, 0x06, 0x00, 0x5c, 0x00, 0x00])
# 计算CRC和发送命令...
self.timer.stop()
self.serial_port.write(command)
time.sleep(0.1) # 等待传感器响应
else:
# 未连接情况处理...
# 发送信号通知界面更新
self.modeChanged.emit(self._is_active_mode)
这个方法实现传感器模式切换:
• 使用@Slot()装饰器,可从QML直接调用
• 使用Modbus功能码06(写单寄存器)修改寄存器0x5C
• 主动模式(0x02)下启动定时器定期检查数据
• 被动模式(0x00)下停止定时器
• 发送信号通知UI更新模式状态
(7)数据请求与解析
@Slot()
def request_data(self):
"""请求传感器数据或读取主动发送的数据"""
if self.serial_port is None or not self.serial_port.is_open:
return
# 记录是否成功获取任一数据
data_updated = False
# 主动模式下读取传感器自动发送的数据
if self._is_active_mode:
if self.serial_port.in_waiting > 0:
data_updated = self.read_active_mode_data()
# 被动模式下发送请求数据指令
else:
# 首先读取距离值 - 寄存器地址0000H
try:
# 清空接收缓冲区,避免读取到旧数据
self.serial_port.reset_input_buffer()
distance_cmd = bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A])
print(f"发送读取距离命令: {' '.join([f'{b:02X}' for b in distance_cmd])}")
self.serial_port.write(distance_cmd)
# 等待距离数据返回和处理逻辑...
except Exception as e:
print(f"请求距离数据错误: {e}")
# 等待一小段时间确保命令处理完成
time.sleep(0.1)
# 然后读取温度值 - 寄存器地址0001H
try:
# 发送读取温度命令和处理逻辑...
except Exception as e:
print(f"请求温度数据错误: {e}")
# 如果成功获取了数据,发送界面更新信号
if data_updated:
self.dataUpdated.emit(self.distance, self.temperature)
这个方法负责获取传感器数据,根据模式采用不同策略:
• 主动模式:直接读取缓冲区中传感器主动发送的数据
• 被动模式:主动发送请求命令,分别读取距离和温度
• 使用异常处理提高代码健壮性
• 状态更新后发送信号触发UI更新
(8)距离数据解析
def read_distance_data(self):
"""读取并解析距离数据,返回是否成功解析"""
try:
# 读取数据
data = self.serial_port.read(self.serial_port.in_waiting)
print(f"接收到距离数据: {' '.join([f'{b:02X}' for b in data])}")
# 在原始数据中查找完整的数据帧: [01][03][02][距离高字节][距离低字节][CRC低][CRC高]
valid_data = None
for i in range(len(data) - 6):
if data[i] == 0x01 and data[i+1] == 0x03 and data[i+2] == 0x02:
if i + 6 <= len(data):
frame_data = data[i:i+7]
valid_data = frame_data
break
if valid_data:
print(f"找到有效距离数据帧: {' '.join([f'{b:02X}' for b in valid_data])}")
# 解析距离数据 (第4-5字节)
distance = (valid_data[3] << 8) | valid_data[4]
print(f"解析距离: {distance}mm")
# 更新距离值并更新连接状态
self.distance = distance
if not self._is_connected:
self._is_connected = True
self.connectionChanged.emit(True)
return True
else:
print("未找到有效的距离数据帧")
return False
except Exception as e:
print(f"读取距离数据错误: {e}")
return False
这个方法解析Modbus响应中的距离数据:
• 读取所有可用数据
• 使用滑动窗口算法在数据流中查找有效帧
• 识别Modbus标准帧结构(地址+功能码+数据长度)
• 解析距离值(高字节<<8 | 低字节)
• 成功解析后更新连接状态和距离值
(9)主动模式数据解析
def read_active_mode_data(self):
"""读取主动模式下传感器自动发送的数据"""
try:
# 读取所有可用数据
data = self.serial_port.read(self.serial_port.in_waiting)
print(f"主动模式接收数据: {' '.join([f'{b:02X}' for b in data])}")
# 检查是否找到有效数据
data_found = False
# 尝试查找6字节完整数据帧(包含距离、温度和误差补偿)
i = 0
while i < len(data) - 10: # 至少需要1+1+1+6+2=11字节
if data[i] == 0x01 and data[i+1] == 0x03 and data[i+2] == 0x06:
if i + 10 <= len(data): # 确保有足够的数据
frame_data = data[i:i+11]
print(f"找到主动模式完整数据帧: {' '.join([f'{b:02X}' for b in frame_data])}")
# 解析距离和温度值...
distance = (frame_data[3] << 8) | frame_data[4]
temp_raw = (frame_data[7] << 8) | frame_data[8]
temperature = temp_raw / 100.0
# 更新数据和状态...
self.distance = distance
self.temperature = temperature
data_found = True
i += 11
else:
i += 1
else:
i += 1
# 如果未找到6字节数据帧,尝试查找2字节简单数据帧
if not data_found:
# 查找简单数据帧的逻辑...
这个方法处理主动模式下传感器发送的数据:
• 读取所有可用数据
• 优先尝试解析6字节完整数据帧(包含多个寄存器值)
• 如失败则尝试解析2字节简单数据帧
• 根据数据内容或帧结构判断数据类型(距离/温度)
• 处理可能同时接收到的多个数据帧
(10)温度补偿配置
def configure_temperature_compensation(self):
"""配置传感器温度补偿,确保正确读取温度数据"""
try:
if not self.serial_port or not self.serial_port.is_open:
return
print("正在检查温度补偿设置...")
# 清空缓冲区
self.serial_port.reset_input_buffer()
# 1. 首先读取当前温度补偿设置
cmd_data = bytearray([0x01, 0x03, 0x00, 0x04, 0x00, 0x01]) # 读取寄存器0x0004(误差补偿)
low_byte, high_byte = calculate_crc(cmd_data)
command = cmd_data + bytearray([low_byte, high_byte])
print(f"读取温度补偿设置命令: {' '.join([f'{b:02X}' for b in command])}")
self.serial_port.write(command)
# 读取当前设置和验证温度读取...
这个方法负责配置传感器的温度补偿功能:
• 读取当前温度补偿寄存器(0x0004)的值
• 检查当前补偿状态是否正常
• 通过读取温度值测试补偿配置是否生效
• 检测到异常值(如温度为0)时发出警告
4.2 main.py - 程序入口
import sys
import os
from PySide6.QtGui import QGuiApplication
from PySide6.QtQml import QQmlApplicationEngine
from PySide6.QtCore import QObject, QUrl
import serial.tools.list_ports
from backend import SensorBackend
if __name__ == "__main__":
app = QGuiApplication(sys.argv)
# 创建后端对象
backend = SensorBackend()
# 查找可用串口
available_ports = [port.device for port in serial.tools.list_ports.comports()]
port_to_use = "COM3" # 默认端口
# 如果COM3不可用但有其他端口,使用第一个可用端口
if port_to_use not in available_ports and available_ports:
port_to_use = available_ports[0]
print(f"可用串口: {available_ports}")
print(f"尝试连接到: {port_to_use}")
# 连接到传感器
connection_result = backend.connect_to_sensor(port=port_to_use, baudrate=115200)
if not connection_result:
print(f"警告:无法连接到传感器,请检查{port_to_use}端口是否可用")
# 显式初始化为未连接状态
backend.initialize_connection(False, False)
else:
print(f"成功连接到传感器({port_to_use}),启动被动上传模式。请点击「读取数据」按钮获取当前数据。")
# 使用新方法初始化为被动模式
backend.initialize_connection(True, False)
# 连接成功后立即请求一次数据,以更新显示
backend.request_data()
# 创建QML引擎
engine = QQmlApplicationEngine()
# 注册后端对象到QML
engine.rootContext().setContextProperty("backend", backend)
# 加载QML文件
qml_file = os.path.join(os.path.dirname(__file__), "ui/main.qml")
engine.load(QUrl.fromLocalFile(qml_file))
if not engine.rootObjects():
sys.exit(-1)
# 应用退出时关闭连接
app.aboutToQuit.connect(backend.close)
sys.exit(app.exec())
主程序入口文件负责:
初始化Qt应用程序;创建SensorBackend实例;自动检测并选择串口;尝试连接传感器;根据连接结果初始化状态 ;将后端对象注册到QML环境 ;加载用户界面;设置应用退出时自动关闭连接
这个文件体现了良好的应用程序初始化流程:
1.环境准备和资源初始化
2. 自动化设备连接
3. 前后端对象绑定
4.资源清理注册
4.3 ui/main.qml - 用户界面
(1)主窗口和属性定义
ApplicationWindow {
visible: true
width: 600
height: 400
title: "水下超声波测距传感器控制界面"
property bool isActiveMode: false
property real distanceValue: 0.0
property real temperatureValue: 0.0
property bool isConnected: false
// 界面内容...
}
界面基础结构:
• 设置窗口尺寸和标题
• 定义4个关键属性存储应用状态
• 这些属性将通过信号槽与后端数据同步
(2)界面布局结构
Rectangle {
anchors.fill: parent
color: "#f0f0f0"
ColumnLayout {
anchors.fill: parent
anchors.margins: 20
spacing: 20
// 标题
Text {
Layout.alignment: Qt.AlignHCenter
text: "水下超声波测距传感器控制界面"
font.pixelSize: 24
font.bold: true
}
// 连接状态
Rectangle {
Layout.fillWidth: true
height: 40
color: isConnected ? "#e0ffe0" : "#ffe0e0"
radius: 10
Text {
anchors.centerIn: parent
text: isConnected ? "已连接到传感器 (COM3, 115200波特率)" : "未连接到传感器,请检查COM3端口"
font.pixelSize: 14
color: isConnected ? "green" : "red"
}
}
// 模式控制区域...
// 数据显示区域...
}
}
布局设计:
• 使用ColumnLayout垂直排列各组件
• 使用Layout属性控制组件大小和对齐
• 通过颜色变化直观反映连接状态
(3)模式控制区域
// 模式控制
Rectangle {
Layout.fillWidth: true
height: 80
color: "#e0e0e0"
radius: 10
RowLayout {
anchors.fill: parent
anchors.margins: 10
spacing: 20
Text {
text: "当前模式:"
font.pixelSize: 18
}
Text {
text: isActiveMode ? "主动上传模式" : "被动上传模式"
font.pixelSize: 18
color: isActiveMode ? "green" : "blue"
font.bold: true
}
Button {
text: "切换模式"
Layout.alignment: Qt.AlignRight
enabled: true
onClicked: backend.toggleMode()
}
// 添加读取数据按钮,只在被动模式下显示
Button {
text: "读取数据"
visible: !isActiveMode
enabled: isConnected
Layout.alignment: Qt.AlignRight
onClicked: backend.request_data()
}
}
}
模式控制区特点:
• 水平排列(RowLayout)显示模式信息和控制按钮
• 使用不同颜色区分模式状态
• 切换模式按钮直接调用后端toggleMode方法
• 读取数据按钮根据当前模式动态显示/隐藏
(4)数据显示区域
// 数据显示
Rectangle {
Layout.fillWidth: true
Layout.fillHeight: true
color: "#e0e0e0"
radius: 10
ColumnLayout {
anchors.fill: parent
anchors.margins: 20
spacing: 10
// 添加提示文本,只在被动模式下显示
Rectangle {
Layout.fillWidth: true
height: 30
color: "#e8f4ff"
radius: 5
visible: !isActiveMode && isConnected
Text {
anchors.centerIn: parent
text: "被动模式下,请点击「读取数据」按钮获取最新数据"
font.pixelSize: 14
color: "blue"
}
}
GridLayout {
Layout.fillWidth: true
Layout.fillHeight: true
columns: 2
rows: 2
// 距离显示...
// 温度显示...
}
}
}
数据显示设计:
• 使用网格布局(GridLayout)显示数据项
• 上下文感知提示在被动模式下提供操作指导
• 数据值实时绑定到后端提供的属性
(5)后端信号连接
// 连接后端信号
Connections {
target: backend
function onModeChanged(activeMode) {
isActiveMode = activeMode;
}
function onDataUpdated(distance, temperature) {
distanceValue = distance;
temperatureValue = temperature;
isConnected = true; // 如果收到数据,表示已连接
}
function onConnectionChanged(connected) {
isConnected = connected;
console.log("连接状态已更新: " + (connected ? "已连接" : "未连接"));
}
}
信号连接机制:
• 使用Connections元素连接后端信号
• 将Python后端发送的信号映射到QML属性
• 信号处理函数中更新对应的界面状态
(6)界面初始化逻辑
// 初始化
Component.onCompleted: {
// 立即查询后端的连接状态
isConnected = backend.is_connected;
isActiveMode = backend.is_active_mode;
console.log("界面初始化 - 连接状态:", isConnected);
console.log("界面初始化 - 模式状态:", isActiveMode ? "主动模式" : "被动模式");
// 立即请求一次数据更新,确保UI显示正确
if (isConnected && !isActiveMode) {
backend.request_data();
}
}
Timer {
id: connectionTimer
interval: 500 // 减少延迟时间,使界面更快响应
repeat: false
onTriggered: {
if (!isConnected) {
// 再次检查连接状态
isConnected = backend.is_connected;
console.log("连接状态定时器 - 更新连接状态:", isConnected);
if (isConnected && !isActiveMode) {
backend.request_data();
}
}
}
}
界面初始化和状态监控:
• Component.onCompleted钩子在界面加载完成后执行初始化
• 直接从后端读取初始状态避免状态不同步
• 使用短延时定时器在初始化后再次检查连接状态
• 确保界面状态与后端状态一致
5.项目扩展与延申
5.1 CRC校验
Modbus RTU协议中的CRC校验(Cyclic Redundancy Check)是一种关键的错误检测机制,用于确保数据在传输过程中的完整性
5.1.1 CRC校验的作用
• 错误检测:通过计算数据帧的CRC值,接收方可以验证数据是否在传输过程中被篡改或损坏。
• 协议要求:Modbus RTU协议强制要求所有数据帧必须包含CRC校验码,否则从机将返回错误码04H(CRC校验错误)。
5.1.2 CRC校验算法参数
M33传感器采用标准的Modbus RTU CRC-16算法,具体参数如下:
• 多项式(Polynomial):0x8005(二进制:1000000000000101,反转后为0xA001)。
• 初始值(Initial Value):0xFFFF。
• 输入反转(Input Reflected):是(每个字节按位反转,如0x01→0x80)。
• 输出反转(Output Reflected):是(最终CRC值的所有位反转)。
• 结果字节顺序:低字节在前(例如,计算出的CRC值为0x1234,实际传输顺序为0x34 0x12)。
5.1.3 CRC计算步骤
以示例指令 01 03 00 00 00 01 的CRC计算为例:
1. 初始化CRC寄存器:0xFFFF。
2. 逐字节处理数据:
• 依次处理每个字节(0x01, 0x03, 0x00, 0x00, 0x00, 0x01)。
• 对每个字节进行输入反转后,与CRC寄存器异或。
• 按位右移并异或多项式,直到处理完所有位。
3. 输出反转:将最终的CRC寄存器值按位反转。
4. 交换字节顺序:将结果的高字节和低字节交换,得到最终的CRC码。
示例计算:
• 数据帧:01 03 00 00 00 01
• 计算得到的CRC值为 0x0A84,按低字节在前传输为 0x84 0A。
5.1.4 CRC校验的验证方法
• 发送方:在数据帧末尾附加计算出的CRC码(如84 0A)。
• 接收方:
1. 提取接收到的数据(不包括CRC字段)。
2. 重新计算CRC值,并与接收到的CRC码对比。
3. 若一致,则认为数据完整;否则返回错误码04H。
6.项目完整代码
6.1 main.py
import sys
import os
from PySide6.QtGui import QGuiApplication
from PySide6.QtQml import QQmlApplicationEngine
from PySide6.QtCore import QObject, QUrl
import serial.tools.list_ports
from backend import SensorBackend
if __name__ == "__main__":
app = QGuiApplication(sys.argv)
# 创建后端对象
backend = SensorBackend()
# 查找可用串口
available_ports = [port.device for port in serial.tools.list_ports.comports()]
port_to_use = "COM3" # 默认端口
# 如果COM3不可用但有其他端口,使用第一个可用端口
if port_to_use not in available_ports and available_ports:
port_to_use = available_ports[0]
print(f"可用串口: {available_ports}")
print(f"尝试连接到: {port_to_use}")
# 连接到传感器
connection_result = backend.connect_to_sensor(port=port_to_use, baudrate=115200)
if not connection_result:
print(f"警告:无法连接到传感器,请检查{port_to_use}端口是否可用")
# 显式初始化为未连接状态
backend.initialize_connection(False, False)
else:
print(f"成功连接到传感器({port_to_use}),启动被动上传模式。请点击「读取数据」按钮获取当前数据。")
# 使用新方法初始化为被动模式
backend.initialize_connection(True, False)
# 连接成功后立即请求一次数据,以更新显示
backend.request_data()
# 创建QML引擎
engine = QQmlApplicationEngine()
# 注册后端对象到QML
engine.rootContext().setContextProperty("backend", backend)
# 加载QML文件
qml_file = os.path.join(os.path.dirname(__file__), "ui/main.qml")
engine.load(QUrl.fromLocalFile(qml_file))
if not engine.rootObjects():
sys.exit(-1)
# 应用退出时关闭连接
app.aboutToQuit.connect(backend.close)
sys.exit(app.exec())
6.2 backend.py
from PySide6.QtCore import QObject, Signal, Slot, QTimer, Property
import serial
import serial.tools.list_ports
import time
def calculate_crc(data):
"""计算ModbusCRC16校验码"""
crc = 0xFFFF
for pos in range(len(data)):
crc ^= data[pos]
for i in range(8):
if (crc & 0x0001) != 0:
crc = (crc >> 1) ^ 0xA001
else:
crc = crc >> 1
# 返回低字节和高字节
return crc & 0xFF, (crc >> 8) & 0xFF
class SensorBackend(QObject):
# 定义信号
modeChanged = Signal(bool) # True为主动模式,False为被动模式
dataUpdated = Signal(float, float) # 距离值和温度值
connectionChanged = Signal(bool) # 连接状态变化信号
def __init__(self):
super().__init__()
self.serial_port = None
self._is_active_mode = False
self.distance = 0.0
self.temperature = 0.0
self._is_connected = False
self.last_request_time = 0 # 添加上次请求时间记录
# 用于主动模式下的定时器
self.timer = QTimer()
self.timer.timeout.connect(self.request_data)
# 连接状态检测定时器
self.connection_check_timer = QTimer()
self.connection_check_timer.timeout.connect(self.check_connection)
self.connection_check_timer.start(5000) # 每5秒检查一次连接状态
# 添加属性访问器,让QML可以直接读取这些属性
@Property(bool)
def is_connected(self):
return self._is_connected
@is_connected.setter
def is_connected(self, value):
self._is_connected = value
@Property(bool)
def is_active_mode(self):
return self._is_active_mode
@is_active_mode.setter
def is_active_mode(self, value):
self._is_active_mode = value
def check_connection(self):
"""检查与传感器的连接状态"""
if self.serial_port is None or not self.serial_port.is_open:
if self._is_connected:
self._is_connected = False
self.connectionChanged.emit(False)
return
# 如果已经在连接状态,不需要频繁检查
if self._is_connected and time.time() - self.last_request_time < 5:
return
try:
# 记录最后一次请求时间
self.last_request_time = time.time()
# 发送请求并检查响应
if self._is_active_mode or not self._is_connected:
# 清空缓冲区
self.serial_port.reset_input_buffer()
# 使用读取距离指令验证连接
command = bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A])
print(f"发送连接检查命令: {' '.join([f'{b:02X}' for b in command])}")
self.serial_port.write(command)
# 等待响应
time.sleep(0.2) # 增加等待时间确保接收完整数据
# 检查是否有响应
response_received = False
retry_count = 0
max_retries = 2
while not response_received and retry_count < max_retries:
if self.serial_port.in_waiting > 0:
# 读取数据
data = self.serial_port.read(self.serial_port.in_waiting)
print(f"连接检查响应: {' '.join([f'{b:02X}' for b in data])}")
# 检查是否包含有效的Modbus响应
valid_frame = False
for i in range(len(data) - 4):
if data[i] == 0x01 and data[i+1] == 0x03: # Modbus从机地址和功能码
valid_frame = True
break
if valid_frame:
# 有效响应,确认连接正常
response_received = True
if not self._is_connected:
self._is_connected = True
self.connectionChanged.emit(True)
print("连接检查成功: 传感器响应正常")
else:
# 无效响应,等待更多数据
retry_count += 1
time.sleep(0.1)
else:
# 无响应,重试
retry_count += 1
time.sleep(0.1)
# 最终检查连接状态
if not response_received:
# 多次尝试后仍无响应,认为连接已断开
if self._is_connected:
self._is_connected = False
self.connectionChanged.emit(False)
print("连接检查失败: 无有效响应")
except Exception as e:
print(f"连接检查失败: {e}")
# 只有在确认连接错误时才更改状态,避免误报
if self._is_connected and str(e).lower() not in ["timeout", "disconnected"]:
self._is_connected = False
self.connectionChanged.emit(False)
def connect_to_sensor(self, port="COM3", baudrate=115200):
"""连接到传感器,默认连接COM3,波特率115200"""
try:
# 如果已经有连接,先关闭
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
# 建立新连接
self.serial_port = serial.Serial(port, baudrate, timeout=1)
# 尝试发送一个简单的命令验证连接
try:
# 清空缓冲区
self.serial_port.reset_input_buffer()
self.serial_port.reset_output_buffer()
# 发送读取距离命令
command = bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A])
self.serial_port.write(command)
# 等待响应
time.sleep(0.2)
# 检查是否有响应
if self.serial_port.in_waiting > 0:
data = self.serial_port.read(self.serial_port.in_waiting)
print(f"连接测试响应: {' '.join([f'{b:02X}' for b in data])}")
# 检查是否是有效的Modbus响应
valid_response = False
for i in range(len(data) - 4):
if data[i] == 0x01 and data[i+1] == 0x03:
valid_response = True
break
if valid_response:
print(f"成功连接到传感器: {port}, 波特率: {baudrate}")
self._is_connected = True
self.connectionChanged.emit(True)
# 连接成功后,配置温度补偿设置
self.configure_temperature_compensation()
return True
else:
print(f"连接到端口 {port} 成功,但传感器返回无效响应")
self.serial_port.close()
self._is_connected = False
self.connectionChanged.emit(False)
return False
else:
# 没有数据返回,认为连接失败
print(f"连接到端口 {port} 成功,但传感器没有响应")
self.serial_port.close()
self._is_connected = False
self.connectionChanged.emit(False)
return False
except Exception as e:
print(f"测试传感器连接失败: {e}")
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
self._is_connected = False
self.connectionChanged.emit(False)
return False
except Exception as e:
print(f"连接传感器失败: {e}")
self._is_connected = False
self.connectionChanged.emit(False)
return False
@Slot()
def toggleMode(self):
"""切换传感器模式"""
# 切换模式状态
self._is_active_mode = not self._is_active_mode
# 如果已连接,则发送命令到传感器
if self.serial_port is not None and self.serial_port.is_open:
# 发送指令切换模式 (需要根据传感器说明书调整)
if self._is_active_mode:
# 准备设置为主动模式的指令(寄存器地址0x5C, 值为0x02)
cmd_data = bytearray([0x01, 0x06, 0x00, 0x5c, 0x00, 0x02]) # 示例Modbus指令,需调整
# 计算CRC
low_byte, high_byte = calculate_crc(cmd_data)
# 添加CRC到指令中
command = cmd_data + bytearray([low_byte, high_byte])
print(f"切换到主动模式,发送命令: {' '.join([f'{b:02X}' for b in command])}")
# 清空接收缓冲区,准备接收主动发送的数据
self.serial_port.reset_input_buffer()
self.timer.start(100) # 100ms检查一次主动发送的数据
else:
# 准备设置为被动模式的指令
cmd_data = bytearray([0x01, 0x06, 0x00, 0x5c, 0x00, 0x00]) # 示例Modbus指令,需调整
# 计算CRC
low_byte, high_byte = calculate_crc(cmd_data)
# 添加CRC到指令中
command = cmd_data + bytearray([low_byte, high_byte])
print(f"切换到被动模式,发送命令: {' '.join([f'{b:02X}' for b in command])}")
self.timer.stop()
self.serial_port.write(command)
time.sleep(0.1) # 等待传感器响应
else:
print(f"传感器未连接,仅切换界面模式为: {'主动' if self._is_active_mode else '被动'}上传模式")
if self._is_active_mode:
self.timer.start(100)
else:
self.timer.stop()
# 发送信号通知界面更新
self.modeChanged.emit(self._is_active_mode)
@Slot()
def request_data(self):
"""请求传感器数据或读取主动发送的数据"""
if self.serial_port is None or not self.serial_port.is_open:
return
# 记录是否成功获取任一数据
data_updated = False
# 主动模式下读取传感器自动发送的数据
if self._is_active_mode:
if self.serial_port.in_waiting > 0:
data_updated = self.read_active_mode_data()
# 被动模式下发送请求数据指令
else:
# 首先读取距离值 - 寄存器地址0000H
try:
# 清空接收缓冲区,避免读取到旧数据
self.serial_port.reset_input_buffer()
distance_cmd = bytearray([0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A])
print(f"发送读取距离命令: {' '.join([f'{b:02X}' for b in distance_cmd])}")
self.serial_port.write(distance_cmd)
# 等待距离数据返回
time.sleep(0.2) # 增加等待时间确保完整接收
if self.serial_port.in_waiting:
if self.read_distance_data():
data_updated = True
except Exception as e:
print(f"请求距离数据错误: {e}")
# 等待一小段时间确保命令处理完成
time.sleep(0.1)
# 然后读取温度值 - 寄存器地址0001H
try:
# 清空接收缓冲区
self.serial_port.reset_input_buffer()
temperature_cmd = bytearray([0x01, 0x03, 0x00, 0x02, 0x00, 0x01, 0x25, 0xCA])
print(f"发送读取温度命令: {' '.join([f'{b:02X}' for b in temperature_cmd])}")
self.serial_port.write(temperature_cmd)
# 等待温度数据返回
time.sleep(0.2) # 增加等待时间确保完整接收
if self.serial_port.in_waiting:
if self.read_temperature_data():
data_updated = True
except Exception as e:
print(f"请求温度数据错误: {e}")
# 如果成功获取了数据,发送界面更新信号
if data_updated:
self.dataUpdated.emit(self.distance, self.temperature)
def read_distance_data(self):
"""读取并解析距离数据,返回是否成功解析"""
try:
# 读取数据
data = self.serial_port.read(self.serial_port.in_waiting)
print(f"接收到距离数据: {' '.join([f'{b:02X}' for b in data])}")
# 在原始数据中查找完整的数据帧: [01][03][02][距离高字节][距离低字节][CRC低][CRC高]
valid_data = None
for i in range(len(data) - 6):
if data[i] == 0x01 and data[i+1] == 0x03 and data[i+2] == 0x02:
if i + 6 <= len(data):
frame_data = data[i:i+7]
valid_data = frame_data
break
if valid_data:
print(f"找到有效距离数据帧: {' '.join([f'{b:02X}' for b in valid_data])}")
# 解析距离数据 (第4-5字节)
distance = (valid_data[3] << 8) | valid_data[4]
print(f"解析距离: {distance}mm")
# 更新距离值
self.distance = distance
# 更新连接状态
if not self._is_connected:
self._is_connected = True
self.connectionChanged.emit(True)
return True
else:
print("未找到有效的距离数据帧")
return False
except Exception as e:
print(f"读取距离数据错误: {e}")
return False
def read_temperature_data(self):
"""读取并解析温度数据,返回是否成功解析"""
try:
# 读取数据
data = self.serial_port.read(self.serial_port.in_waiting)
print(f"接收到温度数据: {' '.join([f'{b:02X}' for b in data])}")
# 在原始数据中查找完整的数据帧: [01][03][02][温度高字节][温度低字节][CRC低][CRC高]
valid_data = None
for i in range(len(data) - 6):
if data[i] == 0x01 and data[i+1] == 0x03 and data[i+2] == 0x02:
if i + 6 <= len(data):
frame_data = data[i:i+7]
valid_data = frame_data
break
if valid_data:
print(f"找到有效温度数据帧: {' '.join([f'{b:02X}' for b in valid_data])}")
# 解析温度数据 (第4-5字节)
temp_raw = (valid_data[3] << 8) | valid_data[4]
temperature = temp_raw / 100.0 # 根据协议,温度需要除以100获得正确单位
print(f"解析温度: {temperature}°C")
# 更新温度值
self.temperature = temperature
# 更新连接状态
if not self._is_connected:
self._is_connected = True
self.connectionChanged.emit(True)
return True
else:
print("未找到有效的温度数据帧")
return False
except Exception as e:
print(f"读取温度数据错误: {e}")
return False
def read_active_mode_data(self):
"""读取主动模式下传感器自动发送的数据"""
try:
# 读取所有可用数据
data = self.serial_port.read(self.serial_port.in_waiting)
print(f"主动模式接收数据: {' '.join([f'{b:02X}' for b in data])}")
# 在数据中查找所有可能的完整帧
# 6字节数据帧: [01][03][06][距离高][距离低][寄存器2高][寄存器2低][温度高][温度低][CRC低][CRC高]
# 检查是否找到有效数据
data_found = False
# 尝试查找6字节完整数据帧(包含距离、温度和误差补偿)
i = 0
while i < len(data) - 10: # 至少需要1+1+1+6+2=11字节
if data[i] == 0x01 and data[i+1] == 0x03 and data[i+2] == 0x06:
if i + 10 <= len(data): # 确保有足够的数据
frame_data = data[i:i+11]
print(f"找到主动模式完整数据帧: {' '.join([f'{b:02X}' for b in frame_data])}")
# 解析距离值 (第4-5字节)
distance = (frame_data[3] << 8) | frame_data[4]
print(f"解析距离: {distance}mm")
# 根据观察发现,主动模式下温度值在寄存器3位置(第8-9字节)
temp_raw = (frame_data[7] << 8) | frame_data[8]
temperature = temp_raw / 100.0 # 温度值除以100
print(f"解析温度(从寄存器3): {temperature}°C")
# 输出寄存器值以供调试
print(f"寄存器1(距离): {frame_data[3]:02X} {frame_data[4]:02X} = {distance}mm")
print(f"寄存器2: {frame_data[5]:02X} {frame_data[6]:02X}")
print(f"寄存器3(温度): {frame_data[7]:02X} {frame_data[8]:02X} = {temperature}°C")
# 更新距离和温度值
self.distance = distance
self.temperature = temperature
data_found = True
# 继续查找下一个帧
i += 11
else:
i += 1
else:
i += 1
# 如果未找到6字节数据帧,尝试查找2字节简单数据帧
if not data_found:
i = 0
while i < len(data) - 6: # 至少需要1+1+1+2+2=7字节
if data[i] == 0x01 and data[i+1] == 0x03 and data[i+2] == 0x02:
if i + 6 <= len(data): # 确保有足够的数据
frame_data = data[i:i+7]
print(f"找到主动模式简单数据帧: {' '.join([f'{b:02X}' for b in frame_data])}")
# 解析数据值(根据内容判断类型)
value = (frame_data[3] << 8) | frame_data[4]
# 获取命令帧的寄存器地址以判断数据类型
reg_addr = None
for j in range(max(0, i-8), i):
if j+3 < len(data) and data[j] == 0x01 and data[j+1] == 0x03:
reg_addr = (data[j+2] << 8) | data[j+3]
break
if reg_addr == 0x0000: # 距离寄存器
print(f"解析距离: {value}mm")
self.distance = value
elif reg_addr == 0x0002: # 温度寄存器
temperature = value / 100.0
print(f"解析温度: {temperature}°C")
self.temperature = temperature
else:
# 根据数值范围判断是距离还是温度
if value > 100 and value < 10000: # 距离通常大于100mm且小于10000mm
print(f"解析距离: {value}mm")
self.distance = value
elif value < 5000: # 温度*100通常小于5000 (小于50℃)
temperature = value / 100.0
print(f"解析温度: {temperature}°C")
self.temperature = temperature
else:
print(f"未知数据值: {value}")
data_found = True
i += 7
else:
i += 1
else:
i += 1
# 更新连接状态
if data_found and not self._is_connected:
self._is_connected = True
self.connectionChanged.emit(True)
return data_found
except Exception as e:
print(f"读取主动模式数据错误: {e}")
return False
def read_sensor_data(self):
"""保留此方法用于向后兼容,实际不再使用"""
pass
def close(self):
"""关闭连接"""
if self.serial_port and self.serial_port.is_open:
self.timer.stop()
self.connection_check_timer.stop()
self.serial_port.close()
self._is_connected = False
self.connectionChanged.emit(False)
print("传感器连接已关闭")
@Slot()
def initialize_connection(self, is_connected=True, is_active=False):
"""初始化连接和模式状态,并通知UI更新"""
self._is_connected = is_connected
self._is_active_mode = is_active
# 发送信号通知界面
self.connectionChanged.emit(is_connected)
self.modeChanged.emit(is_active)
if is_connected:
if is_active:
self.timer.start(100)
else:
self.timer.stop()
else:
self.timer.stop()
return is_connected
def configure_temperature_compensation(self):
"""配置传感器温度补偿,确保正确读取温度数据"""
try:
if not self.serial_port or not self.serial_port.is_open:
return
print("正在检查温度补偿设置...")
# 清空缓冲区
self.serial_port.reset_input_buffer()
# 1. 首先读取当前温度补偿设置
cmd_data = bytearray([0x01, 0x03, 0x00, 0x04, 0x00, 0x01]) # 读取寄存器0x0004(误差补偿)
low_byte, high_byte = calculate_crc(cmd_data)
command = cmd_data + bytearray([low_byte, high_byte])
print(f"读取温度补偿设置命令: {' '.join([f'{b:02X}' for b in command])}")
self.serial_port.write(command)
# 等待响应
time.sleep(0.2)
# 读取当前补偿设置
current_compensation = None
if self.serial_port.in_waiting > 0:
data = self.serial_port.read(self.serial_port.in_waiting)
print(f"温度补偿设置响应: {' '.join([f'{b:02X}' for b in data])}")
# 解析响应
for i in range(len(data) - 6):
if data[i] == 0x01 and data[i+1] == 0x03 and data[i+2] == 0x02:
if i + 6 <= len(data):
current_compensation = (data[i+3] << 8) | data[i+4]
print(f"当前温度补偿设置: {current_compensation}")
break
# 2. 如果需要,启用温度补偿
# 在这里我们不改变用户的设置,只是确认读取正确
# 3. 读取当前温度值以验证
self.serial_port.reset_input_buffer()
temperature_cmd = bytearray([0x01, 0x03, 0x00, 0x02, 0x00, 0x01, 0x25, 0xCA])
print(f"测试读取温度命令: {' '.join([f'{b:02X}' for b in temperature_cmd])}")
self.serial_port.write(temperature_cmd)
# 等待响应
time.sleep(0.2)
# 解析温度响应
if self.serial_port.in_waiting > 0:
data = self.serial_port.read(self.serial_port.in_waiting)
print(f"温度读取测试响应: {' '.join([f'{b:02X}' for b in data])}")
for i in range(len(data) - 6):
if data[i] == 0x01 and data[i+1] == 0x03 and data[i+2] == 0x02:
if i + 6 <= len(data):
temp_raw = (data[i+3] << 8) | data[i+4]
temperature = temp_raw / 100.0
print(f"当前温度读数: {temperature}°C")
if temperature == 0 and temp_raw == 0:
print("警告: 温度读数为0,可能存在配置问题")
else:
print("温度读取正常")
break
else:
print("温度读取测试无响应")
except Exception as e:
print(f"配置温度补偿错误: {e}")
6.3 main.qml
import QtQuick 2.15
import QtQuick.Controls 2.15
import QtQuick.Layouts 1.15
ApplicationWindow {
visible: true
width: 600
height: 400
title: "水下超声波测距传感器控制界面"
property bool isActiveMode: false
property real distanceValue: 0.0
property real temperatureValue: 0.0
property bool isConnected: false
Rectangle {
anchors.fill: parent
color: "#f0f0f0"
ColumnLayout {
anchors.fill: parent
anchors.margins: 20
spacing: 20
// 标题
Text {
Layout.alignment: Qt.AlignHCenter
text: "水下超声波测距传感器控制界面"
font.pixelSize: 24
font.bold: true
}
// 连接状态
Rectangle {
Layout.fillWidth: true
height: 40
color: isConnected ? "#e0ffe0" : "#ffe0e0"
radius: 10
Text {
anchors.centerIn: parent
text: isConnected ? "已连接到传感器 (COM3, 115200波特率)" : "未连接到传感器,请检查COM3端口"
font.pixelSize: 14
color: isConnected ? "green" : "red"
}
}
// 模式控制
Rectangle {
Layout.fillWidth: true
height: 80
color: "#e0e0e0"
radius: 10
RowLayout {
anchors.fill: parent
anchors.margins: 10
spacing: 20
Text {
text: "当前模式:"
font.pixelSize: 18
}
Text {
text: isActiveMode ? "主动上传模式" : "被动上传模式"
font.pixelSize: 18
color: isActiveMode ? "green" : "blue"
font.bold: true
}
Button {
text: "切换模式"
Layout.alignment: Qt.AlignRight
enabled: true
onClicked: backend.toggleMode()
}
// 添加读取数据按钮,只在被动模式下显示
Button {
text: "读取数据"
visible: !isActiveMode
enabled: isConnected
Layout.alignment: Qt.AlignRight
onClicked: backend.request_data()
}
}
}
// 数据显示
Rectangle {
Layout.fillWidth: true
Layout.fillHeight: true
color: "#e0e0e0"
radius: 10
ColumnLayout {
anchors.fill: parent
anchors.margins: 20
spacing: 10
// 添加提示文本,只在被动模式下显示
Rectangle {
Layout.fillWidth: true
height: 30
color: "#e8f4ff"
radius: 5
visible: !isActiveMode && isConnected
Text {
anchors.centerIn: parent
text: "被动模式下,请点击「读取数据」按钮获取最新数据"
font.pixelSize: 14
color: "blue"
}
}
GridLayout {
Layout.fillWidth: true
Layout.fillHeight: true
columns: 2
rows: 2
Text {
text: "距离值(mm):"
font.pixelSize: 18
}
Rectangle {
Layout.fillWidth: true
height: 40
color: "white"
radius: 5
border.color: "gray"
Text {
anchors.centerIn: parent
text: distanceValue.toFixed(2)
font.pixelSize: 18
}
}
Text {
text: "温度值(°C):"
font.pixelSize: 18
}
Rectangle {
Layout.fillWidth: true
height: 40
color: "white"
radius: 5
border.color: "gray"
Text {
anchors.centerIn: parent
text: temperatureValue.toFixed(2)
font.pixelSize: 18
}
}
}
}
}
}
}
// 连接后端信号
Connections {
target: backend
function onModeChanged(activeMode) {
isActiveMode = activeMode;
}
function onDataUpdated(distance, temperature) {
distanceValue = distance;
temperatureValue = temperature;
isConnected = true; // 如果收到数据,表示已连接
}
function onConnectionChanged(connected) {
isConnected = connected;
console.log("连接状态已更新: " + (connected ? "已连接" : "未连接"));
}
}
// 初始化
Component.onCompleted: {
// 立即查询后端的连接状态
isConnected = backend.is_connected;
isActiveMode = backend.is_active_mode;
console.log("界面初始化 - 连接状态:", isConnected);
console.log("界面初始化 - 模式状态:", isActiveMode ? "主动模式" : "被动模式");
// 立即请求一次数据更新,确保UI显示正确
if (isConnected && !isActiveMode) {
backend.request_data();
}
}
Timer {
id: connectionTimer
interval: 500 // 减少延迟时间,使界面更快响应
repeat: false
onTriggered: {
if (!isConnected) {
// 再次检查连接状态
isConnected = backend.is_connected;
console.log("连接状态定时器 - 更新连接状态:", isConnected);
if (isConnected && !isActiveMode) {
backend.request_data();
}
}
}
}
}
更多推荐



所有评论(0)