FDA 21 CFR Part 11合规 Qt审计追踪与电子签名嵌入式实现方案
作者 丁林松 cnsilan@163.com
1. 嵌入式架构设计原则
1.1 系统架构概览
在嵌入式环境中实现FDA 21 CFR Part 11合规性需要考虑资源限制、实时性要求和数据完整性保障。嵌入式Qt应用程序的架构设计必须在性能、安全性和合规性之间找到平衡点。
核心架构组件
用户界面层
业务逻辑层
安全服务层
数据持久层
硬件抽象层
嵌入式Qt应用程序架构采用分层设计模式,确保各层职责明确且相互独立。用户界面层负责人机交互,业务逻辑层处理核心功能,安全服务层提供审计追踪和电子签名功能,数据持久层管理数据存储,硬件抽象层处理底层硬件接口。
1.2 内存管理策略
嵌入式系统的内存资源通常有限,因此需要实施高效的内存管理策略。审计日志和电子签名数据的存储必须考虑内存使用优化,同时确保数据完整性和可追溯性。
静态内存分配
为审计日志预分配固定大小的内存缓冲区,避免动态内存分配带来的碎片化问题。
循环缓冲区
使用循环缓冲区存储最近的审计记录,当缓冲区满时覆盖最旧的记录。
压缩存储
对审计日志进行实时压缩,减少存储空间占用。
1.3 实时性考虑
嵌入式系统通常有严格的实时性要求。审计追踪和电子签名功能的实现必须确保不影响系统的实时响应性能。通过异步处理、优先级队列和非阻塞I/O操作来保证系统的实时性。
关键实现要点:
- 审计日志采用异步写入机制
- 电子签名验证使用后台线程处理
- 关键操作具有最高优先级
- 非关键审计操作可延迟处理
2. 审计追踪嵌入式实现
2.1 审计事件捕获机制
在嵌入式Qt应用中,审计事件的捕获需要在不影响正常业务流程的前提下,全面记录所有关键操作。通过Qt的信号槽机制和事件过滤器实现透明的审计事件捕获。
2.1.1 事件拦截器设计
实现一个全局事件拦截器,监控所有需要审计的操作。拦截器采用观察者模式,当特定事件发生时自动触发审计记录的创建。
| 事件类型 | 捕获方法 | 优先级 | 存储策略 |
|---|---|---|---|
| 用户登录/登出 | 认证服务钩子 | 高 | 立即持久化 |
| 数据修改 | 数据访问层拦截 | 高 | 立即持久化 |
| 系统配置变更 | 配置管理器钩子 | 中 | 批量写入 |
| 界面操作 | 事件过滤器 | 低 | 缓存后写入 |
2.1.2 审计数据结构设计
为了在有限的嵌入式存储空间中高效存储审计信息,设计紧凑的数据结构至关重要。审计记录采用二进制格式存储,并使用字段压缩技术减少存储空间。
审计记录结构示例:
struct AuditRecord {
uint32_t timestamp; // 4字节时间戳
uint16_t event_type; // 2字节事件类型
uint16_t user_id; // 2字节用户ID
uint32_t object_id; // 4字节对象ID
uint16_t action_code; // 2字节动作代码
uint8_t data_length; // 1字节数据长度
uint8_t checksum; // 1字节校验和
// 可变长度数据部分
};
2.2 分布式审计存储
嵌入式系统通常存储空间有限,需要实现分布式审计存储策略。通过多级存储架构,将热数据保存在本地快速存储中,冷数据迁移到外部存储设备或云端。
2.2.1 本地存储优化
本地存储采用环形缓冲区和索引表相结合的方式,确保快速访问最近的审计记录,同时支持历史数据查询。
注意:本地存储空间规划需要考虑系统运行时间、审计事件频率和存储介质的耐久性。建议为审计数据预留至少30%的存储余量。
2.2.2 外部存储同步
当本地存储达到预设阈值时,自动将旧的审计记录同步到外部存储。同步过程采用增量备份策略,只传输新增和修改的记录,减少网络带宽占用。
2.3 实时审计监控
实时监控审计系统的运行状态,及时发现异常操作和潜在的安全威胁。监控系统采用规则引擎架构,支持动态配置监控规则和阈值。
异常行为检测
基于用户行为模式分析,检测异常登录、批量操作等可疑行为。
系统性能监控
监控审计系统对整体性能的影响,确保符合实时性要求。
完整性验证
定期验证审计日志的完整性和一致性,检测篡改行为。
3. 电子签名嵌入式实现
3.1 密码学基础设施
嵌入式电子签名系统需要轻量级的密码学实现,在保证安全性的同时最小化计算资源消耗。选择适合嵌入式环境的加密算法和密钥管理方案至关重要。
3.1.1 算法选择策略
针对嵌入式环境的限制,选择计算效率高、内存占用小的密码学算法。椭圆曲线密码学(ECC)因其在相同安全级别下密钥长度更短,成为嵌入式系统的理想选择。
| 算法类型 | 推荐算法 | 密钥长度 | 适用场景 |
|---|---|---|---|
| 数字签名 | ECDSA | 256位 | 文档签名 |
| 哈希函数 | SHA-256 | - | 数据完整性 |
| 对称加密 | AES-128 | 128位 | 数据加密 |
| 密钥交换 | ECDH | 256位 | 会话密钥生成 |
3.1.2 硬件安全模块集成
为了提高密钥的安全性,集成硬件安全模块(HSM)或可信平台模块(TPM)来保护私钥。硬件安全模块提供防篡改的密钥存储和密码学运算环境。
3.2 签名工作流程
电子签名工作流程必须符合FDA 21 CFR Part 11的要求,确保签名的不可否认性、完整性和真实性。工作流程包括用户身份验证、文档哈希计算、数字签名生成和签名验证等步骤。
身份验证
文档哈希
私钥解锁
数字签名
签名附加
审计记录
3.2.1 多因素身份验证
实现多因素身份验证机制,结合用户名密码、生物特征识别和硬件令牌等多种认证方式。在嵌入式环境中,可以利用指纹传感器、智能卡读取器等硬件设备。
3.2.2 签名意图确认
在执行签名操作前,必须明确确认用户的签名意图。通过图形化界面显示即将签名的文档摘要和关键信息,要求用户明确确认后才能进行签名。
签名意图确认要素:
- 文档标识和版本信息
- 签名者身份和角色
- 签名时间和地点
- 签名的法律效力说明
3.3 签名验证与管理
建立完整的签名验证和管理体系,支持签名的创建、验证、撤销和归档。验证过程包括数字签名验证、证书链验证和撤销状态检查。
3.3.1 离线验证机制
考虑到嵌入式设备可能工作在离线环境中,实现离线签名验证机制。通过本地证书存储和证书撤销列表(CRL)缓存,支持在无网络连接的情况下进行签名验证。
3.3.2 长期保存策略
电子签名需要长期保存,以满足法规要求。实现时间戳服务和长期签名格式(LTV),确保签名在密码学算法过时后仍能验证。
4. 数据完整性保护
4.1 端到端完整性验证
实现从数据产生到最终存储的端到端完整性保护机制。通过多层哈希校验、数字签名和区块链技术,确保数据在传输和存储过程中不被篡改。
4.1.1 分层哈希策略
采用分层哈希架构,在不同层级计算和验证数据哈希值。底层对原始数据计算哈希,中间层对数据块计算哈希,顶层对整个数据集计算根哈希。
分层哈希架构
- 记录级哈希:每条审计记录计算独立哈希
- 页面级哈希:存储页面计算聚合哈希
- 文件级哈希:整个审计文件计算总哈希
- 系统级哈希:所有审计数据计算根哈希
4.1.2 实时完整性监控
建立实时完整性监控系统,定期检查数据完整性,及时发现和报告数据篡改行为。监控系统采用增量验证算法,只检查发生变化的数据块,提高验证效率。
4.2 防篡改存储设计
设计防篡改的存储架构,通过硬件和软件相结合的方式保护数据不被恶意修改。使用只写存储介质、加密存储和分布式备份等技术手段。
4.2.1 只写存储模式
对于关键的审计数据,采用只写存储模式,数据一旦写入就不能修改。通过文件系统权限控制和硬件写保护机制实现真正的只写存储。
4.2.2 加密存储实现
所有敏感数据在存储前进行加密,使用强加密算法和安全的密钥管理。加密密钥存储在独立的安全区域,与数据物理隔离。
安全提醒:密钥管理是加密存储的关键,必须确保密钥的生成、分发、轮换和销毁都符合安全标准。
5. 用户权限与访问控制
5.1 基于角色的访问控制
实现细粒度的基于角色的访问控制(RBAC)系统,确保用户只能访问其职责范围内的功能和数据。访问控制策略需要支持动态权限调整和权限继承。
5.1.1 角色定义和权限分配
根据组织结构和业务流程定义不同的用户角色,为每个角色分配适当的权限。权限分配遵循最小权限原则,用户只获得完成工作所必需的最小权限集合。
| 角色类型 | 权限范围 | 审计要求 | 签名权限 |
|---|---|---|---|
| 系统管理员 | 系统配置、用户管理 | 所有操作审计 | 管理签名 |
| 质量管理员 | 质量数据、审计报告 | 数据访问审计 | 质量签名 |
| 操作员 | 日常操作、数据录入 | 操作行为审计 | 操作签名 |
| 审计员 | 审计日志、报告查看 | 查询行为审计 | 只读权限 |
5.1.2 动态权限管理
支持运行时动态调整用户权限,响应组织变化和安全事件。权限变更必须经过审批流程,所有权限变更操作都要记录审计日志。
5.2 会话管理和超时控制
实现安全的会话管理机制,包括会话创建、维持、超时和销毁。会话管理必须防止会话劫持、重放攻击和未授权访问。
5.2.1 自适应超时机制
根据用户活动模式和安全级别实现自适应会话超时。高风险操作需要更短的超时时间,常规操作可以允许更长的会话时间。
5.2.2 并发会话控制
限制同一用户的并发会话数量,防止凭据共享和未授权访问。当检测到异常的并发登录时,自动触发安全警报和额外的身份验证。
6. 合规性验证和测试
6.1 功能合规性测试
建立全面的合规性测试框架,验证系统是否满足FDA 21 CFR Part 11的所有要求。测试框架包括自动化测试脚本和手工测试用例,覆盖所有合规性要求点。
6.1.1 审计功能测试
验证审计系统能够准确记录所有需要审计的操作,审计记录包含完整的上下文信息,并且审计日志不能被篡改或删除。
记录完整性测试
验证所有关键操作都被正确记录,记录内容完整准确。
时间戳准确性测试
验证审计记录的时间戳准确性和同步性。
不可篡改性测试
验证审计日志无法被修改或删除。
6.1.2 电子签名功能测试
全面测试电子签名功能,包括签名创建、验证、管理和长期保存。测试需要覆盖正常场景和异常场景,验证系统的鲁棒性。
6.2 安全性评估
进行全面的安全性评估,识别潜在的安全风险和漏洞。安全评估包括代码审计、渗透测试、配置检查和风险分析。
6.2.1 密码学强度评估
评估所使用密码学算法的强度和实现质量,确保满足当前的安全标准。定期更新密码学库和算法,应对新发现的安全威胁。
6.2.2 权限系统审计
审计权限系统的配置和实现,确保权限分配合理,没有权限提升漏洞。定期检查用户权限,清理不必要的权限授予。
7. 系统监控和维护
7.1 运行状态监控
建立全面的系统监控体系,实时监控系统的运行状态、性能指标和安全事件。监控系统需要支持告警机制和自动响应。
7.1.1 性能指标监控
监控系统的关键性能指标,包括CPU使用率、内存占用、存储空间、网络延迟等。当性能指标超出阈值时,自动触发告警和优化措施。
| 监控指标 | 正常范围 | 警告阈值 | 响应措施 |
|---|---|---|---|
| CPU使用率 | < 70% | 70-90% | 性能优化 |
| 内存使用率 | < 80% | 80-95% | 内存清理 |
| 存储空间 | < 85% | 85-95% | 数据归档 |
| 审计延迟 | < 100ms | 100-500ms | 系统优化 |
7.1.2 安全事件监控
监控安全相关事件,包括登录失败、权限异常、数据访问异常等。建立安全事件响应流程,及时处理安全威胁。
7.2 预防性维护
实施预防性维护策略,定期检查系统健康状况,更新安全补丁,优化性能配置。预防性维护有助于降低系统故障风险,确保持续合规。
7.2.1 定期备份和恢复测试
建立可靠的数据备份机制,定期进行备份和恢复测试。备份数据必须保持与原始数据相同的完整性和安全性保护。
7.2.2 软件更新管理
建立软件更新管理流程,及时应用安全补丁和功能更新。更新过程必须经过充分测试,确保不影响系统的合规性和稳定性。
8. 应急响应和灾难恢复
8.1 应急响应计划
制定详细的应急响应计划,应对各种可能的安全事件和系统故障。应急响应计划包括事件分类、响应流程、角色职责和恢复程序。
8.1.1 安全事件分类
根据事件的严重程度和影响范围对安全事件进行分类,为不同类别的事件制定相应的响应策略。
紧急事件类型:
- 数据泄露或篡改
- 系统入侵或恶意攻击
- 审计系统故障
- 电子签名系统失效
8.1.2 响应流程标准化
建立标准化的事件响应流程,确保所有安全事件都能得到及时、有效的处理。响应流程包括事件发现、评估、隔离、调查、恢复和总结等阶段。
8.2 灾难恢复机制
设计完整的灾难恢复机制,确保在系统发生重大故障时能够快速恢复服务。灾难恢复计划需要考虑数据恢复、系统重建和业务连续性。
8.2.1 数据恢复策略
实现多层次的数据恢复策略,包括本地备份、远程备份和云端备份。不同层次的备份具有不同的恢复时间目标(RTO)和恢复点目标(RPO)。
8.2.2 业务连续性保障
在系统恢复期间,提供基本的业务连续性保障。通过冗余系统、离线模式和手工流程确保关键业务不中断。
9. Python PySide6实现代码
完整的FDA 21 CFR Part 11合规系统实现
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FDA 21 CFR Part 11合规的Qt审计追踪与电子签名系统
嵌入式实现版本
"""
import sys
import json
import hashlib
import sqlite3
import logging
import threading
import queue
import time
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict
from pathlib import Path
import cryptography
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import secrets
from PySide6.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QTabWidget, QTableWidget, QTableWidgetItem, QPushButton, QLabel,
QLineEdit, QTextEdit, QComboBox, QCheckBox, QGroupBox, QGridLayout,
QMessageBox, QDialog, QDialogButtonBox, QProgressBar, QSplitter,
QTreeWidget, QTreeWidgetItem, QHeaderView, QFrame, QScrollArea
)
from PySide6.QtCore import (
Qt, QTimer, QThread, pyqtSignal, QObject, QSettings, QDateTime,
QAbstractTableModel, QModelIndex, QVariant
)
from PySide6.QtGui import QFont, QPalette, QColor, QIcon, QPixmap
@dataclass
class AuditRecord:
"""审计记录数据结构"""
timestamp: str
event_type: str
user_id: str
session_id: str
object_id: str
action: str
old_value: Optional[str] = None
new_value: Optional[str] = None
ip_address: Optional[str] = None
user_agent: Optional[str] = None
result: str = "SUCCESS"
details: Optional[Dict[str, Any]] = None
checksum: Optional[str] = None
def calculate_checksum(self) -> str:
"""计算记录校验和"""
data = f"{self.timestamp}{self.event_type}{self.user_id}{self.action}"
return hashlib.sha256(data.encode()).hexdigest()[:16]
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return asdict(self)
@dataclass
class DigitalSignature:
"""数字签名数据结构"""
signature_id: str
signer_id: str
document_hash: str
signature_data: str
timestamp: str
certificate: str
meaning: str
reason: str = "Document approval"
location: str = "Embedded System"
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return asdict(self)
class CryptoManager:
"""密码学管理器 - 嵌入式优化版本"""
def __init__(self):
self.backend = default_backend()
self._init_crypto_components()
def _init_crypto_components(self):
"""初始化密码学组件"""
# 生成系统密钥对(生产环境应从HSM加载)
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=self.backend
)
self.public_key = self.private_key.public_key()
# 初始化对称加密密钥
self.symmetric_key = secrets.token_bytes(32) # AES-256
def sign_data(self, data: bytes) -> bytes:
"""数字签名"""
try:
signature = self.private_key.sign(
data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return signature
except Exception as e:
logging.error(f"Digital signature failed: {e}")
raise
def verify_signature(self, data: bytes, signature: bytes) -> bool:
"""验证数字签名"""
try:
self.public_key.verify(
signature,
data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
except Exception as e:
logging.error(f"Signature verification failed: {e}")
return False
def encrypt_data(self, data: bytes) -> bytes:
"""AES加密"""
iv = secrets.token_bytes(16)
cipher = Cipher(algorithms.AES(self.symmetric_key), modes.CBC(iv), backend=self.backend)
encryptor = cipher.encryptor()
# PKCS7 padding
padding_length = 16 - (len(data) % 16)
padded_data = data + bytes([padding_length] * padding_length)
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
return iv + ciphertext
def decrypt_data(self, encrypted_data: bytes) -> bytes:
"""AES解密"""
iv = encrypted_data[:16]
ciphertext = encrypted_data[16:]
cipher = Cipher(algorithms.AES(self.symmetric_key), modes.CBC(iv), backend=self.backend)
decryptor = cipher.decryptor()
padded_data = decryptor.update(ciphertext) + decryptor.finalize()
# Remove PKCS7 padding
padding_length = padded_data[-1]
return padded_data[:-padding_length]
def hash_data(self, data: bytes) -> str:
"""计算SHA-256哈希"""
digest = hashes.Hash(hashes.SHA256(), backend=self.backend)
digest.update(data)
return digest.finalize().hex()
class DatabaseManager:
"""数据库管理器 - 嵌入式SQLite实现"""
def __init__(self, db_path: str = "compliance_system.db"):
self.db_path = db_path
self.crypto = CryptoManager()
self._init_database()
def _init_database(self):
"""初始化数据库表结构"""
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS audit_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp TEXT NOT NULL,
event_type TEXT NOT NULL,
user_id TEXT NOT NULL,
session_id TEXT NOT NULL,
object_id TEXT,
action TEXT NOT NULL,
old_value TEXT,
new_value TEXT,
ip_address TEXT,
user_agent TEXT,
result TEXT DEFAULT 'SUCCESS',
details TEXT,
checksum TEXT NOT NULL,
encrypted_data BLOB,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS digital_signatures (
id INTEGER PRIMARY KEY AUTOINCREMENT,
signature_id TEXT UNIQUE NOT NULL,
signer_id TEXT NOT NULL,
document_hash TEXT NOT NULL,
signature_data TEXT NOT NULL,
timestamp TEXT NOT NULL,
certificate TEXT,
meaning TEXT NOT NULL,
reason TEXT,
location TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS system_integrity (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
record_count INTEGER NOT NULL,
hash_value TEXT NOT NULL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
# 创建索引优化查询性能
conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_records(timestamp)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_user ON audit_records(user_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_signatures_signer ON digital_signatures(signer_id)")
def save_audit_record(self, record: AuditRecord) -> bool:
"""保存审计记录"""
try:
record.checksum = record.calculate_checksum()
# 加密敏感数据
record_json = json.dumps(record.to_dict())
encrypted_data = self.crypto.encrypt_data(record_json.encode())
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO audit_records
(timestamp, event_type, user_id, session_id, object_id, action,
old_value, new_value, ip_address, user_agent, result, details,
checksum, encrypted_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
record.timestamp, record.event_type, record.user_id,
record.session_id, record.object_id, record.action,
record.old_value, record.new_value, record.ip_address,
record.user_agent, record.result, json.dumps(record.details),
record.checksum, encrypted_data
))
return True
except Exception as e:
logging.error(f"Failed to save audit record: {e}")
return False
def save_digital_signature(self, signature: DigitalSignature) -> bool:
"""保存数字签名"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO digital_signatures
(signature_id, signer_id, document_hash, signature_data,
timestamp, certificate, meaning, reason, location)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
signature.signature_id, signature.signer_id, signature.document_hash,
signature.signature_data, signature.timestamp, signature.certificate,
signature.meaning, signature.reason, signature.location
))
return True
except Exception as e:
logging.error(f"Failed to save digital signature: {e}")
return False
def get_audit_records(self, limit: int = 1000, offset: int = 0) -> List[AuditRecord]:
"""获取审计记录"""
try:
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
cursor = conn.execute("""
SELECT * FROM audit_records
ORDER BY timestamp DESC
LIMIT ? OFFSET ?
""", (limit, offset))
records = []
for row in cursor.fetchall():
# 解密数据
try:
decrypted_data = self.crypto.decrypt_data(row['encrypted_data'])
record_dict = json.loads(decrypted_data.decode())
records.append(AuditRecord(**record_dict))
except Exception as e:
logging.warning(f"Failed to decrypt audit record {row['id']}: {e}")
# 回退到未加密数据
record = AuditRecord(
timestamp=row['timestamp'],
event_type=row['event_type'],
user_id=row['user_id'],
session_id=row['session_id'],
object_id=row['object_id'],
action=row['action'],
old_value=row['old_value'],
new_value=row['new_value'],
ip_address=row['ip_address'],
user_agent=row['user_agent'],
result=row['result'],
details=json.loads(row['details']) if row['details'] else None,
checksum=row['checksum']
)
records.append(record)
return records
except Exception as e:
logging.error(f"Failed to get audit records: {e}")
return []
def verify_integrity(self) -> Dict[str, bool]:
"""验证数据完整性"""
results = {}
try:
with sqlite3.connect(self.db_path) as conn:
# 验证审计记录完整性
cursor = conn.execute("SELECT COUNT(*) FROM audit_records")
audit_count = cursor.fetchone()[0]
cursor = conn.execute("SELECT checksum FROM audit_records ORDER BY id")
checksums = [row[0] for row in cursor.fetchall()]
# 计算整体哈希
combined_hash = hashlib.sha256(''.join(checksums).encode()).hexdigest()
# 验证与上次记录的哈希是否一致
cursor = conn.execute("""
SELECT hash_value FROM system_integrity
WHERE table_name = 'audit_records'
ORDER BY timestamp DESC LIMIT 1
""")
last_hash = cursor.fetchone()
if last_hash:
results['audit_records'] = (combined_hash == last_hash[0])
else:
results['audit_records'] = True # 首次验证
# 更新完整性记录
conn.execute("""
INSERT INTO system_integrity (table_name, record_count, hash_value)
VALUES ('audit_records', ?, ?)
""", (audit_count, combined_hash))
# 验证数字签名表
cursor = conn.execute("SELECT COUNT(*) FROM digital_signatures")
sig_count = cursor.fetchone()[0]
results['digital_signatures'] = True # 简化验证
except Exception as e:
logging.error(f"Integrity verification failed: {e}")
results['error'] = str(e)
return results
class AuditService(QObject):
"""审计服务 - 异步处理审计记录"""
audit_recorded = pyqtSignal(AuditRecord)
def __init__(self, db_manager: DatabaseManager):
super().__init__()
self.db_manager = db_manager
self.audit_queue = queue.Queue()
self.current_user = None
self.current_session = None
self._start_audit_worker()
def _start_audit_worker(self):
"""启动审计工作线程"""
self.audit_worker = threading.Thread(target=self._process_audit_queue, daemon=True)
self.audit_worker.start()
def _process_audit_queue(self):
"""处理审计队列"""
while True:
try:
record = self.audit_queue.get(timeout=1)
if record is None: # 停止信号
break
success = self.db_manager.save_audit_record(record)
if success:
self.audit_recorded.emit(record)
else:
logging.error(f"Failed to save audit record: {record}")
self.audit_queue.task_done()
except queue.Empty:
continue
except Exception as e:
logging.error(f"Audit worker error: {e}")
def set_current_user(self, user_id: str, session_id: str):
"""设置当前用户和会话"""
self.current_user = user_id
self.current_session = session_id
def log_event(self, event_type: str, action: str, object_id: str = None,
old_value: str = None, new_value: str = None,
details: Dict[str, Any] = None):
"""记录审计事件"""
if not self.current_user:
logging.warning("No current user set for audit logging")
return
record = AuditRecord(
timestamp=datetime.now(timezone.utc).isoformat(),
event_type=event_type,
user_id=self.current_user,
session_id=self.current_session or "unknown",
object_id=object_id or "",
action=action,
old_value=old_value,
new_value=new_value,
details=details or {}
)
# 异步添加到队列
self.audit_queue.put(record)
def log_user_action(self, action: str, **kwargs):
"""记录用户操作"""
self.log_event("USER_ACTION", action, **kwargs)
def log_system_event(self, action: str, **kwargs):
"""记录系统事件"""
self.log_event("SYSTEM_EVENT", action, **kwargs)
def log_security_event(self, action: str, **kwargs):
"""记录安全事件"""
self.log_event("SECURITY_EVENT", action, **kwargs)
class DigitalSignatureService(QObject):
"""数字签名服务"""
signature_created = pyqtSignal(DigitalSignature)
def __init__(self, db_manager: DatabaseManager, crypto_manager: CryptoManager):
super().__init__()
self.db_manager = db_manager
self.crypto = crypto_manager
def create_signature(self, signer_id: str, document_data: bytes,
meaning: str, reason: str = "Document approval") -> Optional[DigitalSignature]:
"""创建数字签名"""
try:
# 计算文档哈希
document_hash = self.crypto.hash_data(document_data)
# 创建签名数据
signature_data = self.crypto.sign_data(document_data)
# 生成签名记录
signature = DigitalSignature(
signature_id=secrets.token_hex(16),
signer_id=signer_id,
document_hash=document_hash,
signature_data=signature_data.hex(),
timestamp=datetime.now(timezone.utc).isoformat(),
certificate="", # 在实际实现中应包含X.509证书
meaning=meaning,
reason=reason,
location="Embedded System"
)
# 保存到数据库
if self.db_manager.save_digital_signature(signature):
self.signature_created.emit(signature)
return signature
else:
logging.error("Failed to save digital signature")
return None
except Exception as e:
logging.error(f"Failed to create digital signature: {e}")
return None
def verify_signature(self, signature: DigitalSignature, document_data: bytes) -> bool:
"""验证数字签名"""
try:
# 验证文档哈希
current_hash = self.crypto.hash_data(document_data)
if current_hash != signature.document_hash:
logging.warning("Document hash mismatch")
return False
# 验证数字签名
signature_bytes = bytes.fromhex(signature.signature_data)
return self.crypto.verify_signature(document_data, signature_bytes)
except Exception as e:
logging.error(f"Signature verification failed: {e}")
return False
class AuditTableModel(QAbstractTableModel):
"""审计记录表格模型"""
def __init__(self, records: List[AuditRecord] = None):
super().__init__()
self.records = records or []
self.headers = [
"时间戳", "事件类型", "用户ID", "操作", "对象ID", "结果", "校验和"
]
def rowCount(self, parent=QModelIndex()):
return len(self.records)
def columnCount(self, parent=QModelIndex()):
return len(self.headers)
def data(self, index, role=Qt.DisplayRole):
if not index.isValid() or index.row() >= len(self.records):
return QVariant()
record = self.records[index.row()]
column = index.column()
if role == Qt.DisplayRole:
if column == 0:
return record.timestamp
elif column == 1:
return record.event_type
elif column == 2:
return record.user_id
elif column == 3:
return record.action
elif column == 4:
return record.object_id or ""
elif column == 5:
return record.result
elif column == 6:
return record.checksum or ""
return QVariant()
def headerData(self, section, orientation, role=Qt.DisplayRole):
if orientation == Qt.Horizontal and role == Qt.DisplayRole:
return self.headers[section]
return QVariant()
def update_records(self, records: List[AuditRecord]):
"""更新记录数据"""
self.beginResetModel()
self.records = records
self.endResetModel()
class ComplianceMainWindow(QMainWindow):
"""主窗口"""
def __init__(self):
super().__init__()
self.db_manager = DatabaseManager()
self.crypto_manager = CryptoManager()
self.audit_service = AuditService(self.db_manager)
self.signature_service = DigitalSignatureService(self.db_manager, self.crypto_manager)
self.current_user = "admin" # 模拟当前用户
self.current_session = secrets.token_hex(8)
self.audit_service.set_current_user(self.current_user, self.current_session)
self.init_ui()
self.connect_signals()
self.load_data()
# 记录系统启动事件
self.audit_service.log_system_event("SYSTEM_START")
def init_ui(self):
"""初始化用户界面"""
self.setWindowTitle("FDA 21 CFR Part 11 合规系统")
self.setGeometry(100, 100, 1200, 800)
# 创建中央部件
central_widget = QWidget()
self.setCentralWidget(central_widget)
# 创建主布局
main_layout = QVBoxLayout(central_widget)
# 创建状态栏信息
status_frame = QFrame()
status_frame.setFrameStyle(QFrame.StyledPanel)
status_layout = QHBoxLayout(status_frame)
self.user_label = QLabel(f"当前用户: {self.current_user}")
self.session_label = QLabel(f"会话ID: {self.current_session}")
self.status_label = QLabel("系统正常运行")
status_layout.addWidget(self.user_label)
status_layout.addWidget(self.session_label)
status_layout.addStretch()
status_layout.addWidget(self.status_label)
main_layout.addWidget(status_frame)
# 创建选项卡界面
self.tab_widget = QTabWidget()
main_layout.addWidget(self.tab_widget)
# 创建各个选项卡
self.create_audit_tab()
self.create_signature_tab()
self.create_integrity_tab()
self.create_monitoring_tab()
def create_audit_tab(self):
"""创建审计选项卡"""
audit_widget = QWidget()
layout = QVBoxLayout(audit_widget)
# 控制面板
control_frame = QFrame()
control_frame.setFrameStyle(QFrame.StyledPanel)
control_layout = QHBoxLayout(control_frame)
self.refresh_btn = QPushButton("刷新数据")
self.clear_btn = QPushButton("清空显示")
self.export_btn = QPushButton("导出审计日志")
control_layout.addWidget(self.refresh_btn)
control_layout.addWidget(self.clear_btn)
control_layout.addWidget(self.export_btn)
control_layout.addStretch()
layout.addWidget(control_frame)
# 审计记录表格
self.audit_model = AuditTableModel()
self.audit_table = QTableWidget()
self.audit_table.setColumnCount(7)
self.audit_table.setHorizontalHeaderLabels([
"时间戳", "事件类型", "用户ID", "操作", "对象ID", "结果", "校验和"
])
self.audit_table.horizontalHeader().setStretchLastSection(True)
layout.addWidget(self.audit_table)
# 详细信息面板
details_frame = QFrame()
details_frame.setFrameStyle(QFrame.StyledPanel)
details_layout = QVBoxLayout(details_frame)
details_layout.addWidget(QLabel("审计记录详细信息:"))
self.audit_details = QTextEdit()
self.audit_details.setMaximumHeight(150)
details_layout.addWidget(self.audit_details)
layout.addWidget(details_frame)
self.tab_widget.addTab(audit_widget, "审计追踪")
def create_signature_tab(self):
"""创建数字签名选项卡"""
signature_widget = QWidget()
layout = QVBoxLayout(signature_widget)
# 签名创建面板
create_group = QGroupBox("创建数字签名")
create_layout = QGridLayout(create_group)
create_layout.addWidget(QLabel("文档内容:"), 0, 0)
self.document_text = QTextEdit()
self.document_text.setMaximumHeight(100)
create_layout.addWidget(self.document_text, 0, 1)
create_layout.addWidget(QLabel("签名含义:"), 1, 0)
self.signature_meaning = QLineEdit()
create_layout.addWidget(self.signature_meaning, 1, 1)
create_layout.addWidget(QLabel("签名原因:"), 2, 0)
self.signature_reason = QLineEdit()
self.signature_reason.setText("Document approval")
create_layout.addWidget(self.signature_reason, 2, 1)
self.create_signature_btn = QPushButton("创建签名")
create_layout.addWidget(self.create_signature_btn, 3, 1)
layout.addWidget(create_group)
# 签名验证面板
verify_group = QGroupBox("验证数字签名")
verify_layout = QGridLayout(verify_group)
verify_layout.addWidget(QLabel("签名ID:"), 0, 0)
self.verify_signature_id = QLineEdit()
verify_layout.addWidget(self.verify_signature_id, 0, 1)
self.verify_signature_btn = QPushButton("验证签名")
verify_layout.addWidget(self.verify_signature_btn, 1, 1)
self.verification_result = QLabel("验证结果: 未验证")
verify_layout.addWidget(self.verification_result, 2, 0, 1, 2)
layout.addWidget(verify_group)
# 签名历史表格
history_group = QGroupBox("签名历史")
history_layout = QVBoxLayout(history_group)
self.signature_table = QTableWidget()
self.signature_table.setColumnCount(6)
self.signature_table.setHorizontalHeaderLabels([
"签名ID", "签名者", "时间戳", "含义", "原因", "状态"
])
history_layout.addWidget(self.signature_table)
layout.addWidget(history_group)
self.tab_widget.addTab(signature_widget, "电子签名")
def create_integrity_tab(self):
"""创建完整性检查选项卡"""
integrity_widget = QWidget()
layout = QVBoxLayout(integrity_widget)
# 完整性检查控制面板
control_group = QGroupBox("完整性检查控制")
control_layout = QGridLayout(control_group)
self.check_integrity_btn = QPushButton("执行完整性检查")
self.auto_check_enabled = QCheckBox("启用自动检查")
self.check_interval = QComboBox()
self.check_interval.addItems(["1分钟", "5分钟", "15分钟", "1小时"])
control_layout.addWidget(self.check_integrity_btn, 0, 0)
control_layout.addWidget(self.auto_check_enabled, 0, 1)
control_layout.addWidget(QLabel("检查间隔:"), 1, 0)
control_layout.addWidget(self.check_interval, 1, 1)
layout.addWidget(control_group)
# 完整性结果显示
results_group = QGroupBox("完整性检查结果")
results_layout = QVBoxLayout(results_group)
self.integrity_results = QTextEdit()
self.integrity_results.setReadOnly(True)
results_layout.addWidget(self.integrity_results)
layout.addWidget(results_group)
# 系统健康状态
health_group = QGroupBox("系统健康状态")
health_layout = QGridLayout(health_group)
self.db_status = QLabel("数据库: 正常")
self.crypto_status = QLabel("加密系统: 正常")
self.audit_status = QLabel("审计系统: 正常")
health_layout.addWidget(self.db_status, 0, 0)
health_layout.addWidget(self.crypto_status, 0, 1)
health_layout.addWidget(self.audit_status, 1, 0)
layout.addWidget(health_group)
self.tab_widget.addTab(integrity_widget, "完整性检查")
def create_monitoring_tab(self):
"""创建监控选项卡"""
monitoring_widget = QWidget()
layout = QVBoxLayout(monitoring_widget)
# 实时统计
stats_group = QGroupBox("实时统计")
stats_layout = QGridLayout(stats_group)
self.total_audits = QLabel("总审计记录: 0")
self.total_signatures = QLabel("总签名数: 0")
self.failed_operations = QLabel("失败操作: 0")
self.last_check_time = QLabel("最后检查: 未执行")
stats_layout.addWidget(self.total_audits, 0, 0)
stats_layout.addWidget(self.total_signatures, 0, 1)
stats_layout.addWidget(self.failed_operations, 1, 0)
stats_layout.addWidget(self.last_check_time, 1, 1)
layout.addWidget(stats_group)
# 系统日志
log_group = QGroupBox("系统日志")
log_layout = QVBoxLayout(log_group)
self.system_log = QTextEdit()
self.system_log.setReadOnly(True)
self.system_log.setMaximumHeight(200)
log_layout.addWidget(self.system_log)
log_control_layout = QHBoxLayout()
self.clear_log_btn = QPushButton("清空日志")
self.save_log_btn = QPushButton("保存日志")
log_control_layout.addWidget(self.clear_log_btn)
log_control_layout.addWidget(self.save_log_btn)
log_control_layout.addStretch()
log_layout.addLayout(log_control_layout)
layout.addWidget(log_group)
# 性能监控
performance_group = QGroupBox("性能监控")
performance_layout = QGridLayout(performance_group)
self.cpu_usage = QProgressBar()
self.memory_usage = QProgressBar()
self.disk_usage = QProgressBar()
performance_layout.addWidget(QLabel("CPU使用率:"), 0, 0)
performance_layout.addWidget(self.cpu_usage, 0, 1)
performance_layout.addWidget(QLabel("内存使用率:"), 1, 0)
performance_layout.addWidget(self.memory_usage, 1, 1)
performance_layout.addWidget(QLabel("磁盘使用率:"), 2, 0)
performance_layout.addWidget(self.disk_usage, 2, 1)
layout.addWidget(performance_group)
self.tab_widget.addTab(monitoring_widget, "系统监控")
def connect_signals(self):
"""连接信号和槽"""
# 审计选项卡
self.refresh_btn.clicked.connect(self.refresh_audit_data)
self.clear_btn.clicked.connect(self.clear_audit_display)
self.export_btn.clicked.connect(self.export_audit_log)
self.audit_table.itemSelectionChanged.connect(self.show_audit_details)
# 数字签名选项卡
self.create_signature_btn.clicked.connect(self.create_digital_signature)
self.verify_signature_btn.clicked.connect(self.verify_digital_signature)
# 完整性检查选项卡
self.check_integrity_btn.clicked.connect(self.check_data_integrity)
self.auto_check_enabled.stateChanged.connect(self.toggle_auto_check)
# 监控选项卡
self.clear_log_btn.clicked.connect(self.clear_system_log)
self.save_log_btn.clicked.connect(self.save_system_log)
# 审计服务信号
self.audit_service.audit_recorded.connect(self.on_audit_recorded)
self.signature_service.signature_created.connect(self.on_signature_created)
# 定时器
self.update_timer = QTimer()
self.update_timer.timeout.connect(self.update_monitoring_data)
self.update_timer.start(5000) # 每5秒更新一次
def load_data(self):
"""加载初始数据"""
self.refresh_audit_data()
self.update_monitoring_data()
def refresh_audit_data(self):
"""刷新审计数据"""
try:
records = self.db_manager.get_audit_records(limit=1000)
self.audit_table.setRowCount(len(records))
for i, record in enumerate(records):
self.audit_table.setItem(i, 0, QTableWidgetItem(record.timestamp))
self.audit_table.setItem(i, 1, QTableWidgetItem(record.event_type))
self.audit_table.setItem(i, 2, QTableWidgetItem(record.user_id))
self.audit_table.setItem(i, 3, QTableWidgetItem(record.action))
self.audit_table.setItem(i, 4, QTableWidgetItem(record.object_id or ""))
self.audit_table.setItem(i, 5, QTableWidgetItem(record.result))
self.audit_table.setItem(i, 6, QTableWidgetItem(record.checksum or ""))
self.audit_service.log_system_event("AUDIT_DATA_REFRESH",
details={"record_count": len(records)})
except Exception as e:
self.log_message(f"刷新审计数据失败: {e}")
def clear_audit_display(self):
"""清空审计显示"""
self.audit_table.setRowCount(0)
self.audit_details.clear()
self.audit_service.log_user_action("CLEAR_AUDIT_DISPLAY")
def export_audit_log(self):
"""导出审计日志"""
try:
records = self.db_manager.get_audit_records(limit=10000)
# 简化导出:写入JSON文件
export_data = [record.to_dict() for record in records]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"audit_log_{timestamp}.json"
with open(filename, 'w', encoding='utf-8') as f:
json.dump(export_data, f, ensure_ascii=False, indent=2)
self.log_message(f"审计日志已导出到: {filename}")
self.audit_service.log_user_action("EXPORT_AUDIT_LOG",
details={"filename": filename, "record_count": len(records)})
except Exception as e:
self.log_message(f"导出审计日志失败: {e}")
def show_audit_details(self):
"""显示审计记录详细信息"""
current_row = self.audit_table.currentRow()
if current_row >= 0:
try:
records = self.db_manager.get_audit_records(limit=1000)
if current_row < len(records):
record = records[current_row]
details = json.dumps(record.to_dict(), ensure_ascii=False, indent=2)
self.audit_details.setText(details)
except Exception as e:
self.audit_details.setText(f"加载详细信息失败: {e}")
def create_digital_signature(self):
"""创建数字签名"""
try:
document_text = self.document_text.toPlainText()
meaning = self.signature_meaning.text()
reason = self.signature_reason.text()
if not document_text or not meaning:
QMessageBox.warning(self, "警告", "请填写文档内容和签名含义")
return
document_data = document_text.encode('utf-8')
signature = self.signature_service.create_signature(
self.current_user, document_data, meaning, reason
)
if signature:
QMessageBox.information(self, "成功", f"数字签名创建成功\n签名ID: {signature.signature_id}")
self.document_text.clear()
self.signature_meaning.clear()
self.audit_service.log_user_action("CREATE_DIGITAL_SIGNATURE",
object_id=signature.signature_id,
details={"meaning": meaning, "reason": reason})
else:
QMessageBox.critical(self, "错误", "数字签名创建失败")
except Exception as e:
QMessageBox.critical(self, "错误", f"创建数字签名时发生错误: {e}")
def verify_digital_signature(self):
"""验证数字签名"""
try:
signature_id = self.verify_signature_id.text()
if not signature_id:
self.verification_result.setText("验证结果: 请输入签名ID")
return
# 这里应该从数据库获取签名记录和原始文档
# 简化实现:显示验证界面
self.verification_result.setText("验证结果: 功能演示 - 签名有效")
self.audit_service.log_user_action("VERIFY_DIGITAL_SIGNATURE",
object_id=signature_id)
except Exception as e:
self.verification_result.setText(f"验证结果: 验证失败 - {e}")
def check_data_integrity(self):
"""检查数据完整性"""
try:
self.integrity_results.append("开始执行完整性检查...")
results = self.db_manager.verify_integrity()
self.integrity_results.append(f"检查时间: {datetime.now()}")
for table, result in results.items():
status = "通过" if result else "失败"
self.integrity_results.append(f"{table}: {status}")
self.integrity_results.append("完整性检查完成\n")
self.audit_service.log_system_event("INTEGRITY_CHECK_PERFORMED",
details=results)
except Exception as e:
self.integrity_results.append(f"完整性检查失败: {e}\n")
def toggle_auto_check(self, state):
"""切换自动检查"""
enabled = state == Qt.Checked
self.audit_service.log_system_event("AUTO_CHECK_TOGGLED",
details={"enabled": enabled})
def clear_system_log(self):
"""清空系统日志"""
self.system_log.clear()
self.audit_service.log_user_action("CLEAR_SYSTEM_LOG")
def save_system_log(self):
"""保存系统日志"""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"system_log_{timestamp}.txt"
with open(filename, 'w', encoding='utf-8') as f:
f.write(self.system_log.toPlainText())
self.log_message(f"系统日志已保存到: {filename}")
self.audit_service.log_user_action("SAVE_SYSTEM_LOG",
details={"filename": filename})
except Exception as e:
self.log_message(f"保存系统日志失败: {e}")
def update_monitoring_data(self):
"""更新监控数据"""
try:
# 更新统计信息
records = self.db_manager.get_audit_records(limit=1)
audit_count = len(self.db_manager.get_audit_records(limit=10000)) # 简化计数
self.total_audits.setText(f"总审计记录: {audit_count}")
self.last_check_time.setText(f"最后检查: {datetime.now().strftime('%H:%M:%S')}")
# 模拟性能数据
import random
self.cpu_usage.setValue(random.randint(10, 80))
self.memory_usage.setValue(random.randint(30, 70))
self.disk_usage.setValue(random.randint(20, 60))
except Exception as e:
self.log_message(f"更新监控数据失败: {e}")
def on_audit_recorded(self, record: AuditRecord):
"""处理审计记录事件"""
self.log_message(f"新增审计记录: {record.action} by {record.user_id}")
def on_signature_created(self, signature: DigitalSignature):
"""处理数字签名创建事件"""
self.log_message(f"数字签名已创建: {signature.signature_id}")
def log_message(self, message: str):
"""记录系统日志消息"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
self.system_log.append(log_entry)
# 自动滚动到底部
scrollbar = self.system_log.verticalScrollBar()
scrollbar.setValue(scrollbar.maximum())
def closeEvent(self, event):
"""窗口关闭事件"""
self.audit_service.log_system_event("SYSTEM_SHUTDOWN")
# 等待审计队列处理完成
self.audit_service.audit_queue.put(None) # 停止信号
event.accept()
def main():
"""主函数"""
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('compliance_system.log'),
logging.StreamHandler(sys.stdout)
]
)
# 创建应用程序
app = QApplication(sys.argv)
app.setApplicationName("FDA 21 CFR Part 11 合规系统")
app.setApplicationVersion("1.0.0")
# 设置应用程序样式
app.setStyle('Fusion')
# 创建主窗口
window = ComplianceMainWindow()
window.show()
# 运行应用程序
sys.exit(app.exec())
if __name__ == "__main__":
main()
更多推荐
所有评论(0)