W5500 esp32 mpy 参考视频

参考网址

W5500 Lite 丝印简写:

接线按照:sck=Pin(26),mosi=Pin(25),miso=Pin(13) cs=Pin(27,Pin.OUT),电源3.3V

1、网线直连电脑,初始化失败,默认是有DHCP服务器

#创建W55xx驱动对象

nic =WIZNET5K(spi,cs,rst)

在ESP32的mpy下运行上述代码报错:

File "wiznet5k.py", line 170, in __init__ AssertionError: Failed to configure DHCP Server!

静态IP有构造方法 nic=WIZNET5K(spi,cs,rst,is_dhcp=False)

def __init__(self, spi_bus, cs, reset=None, is_dhcp=True, mac=DEFAULT_MAC, hostname=None, dhcp_timeout=30, debug=False) 

2、手动设置网络参数
nic.ifconfig = ('192.168.1.100', '255.255.255.0', '192.168.1.1', '8.8.8.8')
报错 TypeError: can't convert str to int

    @property
    def ifconfig(self):
        """Returns the network configuration as a tuple."""
        print('IFCONFIG')    
        return (self.ip_address, self.read(REG_SUBR, 0x00, 4), self.read(REG_GAR, 0x00, 4), self._dns)
        

    @ifconfig.setter
    def ifconfig(self, params):
        """Sets network configuration to provided tuple in format:
        (ip_address, subnet_mask, gateway_address, dns_server).
        """
        ip_address, subnet_mask, gateway_address, dns_server = params
        self.write(REG_SIPR, 0x04, ip_address)
        self.write(REG_SUBR, 0x04, subnet_mask)
        self.write(REG_GAR, 0x04, gateway_address)
        self._dns = dns_server

解决办法:

def ip_to_bytes(ip_str):
    """将IP地址字符串转换为字节格式"""
    return bytes([int(part) for part in ip_str.split('.')])

nic.ifconfig = (
    ip_to_bytes('172.16.30.119'),
    ip_to_bytes('255.255.255.0'),
    ip_to_bytes('172.16.30.254'),
    ip_to_bytes('8.8.8.8')
)

from wiznet5k import WIZNET5K
from machine import Pin,SPI
import wiznet5k_socket as socket
import sma_esp32_w5500_requests as requests
import time
#自定义接线引脚
spi=SPI(2,baudrate=8000000,sck=Pin(26),mosi=Pin(25),miso=Pin(13))
#CS对应的GPIO
cs=Pin(27,Pin.OUT)
#虚指GPIO实际接高电平即可
rst=Pin(39)
#创建W5500驱动对象 is_dhcp=False
nic=WIZNET5K(spi,cs,rst,is_dhcp=False)
#打印相关信息
print("\n\n以太网芯片版本:", nic.chip)
print("网卡MAC地址:",[hex(i) for i in nic.mac_address])
print("IP地址:",nic.pretty_ip(nic.ip_address))

# def ip_to_bytes(ip_str):
#     parts = ip_str.split('.')
#     # 将每个部分转换为整数,然后转换为字节
#     return bytes(int(part) for part in parts)

def ip_to_bytes(ip_str):
    """将IP地址字符串转换为字节格式"""
    return bytes([int(part) for part in ip_str.split('.')])

# 手动设置网络参数
# nic.ifconfig = ('192.168.1.100', '255.255.255.0', '192.168.1.1', '8.8.8.8')
# TypeError: can't convert str to int
nic.ifconfig = (
    ip_to_bytes('172.16.30.119'),
    ip_to_bytes('255.255.255.0'),
    ip_to_bytes('172.16.30.254'),
    ip_to_bytes('8.8.8.8')
)
print("网络配置设置成功")

'''设置网络接口
关键点:在创建 socket 之前,必须调用 socket.set_interface(nic) 来设置 W5500 为默认网络接口。
这是解决 'NoneType' object has no attribute 'get_socket' 错误的关键步骤'''
socket.set_interface(nic)
print("网络接口设置成功")

#创建udp套接字
udp_socket =socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print("UDP socket创建成功!")

# 测试socket功能
udp_socket.settimeout(1.0)
print("Socket配置完成")
    
#准备接收方的地址
dest_addr = ('172.16.30.133', 8080)
print("IP地址:",nic.pretty_ip(nic.ip_address))

#发送数据到指定的电脑上的指定程序中
for i in range(1000000):
    send_data="hello world--%d"%i
    print(send_data)
    udp_socket.sendto(send_data.encode('utf-8'),dest_addr)
    time.sleep(0.1)

#关闭套接字
udp_socket.close()

第一部分:上面写的这些常识。但UDP与TCP连接对抽象层socket有不同的需求。
硬件驱动层 (wiznet5k.py)基础通信正常:TCP连接建立、数据传输、状态机转换。
核心方法工作:
socket_open() - 正确打开socket
socket_listen() - 监听功能正常
socket_accept() - 接受连接(有socket交换机制)
socket_write() - 数据发送正常(返回实际字节数)
socket_read() - 数据接收正常
socket_status() - 状态读取准确

第二部分:TCP测试与抽象层API修复
W5500芯片内置8个独立的物理Socket(编号0-7)
每个Socket独立工作,有自己的寄存器组
硬件限制:最大并发连接数 = 8(包括监听Socket)
二、W5500硬件特殊性
1. 8个物理Socket硬件实现
W5500芯片内置8个独立的物理Socket(编号0-7)
每个Socket独立工作,有自己的寄存器组
硬件限制:最大并发连接数 = 8(包括监听Socket)
2. Socket状态机严格
# W5500 Socket状态编码
SNSR_SOCK_CLOSED = 0x00      # 关闭
SNSR_SOCK_INIT = 0x13        # 初始化
SNSR_SOCK_LISTEN = 0x14      # 监听
SNSR_SOCK_ESTABLISHED = 0x17 # 已连接
SNSR_SOCK_UDP = 0x22         # UDP模式
3. 监听与数据Socket分离
监听Socket(LISTEN状态)专门用于接受连接
数据Socket(ESTABLISHED状态)专门用于数据传输
accept()返回的是新的数据Socket

三层架构模型
┌─────────────────────────────────────┐
│应用程序层 (Application)             │ ← 使用标准socket API
│          ↓ 标准socket API                                                          │
├─────────────────────────────────────┤
│socket抽象层 (wiznet5k_socket.py)  │ ← 关键桥梁,API转换
│                                  ↓ 封装硬件细节,提供标准接口           │
├─────────────────────────────────────┤
│硬件驱动层 (wiznet5k.py)          │ ← 直接操作W5500寄存器
│                                           ↓ SPI通信,寄存器操作              │
└─────────────────────────────────────┘

TCP连接流程正常,三次握手:CLOSED → INIT → LISTEN → SYNRECV → ESTABLISHED
Socket编号交换机制:监听socket和数据socket正确分离
多端口监听支持:可同时监听多个端口(8个物理socket)

抽象层 wiznet5k_socket.py 的主要bug已经通过deepseek修订,如下直接使用:

# SPDX-FileCopyrightText: 2019 ladyada for Adafruit Industries
# SPDX-FileCopyrightText: 2020 Brent Rubell for Adafruit Industries
# SPDX-FileCopyrightText: 2021 Vincenzo D'Angelo
#
# SPDX-License-Identifier: MIT

"""
`wiznet5k_socket`
================================================================================

A socket compatible interface with the Wiznet5k module.
修复版:解决响应延迟问题

* Author(s): ladyada, Brent Rubell, Patrick Van Oosterwijck, Adam Cummick, Vincenzo D'Angelo
* 修复: 响应延迟修复

"""

import gc
import time

from micropython import const

SNSR_SOCK_CLOSED = const(0x00)
SNSR_SOCK_INIT = const(0x13)
SNSR_SOCK_LISTEN = const(0x14)
SNSR_SOCK_SYNSENT = const(0x15)
SNSR_SOCK_SYNRECV = const(0x16)
SNSR_SOCK_ESTABLISHED = const(0x17)
SNSR_SOCK_FIN_WAIT = const(0x18)
SNSR_SOCK_CLOSING = const(0x1A)
SNSR_SOCK_TIME_WAIT = const(0x1B)
SNSR_SOCK_CLOSE_WAIT = const(0x1C)
SNSR_SOCK_LAST_ACK = const(0x1D)
SNSR_SOCK_UDP = const(0x22)

SNMR_TCP = const(0x21)
SNMR_UDP = const(0x02)

_the_interface = None  # pylint: disable=invalid-name

def set_interface(iface):
    """Helper to set the global internet interface."""
    global _the_interface  # pylint: disable=global-statement, invalid-name
    _the_interface = iface


def htonl(x):
    """Convert 32-bit positive integers from host to network byte order."""
    return (
        ((x) << 24 & 0xFF000000)
        | ((x) << 8 & 0x00FF0000)
        | ((x) >> 8 & 0x0000FF00)
        | ((x) >> 24 & 0x000000FF)
    )


def htons(x):
    """Convert 16-bit positive integers from host to network byte order."""
    return (((x) << 8) & 0xFF00) | (((x) >> 8) & 0xFF)


SOCK_STREAM = const(0x21)  # TCP
TCP_MODE = 80
SOCK_DGRAM = const(0x02)  # UDP
AF_INET = const(3)
SOCKET_INVALID = const(255)


# pylint: disable=too-many-arguments, unused-argument
def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
    """Translate the host/port argument into a sequence of 5-tuples that
    contain all the necessary arguments for creating a socket connected to that service.

    """
    if not isinstance(port, int):
        raise RuntimeError("Port must be an integer")
    if is_ipv4(host):
        return [(AF_INET, socktype, proto, "", (host, port))]
    return [(AF_INET, socktype, proto, "", (gethostbyname(host), port))]


def gethostbyname(hostname):
    """Translate a host name to IPv4 address format. The IPv4 address
    is returned as a string.
    :param str hostname: Desired hostname.
    """
    addr = _the_interface.get_host_by_name(hostname)
    addr = "{}.{}.{}.{}".format(addr[0], addr[1], addr[2], addr[3])
    return addr


def is_ipv4(host):
    """Checks if a host string is an IPv4 address.
    :param str host: host's name or ip
    """
    octets = host.split(".", 3)
    if len(octets) != 4 or not "".join(octets).isdigit():
        return False
    for octet in octets:
        if int(octet) > 255:
            return False
    return True


# pylint: disable=invalid-name, too-many-public-methods
class socket:
    """A simplified implementation of the Python 'socket' class
    for connecting to a Wiznet5k module.
    修复版:优化recv()方法避免阻塞延迟
    
    :param int family: Socket address (and protocol) family.
    :param int type: Socket type.

    """

    # pylint: disable=redefined-builtin,unused-argument
    def __init__(self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, socknum=None):
        if family != AF_INET:
            raise RuntimeError("Only AF_INET family supported by W5K modules.")
        self._sock_type = type
        self._buffer = b""
        self._timeout = 0
        self._listen_port = None

        self._socknum = _the_interface.get_socket()
        if self._socknum == SOCKET_INVALID:
            raise RuntimeError("Failed to allocate socket.")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self._sock_type == SOCK_STREAM:
            self.disconnect()
            stamp = time.time()
            while self.status == SNSR_SOCK_FIN_WAIT:
                if time.time() - stamp > 1000:
                    raise RuntimeError("Failed to disconnect socket")
        self.close()
        stamp = time.time()
        while self.status != SNSR_SOCK_CLOSED:
            if time.time() - stamp > 1000:
                raise RuntimeError("Failed to close socket")

    @property
    def socknum(self):
        """Returns the socket object's socket number."""
        return self._socknum

    @property
    def status(self):
        """Returns the status of the socket"""
        return _the_interface.socket_status(self.socknum)[0]

    @property
    def connected(self):
        """Returns whether or not we are connected to the socket."""
        if self.socknum >= _the_interface.max_sockets:
            return False
        status = _the_interface.socket_status(self.socknum)[0]
        if (
            status == SNSR_SOCK_CLOSE_WAIT
            and self.available() == 0
        ):
            result = False
        else:
            result = status not in (
                SNSR_SOCK_CLOSED,
                SNSR_SOCK_LISTEN,
                SNSR_SOCK_TIME_WAIT,
                SNSR_SOCK_FIN_WAIT,
            )
        if not result and status != SNSR_SOCK_LISTEN:
            self.close()
        return result

    def getpeername(self):
        """Return the remote address to which the socket is connected."""
        return _the_interface.remote_ip(self.socknum)

    def inet_aton(self, ip_string):
        """Convert an IPv4 address from dotted-quad string format.
        :param str ip_string: IP Address, as a dotted-quad string.

        """
        self._buffer = b""
        self._buffer = [int(item) for item in ip_string.split(".")]
        self._buffer = bytearray(self._buffer)
        return self._buffer

    def bind(self, address):
        """Bind the socket to the listen port, if host is specified the interface
        will be reconfigured to that IP.
        :param tuple address: local socket as a (host, port) tuple.
        """
        if address[0] is not None:
            ip_address = _the_interface.unpretty_ip(address[0])
            current_ip, subnet_mask, gw_addr, dns = _the_interface.ifconfig
            if ip_address != current_ip:
                _the_interface.ifconfig = (ip_address, subnet_mask, gw_addr, dns)
        self._listen_port = address[1]
        # For UDP servers we need to open the socket here because we won't call
        # listen
        if self._sock_type == SOCK_DGRAM:
            _the_interface.socket_listen(
                self.socknum, self._listen_port, SNMR_UDP
            )
            self._buffer = b""

    def listen(self, backlog=None):
        """Listen on the port specified by bind.
        :param backlog: For compatibility but ignored.
        """
        assert self._listen_port is not None, "Use bind to set the port before listen!"
        _the_interface.socket_listen(self.socknum, self._listen_port)
        self._buffer = b""

    def accept(self):
        """Accept a connection. The socket must be bound to an address and listening for
        connections. The return value is a pair (conn, address) where conn is a new
        socket object usable to send and receive data on the connection, and address is
        the address bound to the socket on the other end of the connection.
        """
        stamp = time.time()
        while self.status not in (
            SNSR_SOCK_SYNRECV,
            SNSR_SOCK_ESTABLISHED,
        ):
            if self._timeout is not None and self._timeout > 0 and time.time() - stamp > self._timeout:
                return None
            if self.status == SNSR_SOCK_CLOSED:
                self.close()
                self.listen()

        new_listen_socknum, addr = _the_interface.socket_accept(self.socknum)
        current_socknum = self.socknum
        # Create a new socket object and swap socket nums so we can continue listening
        client_sock = socket()
        client_sock._socknum = current_socknum  # pylint: disable=protected-access
        self._socknum = new_listen_socknum  # pylint: disable=protected-access
        self.bind((None, self._listen_port))
        self.listen()
        while self.status != SNSR_SOCK_LISTEN:
            raise RuntimeError("Failed to open new listening socket")
        return client_sock, addr

    def connect(self, address, conntype=None):
        """Connect to a remote socket at address.
        :param tuple address: Remote socket as a (host, port) tuple.
        """
        assert (
            conntype != 0x03
        ), "Error: SSL/TLS is not currently supported by CircuitPython."
        host, port = address

        if hasattr(host, "split"):
            try:
                host = tuple(map(int, host.split(".")))
            except ValueError:
                host = _the_interface.get_host_by_name(host)
        if self._listen_port is not None:
            _the_interface.src_port = self._listen_port
        result = _the_interface.socket_connect(
            self.socknum, host, port, conn_mode=self._sock_type
        )
        _the_interface.src_port = 0
        if not result:
            raise RuntimeError("Failed to connect to host", host)
        self._buffer = b""

    def send(self, data):
        """Send data to the socket. The socket must be connected to
        a remote socket.
        :param bytearray data: Desired data to send to the socket.
        :return: Number of bytes actually sent.
        """
        # 处理timeout参数 - 保持原始精度
        if self._timeout is None:
            timeout_val = None  # None表示阻塞模式,传递给底层处理
        elif isinstance(self._timeout, (int, float)):
            timeout_val = self._timeout  # 保持原始值
        else:
            timeout_val = 0  # 其他情况使用默认值
        
        result = _the_interface.socket_write(self.socknum, data, timeout_val)
        gc.collect()
        return result  # 返回实际发送的字节数

    def sendto(self, data, address):
        """Send data to the socket. The socket must be connected to
        a remote socket.
        :param bytearray data: Desired data to send to the socket.
        :param tuple address: Remote socket as a (host, port) tuple.
        :return: Number of bytes actually sent.
        """
        self.connect(address)
        return self.send(data)

    def recv(self, bufsize=0, flags=0):  # pylint: disable=too-many-branches
        """Reads some bytes from the connected remote address.
        修复版:优化接收逻辑,避免阻塞延迟
        
        :param int bufsize: Maximum number of bytes to receive.
        :param int flags: ignored, present for compatibility.
        """
        if self.status == SNSR_SOCK_CLOSED:
            return b""

        if bufsize == 0:
            # read everything on the socket
            while True:
                avail = self.available()
                if avail:
                    if self._sock_type == SOCK_STREAM:
                        # 正确处理socket_read返回值
                        ret, data = _the_interface.socket_read(self.socknum, avail)
                        if ret > 0 and isinstance(data, (bytes, bytearray)):
                            self._buffer += data
                        else:
                            # 没有数据或数据无效
                            break
                    elif self._sock_type == SOCK_DGRAM:
                        # 同样修复UDP读取
                        ret, data = _the_interface.read_udp(self.socknum, avail)
                        if ret > 0 and isinstance(data, (bytes, bytearray)):
                            self._buffer += data
                        else:
                            break
                else:
                    break
            gc.collect()
            ret = self._buffer
            self._buffer = b""
            gc.collect()
            return ret
        
        # 🔥 关键修复:如果有数据就立即返回,不要等待 bufsize
        # 检查缓冲区是否已经有足够的数据
        if len(self._buffer) >= bufsize:
            ret = self._buffer[:bufsize]
            self._buffer = self._buffer[bufsize:]
            gc.collect()
            return ret
        
        stamp = time.time()
        to_read = bufsize - len(self._buffer)
        received = []
        
        # 优化:每次循环检查是否有数据,而不是等待
        while to_read > 0:
            # 检查是否有数据可用
            avail = self.available()
            if avail:
                # 只读取实际可用的数据,不超过需要的数据量
                read_size = min(to_read, avail)
                
                if self._sock_type == SOCK_STREAM:
                    ret, data = _the_interface.socket_read(self.socknum, read_size)
                    if ret > 0 and isinstance(data, (bytes, bytearray)):
                        self._buffer += data
                        to_read -= ret
                elif self._sock_type == SOCK_DGRAM:
                    ret, data = _the_interface.read_udp(self.socknum, read_size)
                    if ret > 0 and isinstance(data, (bytes, bytearray)):
                        self._buffer += data
                        to_read -= ret
                
                stamp = time.time()  # 重置超时计时器
                gc.collect()
            
            # 检查超时
            if self._timeout is not None and self._timeout > 0 and time.time() - stamp > self._timeout:
                break
            
            # 如果没有数据,短暂休眠避免CPU占用过高
            if avail == 0:
                time.sleep(0.001)  # 1ms休眠
        
        # 返回数据(可能没有达到bufsize)
        if len(self._buffer) >= bufsize:
            ret = self._buffer[:bufsize]
            self._buffer = self._buffer[bufsize:]
        else:
            # 返回所有可用的数据
            ret = self._buffer
            self._buffer = b""
        
        gc.collect()
        return ret

    def recvfrom(self, bufsize=0, flags=0):
        """Reads some bytes from the connected remote address.
        :param int bufsize: Maximum number of bytes to receive.
        :param int flags: ignored, present for compatibility.
        :returns: a tuple (bytes, address) where address is a tuple (ip, port)
        """
        return (
            self.recv(bufsize),
            (
                _the_interface.remote_ip(self.socknum),
                _the_interface.remote_port(self.socknum),
            ),
        )

    def recv_into(self, buf, nbytes=0, flags=0):
        """Reads some bytes from the connected remote address info the provided buffer.
        :param bytearray buf: Data buffer
        :param nbytes: Maximum number of bytes to receive
        :param int flags: ignored, present for compatibility.
        :returns: the number of bytes received
        """
        if nbytes == 0:
            nbytes = len(buf)
        ret = self.recv(nbytes)
        nbytes = len(ret)
        buf[:nbytes] = ret
        return nbytes

    def recvfrom_into(self, buf, nbytes=0, flags=0):
        """Reads some bytes from the connected remote address info the provided buffer.
        :param bytearray buf: Data buffer
        :param nbytes: Maximum number of bytes to receive
        :param int flags: ignored, present for compatibility.
        :returns a tuple (nbytes, address) where address is a tuple (ip, port)
        """
        return (
            self.recv_into(buf, nbytes),
            (
                _the_interface.remote_ip(self.socknum),
                _the_interface.remote_port(self.socknum),
            ),
        )

    def readline(self):
        """Attempt to return as many bytes as we can up to \
        but not including '\r\n'.

        """
        stamp = time.time()
        while b"\r\n" not in self._buffer:
            avail = self.available()
            if avail:
                if self._sock_type == SOCK_STREAM:
                    ret, data = _the_interface.socket_read(self.socknum, avail)
                    if ret > 0 and isinstance(data, (bytes, bytearray)):
                        self._buffer += data
                elif self._sock_type == SOCK_DGRAM:
                    ret, data = _the_interface.read_udp(self.socknum, avail)
                    if ret > 0 and isinstance(data, (bytes, bytearray)):
                        self._buffer += data
            if (
                not avail
                and self._timeout is not None
                and self._timeout > 0
                and time.time() - stamp > self._timeout
            ):
                self.close()
                raise RuntimeError("Didn't receive response, failing out...")
        firstline, self._buffer = self._buffer.split(b"\r\n", 1)
        gc.collect()
        return firstline

    def disconnect(self):
        """Disconnects a TCP socket."""
        assert self._sock_type == SOCK_STREAM, "Socket must be a TCP socket."
        _the_interface.socket_disconnect(self.socknum)

    def close(self):
        """Closes the socket."""
        _the_interface.socket_close(self.socknum)

    def available(self):
        """Returns how many bytes of data are available to be read from the socket."""
        return _the_interface.socket_available(self.socknum, self._sock_type)

    def settimeout(self, value):
        """Sets socket read timeout.
        :param value: Socket read timeout in seconds, or None for blocking mode.
        """
        if value is None:
            self._timeout = None
        elif isinstance(value, (int, float)):
            if value < 0:
                raise Exception("Timeout period should be non-negative.")
            self._timeout = value
        else:
            raise TypeError("Timeout must be a number or None")
    
    def gettimeout(self):
        """Return the timeout in seconds (float) associated
        with socket operations, or None if no timeout is set.

        """
        return self._timeout


第三部分:多连接压力测试8个socket满负荷运行

W5500 Windows客户端 - V3.5 (增加长时间稳定性测试)

"""
W5500 Windows客户端 - V3.5 (增加长时间稳定性测试)
修复:恢复V2的连接策略
新增:1. 高频消息测试模式
      2. 大数据包测试模式
      3. 混合负载测试模式
      4. 异常恢复测试模式
      5. 长时间稳定性测试模式
修复:连接断开时自动重连
"""

import socket
import threading
import time
import json
from datetime import datetime
import random
import struct

# ==================== 配置 ====================
CONFIG = {
    'server_ip': '172.16.30.75',
    'server_port': 9050,
    'test_mode': 'normal',  # 'normal', 'high_freq', 'large_packet', 'mixed_load', 'recovery', 'long_term'
    'test_duration': 30,
    'log_file': 'w5500_final_fix.log',
}

# 不同测试模式的配置
TEST_MODES = {
    'normal': {
        'client_count': 8,
        'message_interval': 1.0,
        'message_size': 64,
        'test_name': '常规压力测试'
    },
    'high_freq': {
        'client_count': 8,
        'message_interval': 0.2,  # 5条/秒 → 40条/秒总吞吐
        'message_size': 64,
        'test_name': '高频小消息测试'
    },
    'large_packet': {
        'client_count': 8,
        'message_interval': 2.0,
        'message_size': 1024,  # 1KB大消息
        'test_name': '大数据包测试'
    },
    'mixed_load': {
        'client_count': 8,
        'test_name': '混合负载测试',
        'clients_config': [
            {'interval': 0.2, 'size': 64},    # 快速客户端1
            {'interval': 0.2, 'size': 64},    # 快速客户端2
            {'interval': 0.5, 'size': 128},   # 中速客户端1
            {'interval': 0.5, 'size': 128},   # 中速客户端2
            {'interval': 1.0, 'size': 256},   # 慢速客户端1
            {'interval': 1.0, 'size': 256},   # 慢速客户端2
            {'interval': 2.0, 'size': 512},   # 超慢客户端1
            {'interval': 2.0, 'size': 1024},  # 超慢客户端2(大数据包)
        ]
    },
    'recovery': {
        'client_count': 8,
        'test_name': '异常恢复测试',
        'test_phases': [
            {'name': 'phase1', 'duration': 10, 'action': 'normal'},      # 正常连接阶段
            {'name': 'phase2', 'duration': 5, 'action': 'disconnect'},   # 断开阶段
            {'name': 'phase3', 'duration': 5, 'action': 'reconnect'},    # 重连阶段
            {'name': 'phase4', 'duration': 10, 'action': 'verify'},      # 验证阶段
        ]
    },
    'long_term': {
        'client_count': 8,
        'test_name': '长时间稳定性测试',
        'duration': 3600,  # 1小时
        'interval_range': [0.5, 5.0],  # 随机间隔
        'size_range': [64, 1024],      # 随机大小
        'monitoring': {
            'memory_check': 60,    # 每60秒检查内存
            'connection_check': 30 # 每30秒检查连接
        }
    }
}

# ==================== 统计 ====================
class TestStatsV35:
    def __init__(self):
        self.start_time = None
        self.end_time = None
        self.clients_connected = 0
        self.total_sent = 0
        self.total_received = 0
        self.total_bytes_sent = 0
        self.total_bytes_received = 0
        self.connection_errors = 0
        self.send_errors = 0
        self.receive_errors = 0
        self.reconnections = 0
        self.disconnections = 0
        self.test_mode = 'normal'
        self.client_types = {}
        self.current_phase = 'initial'
        self.long_term_stats = {
            'memory_usage': [],
            'connection_counts': [],
            'throughput_history': [],
            'error_counts': []
        }
        
    def start_test(self):
        self.start_time = time.time()
        
    def end_test(self):
        self.end_time = time.time()
        
    def get_duration(self):
        if self.start_time and self.end_time:
            return self.end_time - self.start_time
        elif self.start_time:
            return time.time() - self.start_time
        return 0
    
    def add_reconnection(self):
        self.reconnections += 1
    
    def add_disconnection(self):
        self.disconnections += 1
    
    def record_long_term_stat(self, stat_type, value):
        """记录长时间测试的统计"""
        timestamp = time.time() - self.start_time
        if stat_type == 'memory':
            self.long_term_stats['memory_usage'].append((timestamp, value))
        elif stat_type == 'connections':
            self.long_term_stats['connection_counts'].append((timestamp, value))
        elif stat_type == 'throughput':
            self.long_term_stats['throughput_history'].append((timestamp, value))
        elif stat_type == 'errors':
            self.long_term_stats['error_counts'].append((timestamp, value))

stats = TestStatsV35()

# ==================== 客户端类 - V3.5版 ====================
class ClientV35:
    def __init__(self, client_id, client_config=None):
        self.id = client_id
        self.socket = None
        self.running = False
        self.connected = False
        self.sent_count = 0
        self.recv_count = 0
        self.bytes_sent = 0
        self.bytes_received = 0
        self.send_times = []
        self.thread = None
        self.disconnect_logged = False
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5  # 增加重连次数
        self.last_disconnect_time = 0
        self.reconnection_successful = False
        self.random_interval = False
        self.random_size = False
        
        # 测试配置
        if client_config:
            # 混合负载模式使用传入的配置
            self.message_interval = client_config['interval']
            self.message_size = client_config['size']
            self.client_type = self.get_client_type(client_config)
        else:
            # 其他模式使用默认配置
            if CONFIG['test_mode'] in TEST_MODES:
                test_config = TEST_MODES[CONFIG['test_mode']]
                if 'message_interval' in test_config:
                    self.message_interval = test_config['message_interval']
                    self.message_size = test_config['message_size']
                elif CONFIG['test_mode'] == 'long_term':
                    # 长时间测试使用随机配置
                    self.random_interval = True
                    self.random_size = True
                    self.message_interval = random.uniform(*test_config['interval_range'])
                    self.message_size = random.randint(*test_config['size_range'])
                else:
                    # 异常恢复测试使用默认值
                    self.message_interval = 1.0
                    self.message_size = 64
            else:
                self.message_interval = 1.0
                self.message_size = 64
            self.client_type = 'standard'
        
        self.test_mode = CONFIG['test_mode']
        
    def get_client_type(self, config):
        """根据配置确定客户端类型"""
        if config['interval'] <= 0.3:
            speed = 'fast'
        elif config['interval'] <= 0.8:
            speed = 'medium'
        elif config['interval'] <= 1.5:
            speed = 'slow'
        else:
            speed = 'very_slow'
            
        if config['size'] >= 512:
            size = 'large'
        elif config['size'] >= 128:
            size = 'medium'
        else:
            size = 'small'
            
        return f"{speed}_{size}"
    
    def connect(self):
        try:
            print(f"[Client{self.id:02d}] 连接中...")
            self.socket = socket.socket()
            self.socket.settimeout(5.0)  # 连接超时
            self.socket.connect((CONFIG['server_ip'], CONFIG['server_port']))
            self.socket.settimeout(0.5)  # 接收超时
            self.connected = True
            stats.clients_connected += 1
            
            # 记录重连成功
            if self.reconnect_attempts > 0:
                stats.add_reconnection()
                self.reconnection_successful = True
                print(f"[Client{self.id:02d}] 重连成功 (第{self.reconnect_attempts}次尝试)")
            else:
                print(f"[Client{self.id:02d}] 连接成功")
            
            return True
        except Exception as e:
            print(f"[Client{self.id:02d}] 连接失败: {e}")
            stats.connection_errors += 1
            return False
    
    def reconnect(self):
        """重连方法"""
        if self.reconnect_attempts >= self.max_reconnect_attempts:
            print(f"[Client{self.id:02d}] 已达到最大重连次数 ({self.max_reconnect_attempts})")
            return False
        
        self.reconnect_attempts += 1
        print(f"[Client{self.id:02d}] 尝试重连 ({self.reconnect_attempts}/{self.max_reconnect_attempts})...")
        
        # 关闭旧socket
        if self.socket:
            try:
                self.socket.close()
            except:
                pass
        
        self.socket = None
        self.connected = False
        self.disconnect_logged = False
        
        # 等待一段时间再重连
        wait_time = min(1.0 * self.reconnect_attempts, 3.0)  # 减少等待时间
        print(f"[Client{self.id:02d}] 等待{wait_time:.1f}秒后重连...")
        time.sleep(wait_time)
        
        return self.connect()
    
    def start(self):
        self.running = True
        self.thread = threading.Thread(target=self.run_client, daemon=True)
        self.thread.start()
    
    def run_client(self):
        if not self.connect():
            self.running = False
            return
        
        start_time = time.time()
        duration = CONFIG['test_duration']
        buffer = b""
        last_send_time = 0
        phase_start_time = start_time
        current_phase_index = 0
        last_random_change = start_time
        last_connection_check = start_time
        connection_check_interval = 2.0  # 每2秒检查一次连接
        
        # 长时间测试的特殊设置
        if self.test_mode == 'long_term':
            duration = TEST_MODES['long_term']['duration']
            print(f"[Client{self.id:02d}] 长时间测试: {duration}秒, "
                  f"初始间隔: {self.message_interval:.1f}s, 大小: {self.message_size}字节")
        
        # 异常恢复测试的阶段管理
        elif self.test_mode == 'recovery':
            phases = TEST_MODES['recovery']['test_phases']
            total_phase_duration = sum(phase['duration'] for phase in phases)
            duration = min(duration, total_phase_duration)
        
        try:
            while self.running and time.time() - start_time < duration:
                current_time = time.time()
                elapsed = current_time - start_time
                
                # 检查连接状态(定期主动检查)
                if current_time - last_connection_check >= connection_check_interval:
                    last_connection_check = current_time
                    if not self.connected:
                        print(f"[Client{self.id:02d}] 检测到连接断开,尝试重连...")
                        if not self.reconnect():
                            print(f"[Client{self.id:02d}] 重连失败,停止客户端")
                            break
                        else:
                            # 重连成功,重置缓冲区
                            buffer = b""
                
                # 长时间测试的随机变化
                if self.test_mode == 'long_term' and self.random_interval:
                    if current_time - last_random_change > 30.0:  # 每30秒变化一次
                        self.message_interval = random.uniform(*TEST_MODES['long_term']['interval_range'])
                        self.message_size = random.randint(*TEST_MODES['long_term']['size_range'])
                        last_random_change = current_time
                        print(f"[Client{self.id:02d}] 配置变化: 间隔{self.message_interval:.1f}s, 大小{self.message_size}字节")
                
                # 异常恢复测试的阶段检查
                elif self.test_mode == 'recovery':
                    phase_elapsed = current_time - phase_start_time
                    if current_phase_index < len(phases) - 1 and phase_elapsed >= phases[current_phase_index]['duration']:
                        current_phase_index += 1
                        phase_start_time = current_time
                        new_phase = phases[current_phase_index]
                        stats.current_phase = new_phase['name']
                        print(f"\n[阶段切换] 进入{new_phase['name']}: {new_phase['action']}")
                        
                        # 执行阶段动作
                        if new_phase['action'] == 'disconnect':
                            if self.id in [7, 8]:  # 随机断开2个客户端
                                print(f"[Client{self.id:02d}] 执行主动断开")
                                self.force_disconnect()
                                continue
                        elif new_phase['action'] == 'reconnect':
                            if self.id in [7, 8] and not self.connected:
                                print(f"[Client{self.id:02d}] 尝试重新连接")
                                self.reconnect()
                
                # 根据连接状态决定是否发送消息
                if self.connected:
                    # 根据间隔发送消息
                    if current_time - last_send_time >= self.message_interval:
                        if not self.send_message():
                            # 发送失败,立即尝试重连
                            print(f"[Client{self.id:02d}] 发送失败,尝试重连...")
                            if self.reconnect():
                                # 重连成功后立即发送消息
                                if self.send_message():
                                    last_send_time = current_time
                                continue
                            else:
                                print(f"[Client{self.id:02d}] 重连失败,停止客户端")
                                break
                        last_send_time = current_time
                    
                    # 接收消息
                    buffer = self.receive_messages(buffer)
                
                # 根据测试模式调整休眠时间
                sleep_time = self.get_sleep_time()
                time.sleep(sleep_time)
                
        except Exception as e:
            print(f"[Client{self.id:02d}] 运行错误: {e}")
            import traceback
            traceback.print_exc()
        finally:
            self.stop_client()
    
    def force_disconnect(self):
        """强制断开连接(用于异常恢复测试)"""
        if self.connected and self.socket:
            try:
                # 发送断开通知
                disconnect_msg = f"BYE Client{self.id:02d} (主动断开)\r\n"
                self.socket.settimeout(0.1)
                self.socket.send(disconnect_msg.encode())
            except:
                pass
            
            # 关闭socket
            try:
                self.socket.close()
            except:
                pass
            
            stats.clients_connected -= 1
            stats.add_disconnection()
            self.connected = False
            self.last_disconnect_time = time.time()
            print(f"[Client{self.id:02d}] 已主动断开连接")
    
    def get_sleep_time(self):
        """根据客户端类型获取休眠时间"""
        if self.test_mode == 'high_freq':
            return 0.001
        elif self.test_mode == 'mixed_load':
            if self.message_interval <= 0.3:
                return 0.001  # 快速客户端
            elif self.message_interval <= 0.8:
                return 0.005  # 中速客户端
            elif self.message_interval <= 1.5:
                return 0.01   # 慢速客户端
            else:
                return 0.05   # 超慢客户端(大数据包)
        elif self.test_mode == 'large_packet':
            return 0.05
        elif self.test_mode == 'long_term':
            return 0.01  # 长时间测试使用中等休眠
        else:
            return 0.01
    
    def send_message(self):
        """发送消息"""
        # 检查连接状态
        if not self.connected or not self.socket:
            if self.running and not self.disconnect_logged:
                print(f"[Client{self.id:02d}] 发送时连接已断开,尝试重连...")
                if self.reconnect():
                    # 重连成功,继续发送
                    print(f"[Client{self.id:02d}] 重连成功,继续发送")
                else:
                    print(f"[Client{self.id:02d}] 重连失败")
                    return False
            else:
                return False
        
        try:
            self.sent_count += 1
            
            # 根据消息大小选择发送方式
            if self.message_size >= 512:
                # 大数据包使用结构化数据
                message = self.create_large_message()
            elif self.message_size >= 128:
                # 中等大小消息
                message = self.create_medium_message()
            else:
                # 小消息
                message = self.create_small_message()
            
            # 设置发送超时
            self.socket.settimeout(1.0)
            sent = self.socket.send(message)
            
            if sent == len(message):
                stats.total_sent += 1
                stats.total_bytes_sent += sent
                self.bytes_sent += sent
                self.send_times.append(time.time())
                
                # 根据测试模式和消息频率调整日志输出
                self.log_send_progress(sent)
                
                return True
            else:
                print(f"[Client{self.id:02d}] 发送不完整 ({sent}/{len(message)}字节)")
                stats.send_errors += 1
                self.connected = False
                return False
                
        except socket.timeout:
            print(f"[Client{self.id:02d}] 发送超时")
            stats.send_errors += 1
            self.connected = False
            return False
            
        except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError, ConnectionError) as e:
            print(f"[Client{self.id:02d}] 发送时连接断开: {e}")
            stats.send_errors += 1
            self.connected = False
            
            # 尝试重连
            if self.running:
                return self.reconnect()  # 返回重连结果
            return False
            
        except Exception as e:
            err_str = str(e)
            if "10038" in err_str or "10053" in err_str or "10054" in err_str:
                print(f"[Client{self.id:02d}] 连接错误: {e}")
                stats.send_errors += 1
                self.connected = False
                return False
            print(f"[Client{self.id:02d}] 发送失败: {e}")
            stats.send_errors += 1
            return False
    
    def create_small_message(self):
        """创建小消息"""
        timestamp = datetime.now().strftime('%H:%M:%S.%f')[:12]
        base_msg = f"Client{self.id:02d} Msg{self.sent_count:04d} {timestamp}"
        
        # 如果消息长度不够,填充随机数据
        if len(base_msg) < self.message_size - 2:
            padding = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', 
                                            k=self.message_size - len(base_msg) - 3))
            msg = f"{base_msg} {padding}\r\n"
        else:
            msg = f"{base_msg[:self.message_size-3]}...\r\n"
        
        return msg.encode()
    
    def create_medium_message(self):
        """创建中等大小消息"""
        timestamp = time.time()
        msg_id = self.sent_count
        
        # 创建消息头
        header = struct.pack('!HHf', self.id, msg_id, timestamp)
        
        # 计算需要填充的数据大小
        remaining = self.message_size - len(header) - 2  # -2 for \r\n
        
        if remaining > 0:
            # 生成填充数据(随机文本)
            text_data = f"Medium message #{msg_id} from Client{self.id:02d} "
            padding_len = remaining - len(text_data.encode())
            
            if padding_len > 0:
                padding = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz', k=padding_len))
                full_text = text_data + padding
            else:
                full_text = text_data[:remaining]
            
            padding_bytes = full_text.encode()[:remaining]
        else:
            padding_bytes = b""
        
        # 组合完整消息
        message = header + padding_bytes + b"\r\n"
        
        # 确保消息大小正确
        if len(message) != self.message_size:
            # 调整填充数据
            padding_len = self.message_size - len(header) - 2
            padding = bytes([random.randint(32, 126) for _ in range(padding_len)])
            message = header + padding + b"\r\n"
        
        return message
    
    def create_large_message(self):
        """创建大数据包"""
        timestamp = time.time()
        msg_id = self.sent_count
        
        # 创建消息头
        header = struct.pack('!HHd', self.id, msg_id, timestamp)
        
        # 计算需要填充的数据大小
        remaining = self.message_size - len(header) - 2  # -2 for \r\n
        
        if remaining > 0:
            # 生成填充数据(结构化数据)
            # 第一部分:文本描述
            text_part = f"Large packet #{msg_id} from Client{self.id:02d} ".encode()
            
            # 第二部分:二进制数据
            binary_part = bytes([(i + msg_id) % 256 for i in range(256)])
            
            # 第三部分:重复填充以达到指定大小
            repeat_count = (remaining - len(text_part)) // len(binary_part) + 1
            padding = text_part + (binary_part * repeat_count)
            padding = padding[:remaining]
        else:
            padding = b""
        
        # 组合完整消息
        message = header + padding + b"\r\n"
        
        # 确保消息大小正确
        if len(message) != self.message_size:
            # 简单填充
            padding_len = self.message_size - len(header) - 2
            padding = bytes([random.randint(0, 255) for _ in range(padding_len)])
            message = header + padding + b"\r\n"
        
        return message
    
    def log_send_progress(self, sent_bytes):
        """记录发送进度"""
        if self.test_mode == 'long_term':
            # 长时间测试减少日志输出
            if self.sent_count % 100 == 0:
                elapsed = time.time() - self.send_times[0] if self.send_times else 0
                if elapsed > 0:
                    rate = self.sent_count / elapsed
                    throughput = self.bytes_sent / elapsed
                    print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条, "
                          f"运行{elapsed:.0f}秒, 速率: {rate:.1f}条/秒")
        elif self.test_mode == 'recovery':
            # 异常恢复测试减少日志输出
            if self.sent_count % 20 == 0:
                elapsed = time.time() - self.send_times[0] if self.send_times else 0
                if elapsed > 0:
                    rate = self.sent_count / elapsed
                    print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条, 阶段: {stats.current_phase}")
        elif self.test_mode == 'mixed_load':
            log_interval = max(5, int(5 / self.message_interval))
            if self.sent_count % log_interval == 0:
                elapsed = time.time() - self.send_times[0] if self.send_times else 0
                if elapsed > 0:
                    rate = self.sent_count / elapsed
                    throughput = self.bytes_sent / elapsed
                    print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条, "
                          f"速率: {rate:.1f}条/秒, 吞吐: {throughput/1024:.1f}KB/s")
        elif self.test_mode == 'high_freq':
            if self.sent_count % 50 == 0:
                elapsed = time.time() - self.send_times[0] if self.send_times else 0
                if elapsed > 0:
                    rate = self.sent_count / elapsed
                    print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条, 速率: {rate:.1f}条/秒")
        elif self.test_mode == 'large_packet':
            if self.sent_count % 5 == 0:
                print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条 ({sent_bytes}字节)")
        else:
            if self.sent_count % 20 == 0:
                print(f"[Client{self.id:02d}] 已发送 {self.sent_count} 条")
    
    def receive_messages(self, buffer):
        """接收消息"""
        if not self.connected or not self.socket:
            # 尝试重连
            if self.running and not self.disconnect_logged:
                print(f"[Client{self.id:02d}] 接收时连接已断开,尝试重连...")
                if self.reconnect():
                    print(f"[Client{self.id:02d}] 重连成功")
                    return buffer
                else:
                    print(f"[Client{self.id:02d}] 重连失败,停止接收")
            return buffer
        
        try:
            # 根据消息大小调整接收缓冲区
            recv_size = max(1024, self.message_size * 2)
            data = self.socket.recv(recv_size)
            
            if data:
                buffer += data
                self.bytes_received += len(data)
                stats.total_bytes_received += len(data)
                
                while b'\r\n' in buffer:
                    idx = buffer.find(b'\r\n')
                    message = buffer[:idx]
                    buffer = buffer[idx+2:]
                    
                    if message:
                        self.recv_count += 1
                        stats.total_received += 1
                        
                        # 根据测试模式调整日志输出
                        self.log_receive_progress(message)
        
        except socket.timeout:
            pass
        
        except (ConnectionResetError, BrokenPipeError):
            if not self.disconnect_logged:
                print(f"[Client{self.id:02d}] 接收时连接已断开,尝试重连...")
                self.disconnect_logged = True
                self.connected = False
                stats.add_disconnection()
                
                # 尝试立即重连
                if self.running:
                    if self.reconnect():
                        self.disconnect_logged = False
                    else:
                        print(f"[Client{self.id:02d}] 重连失败")
            stats.receive_errors += 1
        
        except Exception as e:
            err_str = str(e)
            if "10038" in err_str or "10053" in err_str or "10054" in err_str:
                if not self.disconnect_logged:
                    print(f"[Client{self.id:02d}] 接收时连接已关闭,尝试重连...")
                    self.disconnect_logged = True
                    self.connected = False
                    stats.add_disconnection()
                    
                    # 尝试立即重连
                    if self.running:
                        if self.reconnect():
                            self.disconnect_logged = False
                        else:
                            print(f"[Client{self.id:02d}] 重连失败")
            elif "timed out" not in err_str:
                print(f"[Client{self.id:02d}] 接收错误: {e}")
            stats.receive_errors += 1
        
        return buffer
    
    def log_receive_progress(self, message):
        """记录接收进度"""
        if self.test_mode == 'long_term':
            # 长时间测试特殊处理
            if self.recv_count % 50 == 0:
                try:
                    msg_str = message.decode('utf-8', errors='ignore')
                    if msg_str.startswith("ACK"):
                        print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count}")
                except:
                    if self.recv_count % 100 == 0:
                        print(f"[Client{self.id:02d}] 收到消息 #{self.recv_count} ({len(message)}字节)")
        elif self.test_mode == 'recovery':
            # 异常恢复测试特殊处理
            try:
                msg_str = message.decode('utf-8', errors='ignore')
                if msg_str.startswith("ACK"):
                    if self.recv_count % 10 == 0:
                        print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count} (阶段: {stats.current_phase})")
            except:
                if self.recv_count % 5 == 0:
                    print(f"[Client{self.id:02d}] 收到消息 #{self.recv_count} ({len(message)}字节)")
        elif self.test_mode == 'mixed_load':
            log_interval = max(10, int(10 / self.message_interval))
            if self.recv_count % log_interval == 0:
                try:
                    msg_str = message.decode('utf-8', errors='ignore')
                    if msg_str.startswith("ACK"):
                        print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count} ({len(message)}字节)")
                except:
                    print(f"[Client{self.id:02d}] 收到消息 #{self.recv_count} ({len(message)}字节)")
        elif self.test_mode == 'high_freq':
            if self.recv_count % 100 == 0:
                try:
                    msg_str = message.decode('utf-8', errors='ignore')
                    if msg_str.startswith("ACK"):
                        print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count}")
                except:
                    pass
        elif self.test_mode == 'large_packet':
            if self.recv_count % 5 == 0:
                print(f"[Client{self.id:02d}] 收到消息 #{self.recv_count} ({len(message)}字节)")
        else:
            if self.recv_count % 10 == 0:
                try:
                    msg_str = message.decode('utf-8', errors='ignore')
                    if msg_str.startswith("ACK"):
                        print(f"[Client{self.id:02d}] 收到ACK #{self.recv_count}")
                except:
                    pass
    
    def stop_client(self):
        """停止客户端"""
        if not self.running:
            return
        
        # 尝试发送断开通知
        if self.connected and self.socket:
            try:
                disconnect_msg = f"BYE Client{self.id:02d}\r\n"
                self.socket.settimeout(0.1)
                self.socket.send(disconnect_msg.encode())
            except:
                pass
        
        # 关闭socket
        if self.socket:
            try:
                self.socket.close()
            except:
                pass
        
        if self.connected:
            stats.clients_connected -= 1
            self.connected = False
        
        # 计算统计信息
        avg_interval = 0
        send_rate = 0
        throughput = 0
        
        if len(self.send_times) > 1:
            total_time = self.send_times[-1] - self.send_times[0]
            avg_interval = total_time / (len(self.send_times) - 1)
            if total_time > 0:
                send_rate = self.sent_count / total_time
                throughput = self.bytes_sent / total_time
        
        # 根据测试模式显示不同统计信息
        if self.test_mode == 'long_term':
            print(f"[Client{self.id:02d}] 停止,运行{total_time:.0f}秒, 发送: {self.sent_count}, "
                  f"接收: {self.recv_count}, 平均间隔: {avg_interval:.2f}s, "
                  f"平均大小: {self.bytes_sent/self.sent_count if self.sent_count > 0 else 0:.0f}字节")
        elif self.test_mode == 'recovery':
            status = "重连成功" if self.reconnection_successful else "未重连"
            print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
                  f"重连尝试: {self.reconnect_attempts}, 状态: {status}")
        elif self.test_mode == 'mixed_load':
            print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
                  f"间隔: {self.message_interval}s, 大小: {self.message_size}字节, "
                  f"吞吐: {throughput/1024:.1f}KB/s")
        elif self.test_mode == 'high_freq':
            print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
                  f"速率: {send_rate:.1f}条/秒")
        elif self.test_mode == 'large_packet':
            print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
                  f"吞吐: {throughput/1024:.1f}KB/s")
        else:
            print(f"[Client{self.id:02d}] 停止,发送: {self.sent_count}, 接收: {self.recv_count}, "
                  f"间隔: {avg_interval:.2f}秒")
        
        self.running = False

# ==================== 主程序 ====================
def main_v35():
    print("W5500 Windows客户端 - V3.5 (增加长时间稳定性测试)")
    print("="*60)
    print(f"服务器: {CONFIG['server_ip']}:{CONFIG['server_port']}")
    
    # 选择测试模式
    print("\n选择测试模式:")
    print("1. 常规压力测试 (1秒/条, 64字节)")
    print("2. 高频消息测试 (0.2秒/条, 64字节)")
    print("3. 大数据包测试 (2秒/条, 1024字节)")
    print("4. 混合负载测试 (8种不同配置)")
    print("5. 异常恢复测试 (断开重连验证)")
    print("6. 长时间稳定性测试 (1小时随机负载)")
    
    choice = input("请输入选择 (1, 2, 3, 4, 5 或 6): ").strip()
    
    if choice == "2":
        CONFIG['test_mode'] = 'high_freq'
        print("\n⚠️  警告:高频测试将产生大量网络流量")
        print(f"   理论吞吐量: {TEST_MODES['high_freq']['client_count'] / TEST_MODES['high_freq']['message_interval']:.0f} 条/秒")
    elif choice == "3":
        CONFIG['test_mode'] = 'large_packet'
        print("\n⚠️  警告:大数据包测试将产生大量网络数据")
        print(f"   理论数据量: {TEST_MODES['large_packet']['client_count'] / TEST_MODES['large_packet']['message_interval'] * TEST_MODES['large_packet']['message_size']/1024:.1f} KB/s")
    elif choice == "4":
        CONFIG['test_mode'] = 'mixed_load'
        print("\n⚠️  警告:混合负载测试将模拟真实场景")
        print("   客户端配置:")
        for i, config in enumerate(TEST_MODES['mixed_load']['clients_config'], 1):
            print(f"     客户端{i}: 间隔{config['interval']}s, 大小{config['size']}字节")
    elif choice == "5":
        CONFIG['test_mode'] = 'recovery'
        print("\n⚠️  异常恢复测试 - 验证W5500断开重连能力")
        print("   测试阶段:")
        for phase in TEST_MODES['recovery']['test_phases']:
            print(f"     {phase['name']}: {phase['duration']}秒 ({phase['action']})")
        print("\n   测试流程:")
        print("     1. 正常连接8个客户端 (10秒)")
        print("     2. 随机断开2个客户端 (5秒)")
        print("     3. 尝试重连断开的客户端 (5秒)")
        print("     4. 验证所有客户端状态 (10秒)")
    elif choice == "6":
        CONFIG['test_mode'] = 'long_term'
        CONFIG['test_duration'] = TEST_MODES['long_term']['duration']
        print("\n⚠️  长时间稳定性测试 - 验证W5500持续运行能力")
        print(f"   测试时长: {TEST_MODES['long_term']['duration']/60:.0f}分钟")
        print(f"   间隔范围: {TEST_MODES['long_term']['interval_range'][0]}-{TEST_MODES['long_term']['interval_range'][1]}秒")
        print(f"   消息大小: {TEST_MODES['long_term']['size_range'][0]}-{TEST_MODES['long_term']['size_range'][1]}字节")
        print("\n   监控设置:")
        print(f"     内存检查: 每{TEST_MODES['long_term']['monitoring']['memory_check']}秒")
        print(f"     连接检查: 每{TEST_MODES['long_term']['monitoring']['connection_check']}秒")
        print("\n   ⚠️  建议在稳定的网络环境下进行此测试")
    else:
        CONFIG['test_mode'] = 'normal'
    
    test_config = TEST_MODES[CONFIG['test_mode']]
    stats.test_mode = CONFIG['test_mode']
    
    print(f"\n测试模式: {test_config['test_name']}")
    print(f"测试时长: {CONFIG['test_duration']}秒")
    
    if CONFIG['test_mode'] == 'mixed_load':
        print(f"客户端数: {test_config['client_count']} (8种不同配置)")
    elif CONFIG['test_mode'] == 'recovery':
        print(f"客户端数: {test_config['client_count']} (包含断开重连测试)")
        print(f"测试阶段: {len(test_config['test_phases'])}个阶段")
    elif CONFIG['test_mode'] == 'long_term':
        print(f"客户端数: {test_config['client_count']} (随机配置)")
        print(f"测试时长: {CONFIG['test_duration']/3600:.1f}小时")
    else:
        print(f"客户端数: {test_config['client_count']}")
        if 'message_interval' in test_config:
            print(f"消息间隔: {test_config['message_interval']}秒")
        if 'message_size' in test_config:
            print(f"消息大小: {test_config['message_size']}字节")
    
    print("="*60)
    
    if CONFIG['test_mode'] == 'long_term':
        confirm = input(f"\n⚠️  长时间测试将运行{CONFIG['test_duration']/3600:.1f}小时,确认开始?(y/n): ").strip().lower()
        if confirm != 'y':
            print("测试已取消")
            return
    
    input("按 Enter 开始测试...")
    
    # 创建客户端
    if CONFIG['test_mode'] == 'mixed_load':
        clients = []
        for i, client_config in enumerate(test_config['clients_config'], 1):
            clients.append(ClientV35(i, client_config))
    else:
        clients = [ClientV35(i+1) for i in range(test_config['client_count'])]
    
    # 启动客户端
    print(f"\n启动{len(clients)}个客户端...")
    test_start_time = time.time()
    stats.start_test()
    
    # 快速连续启动(微小延迟)
    for client in clients:
        client.start()
        time.sleep(0.1)  # 仅微小延迟,避免完全同时
    
    # 监控循环
    try:
        start_monitor_time = time.time()
        last_print_time = start_monitor_time
        last_sent_count = 0
        last_bytes_sent = 0
        last_memory_check = start_monitor_time
        last_connection_check = start_monitor_time
        
        while time.time() - test_start_time < CONFIG['test_duration']:
            active = sum(1 for c in clients if c.running)
            if active == 0:
                print("\n所有客户端已停止")
                break
            
            current_time = time.time()
            elapsed = current_time - test_start_time
            progress = min(100, (elapsed / CONFIG['test_duration']) * 100)
            
            # 计算实时统计
            total_sent = sum(c.sent_count for c in clients)
            total_recv = sum(c.recv_count for c in clients)
            total_bytes_sent = stats.total_bytes_sent
            
            # 长时间测试的特殊监控
            if CONFIG['test_mode'] == 'long_term':
                # 定期内存和连接检查
                if current_time - last_memory_check >= TEST_MODES['long_term']['monitoring']['memory_check']:
                    # 记录连接数
                    stats.record_long_term_stat('connections', active)
                    last_memory_check = current_time
                
                if current_time - last_connection_check >= TEST_MODES['long_term']['monitoring']['connection_check']:
                    # 记录吞吐量
                    throughput = total_bytes_sent / elapsed if elapsed > 0 else 0
                    stats.record_long_term_stat('throughput', throughput)
                    last_connection_check = current_time
            
            # 每2秒打印一次监控信息(长时间测试改为每10秒)
            monitor_interval = 10.0 if CONFIG['test_mode'] == 'long_term' else 2.0
            if current_time - last_print_time >= monitor_interval:
                # 计算瞬时速率
                time_diff = current_time - last_print_time
                sent_diff = total_sent - last_sent_count
                bytes_diff = total_bytes_sent - last_bytes_sent
                
                instant_rate = sent_diff / time_diff if time_diff > 0 else 0
                instant_throughput = bytes_diff / time_diff if time_diff > 0 else 0
                
                # 计算平均速率
                total_elapsed = current_time - start_monitor_time
                avg_rate = total_sent / total_elapsed if total_elapsed > 0 else 0
                avg_throughput = total_bytes_sent / total_elapsed if total_elapsed > 0 else 0
                
                # 显示监控信息
                if CONFIG['test_mode'] == 'long_term':
                    hours = elapsed / 3600
                    minutes = (elapsed % 3600) / 60
                    print(f"\r运行: {hours:.0f}:{minutes:02.0f} | 进度: {progress:.1f}% | 发送: {total_sent} | "
                          f"接收: {total_recv} | 活动: {active}/{len(clients)} | "
                          f"吞吐: {avg_throughput/1024:.1f}KB/s", end="")
                elif CONFIG['test_mode'] in ['mixed_load', 'large_packet', 'recovery']:
                    phase_info = f"阶段: {stats.current_phase} | " if CONFIG['test_mode'] == 'recovery' else ""
                    print(f"\r进度: {progress:.1f}% | {phase_info}发送: {total_sent} | 接收: {total_recv} | "
                          f"活动: {active}/{len(clients)} | 瞬时: {instant_throughput/1024:.1f}KB/s | "
                          f"平均: {avg_throughput/1024:.1f}KB/s", end="")
                else:
                    print(f"\r进度: {progress:.1f}% | 发送: {total_sent} | 接收: {total_recv} | "
                          f"活动: {active}/{len(clients)} | 瞬时: {instant_rate:.1f}条/秒 | "
                          f"平均: {avg_rate:.1f}条/秒", end="")
                
                last_print_time = current_time
                last_sent_count = total_sent
                last_bytes_sent = total_bytes_sent
            
            time.sleep(0.1)
        
        print("\n\n测试完成!")
        
    except KeyboardInterrupt:
        print("\n\n测试中断")
    finally:
        # 停止所有客户端
        print("\n停止客户端...")
        for client in clients:
            client.running = False
        
        for client in clients:
            if client.thread:
                client.thread.join(timeout=1)
        
        stats.end_test()
        display_results_v35(clients, test_config)

def display_results_v35(clients, test_config):
    print("\n" + "="*60)
    print(f"测试结果 - {test_config['test_name']}")
    print("="*60)
    
    duration = stats.get_duration()
    total_sent = sum(c.sent_count for c in clients)
    total_recv = sum(c.recv_count for c in clients)
    
    print(f"测试模式: {test_config['test_name']}")
    print(f"测试时长: {duration:.1f}秒 ({duration/3600:.2f}小时)")
    print(f"客户端数: {len(clients)}")
    print(f"成功连接: {stats.clients_connected}")
    print(f"发送消息: {total_sent}")
    print(f"接收消息: {total_recv}")
    print(f"发送字节: {stats.total_bytes_sent:,}")
    print(f"接收字节: {stats.total_bytes_received:,}")
    
    if total_sent > 0:
        response_rate = total_recv / total_sent * 100
        print(f"响应率: {response_rate:.1f}%")
    
    if duration > 0:
        avg_send_rate = total_sent / duration
        avg_recv_rate = total_recv / duration
        avg_throughput_send = stats.total_bytes_sent / duration
        avg_throughput_recv = stats.total_bytes_received / duration
        
        print(f"\n速率统计:")
        print(f"  平均发送速率: {avg_send_rate:.1f} 条/秒")
        print(f"  平均接收速率: {avg_recv_rate:.1f} 条/秒")
        print(f"  发送吞吐量: {avg_throughput_send/1024:.1f} KB/s")
        print(f"  接收吞吐量: {avg_throughput_recv/1024:.1f} KB/s")
        
        # 长时间测试额外统计
        if stats.test_mode == 'long_term':
            if stats.long_term_stats['connection_counts']:
                min_conn = min(c[1] for c in stats.long_term_stats['connection_counts'])
                max_conn = max(c[1] for c in stats.long_term_stats['connection_counts'])
                avg_conn = sum(c[1] for c in stats.long_term_stats['connection_counts']) / len(stats.long_term_stats['connection_counts'])
                print(f"\n连接稳定性:")
                print(f"  最小连接数: {min_conn}")
                print(f"  最大连接数: {max_conn}")
                print(f"  平均连接数: {avg_conn:.1f}")
            
            if stats.long_term_stats['throughput_history']:
                min_tp = min(t[1] for t in stats.long_term_stats['throughput_history'])
                max_tp = max(t[1] for t in stats.long_term_stats['throughput_history'])
                avg_tp = sum(t[1] for t in stats.long_term_stats['throughput_history']) / len(stats.long_term_stats['throughput_history'])
                print(f"\n吞吐量稳定性:")
                print(f"  最小吞吐量: {min_tp/1024:.1f} KB/s")
                print(f"  最大吞吐量: {max_tp/1024:.1f} KB/s")
                print(f"  平均吞吐量: {avg_tp/1024:.1f} KB/s")
        
        # 异常恢复测试额外统计
        if stats.test_mode == 'recovery':
            print(f"\n恢复统计:")
            print(f"  断开次数: {stats.disconnections}")
            print(f"  重连次数: {stats.reconnections}")
            print(f"  重连成功率: {stats.reconnections/(stats.disconnections or 1)*100:.1f}%")
        
        # 效率分析
        if stats.test_mode == 'high_freq':
            theoretical_rate = 1/test_config['message_interval'] * len(clients)
            efficiency = avg_send_rate / theoretical_rate * 100
            print(f"\n效率分析:")
            print(f"  理论吞吐量: {theoretical_rate:.1f} 条/秒")
            print(f"  实际吞吐量: {avg_send_rate:.1f} 条/秒")
            print(f"  效率: {efficiency:.1f}%")
        
        elif stats.test_mode == 'large_packet':
            theoretical_throughput = (1/test_config['message_interval'] * len(clients) * 
                                     test_config['message_size'])
            actual_throughput = stats.total_bytes_sent / duration
            efficiency = actual_throughput / theoretical_throughput * 100
            
            print(f"\n效率分析:")
            print(f"  理论吞吐量: {theoretical_throughput/1024:.1f} KB/s")
            print(f"  实际吞吐量: {actual_throughput/1024:.1f} KB/s")
            print(f"  效率: {efficiency:.1f}%")
        
        elif stats.test_mode == 'mixed_load':
            # 计算混合负载的理论吞吐量
            theoretical_throughput = 0
            for client in clients:
                theoretical_throughput += (1/client.message_interval * client.message_size)
            
            actual_throughput = stats.total_bytes_sent / duration
            efficiency = actual_throughput / theoretical_throughput * 100
            
            print(f"\n效率分析:")
            print(f"  理论吞吐量: {theoretical_throughput/1024:.1f} KB/s")
            print(f"  实际吞吐量: {actual_throughput/1024:.1f} KB/s")
            print(f"  效率: {efficiency:.1f}%")
    
    print(f"\n错误统计:")
    print(f"  连接错误: {stats.connection_errors}")
    print(f"  发送错误: {stats.send_errors}")
    print(f"  接收错误: {stats.receive_errors}")
    
    print(f"\n客户端详情:")
    client_stats = []
    for client in clients:
        if len(client.send_times) > 1:
            total_time = client.send_times[-1] - client.send_times[0]
            avg_interval = total_time / (len(client.send_times) - 1)
            send_rate = len(client.send_times) / total_time if total_time > 0 else 0
            throughput = client.bytes_sent / total_time if total_time > 0 else 0
        else:
            avg_interval = 0
            send_rate = 0
            throughput = 0
            
        client_stats.append({
            'id': client.id,
            'sent': client.sent_count,
            'recv': client.recv_count,
            'bytes_sent': client.bytes_sent,
            'interval': client.message_interval if stats.test_mode in ['mixed_load', 'recovery', 'long_term'] else avg_interval,
            'size': client.message_size if stats.test_mode in ['mixed_load', 'long_term'] else 0,
            'rate': send_rate,
            'throughput': throughput,
            'reconnect_attempts': client.reconnect_attempts if stats.test_mode in ['recovery', 'long_term'] else 0
        })
        
        if stats.test_mode == 'long_term':
            avg_size = client.bytes_sent / client.sent_count if client.sent_count > 0 else 0
            print(f"  客户端{client.id:02d}: 运行{total_time:.0f}秒, 发送{client.sent_count:6d}, "
                  f"接收{client.recv_count:6d}, 平均间隔{avg_interval:.2f}s, 平均大小{avg_size:.0f}字节")
        elif stats.test_mode == 'recovery':
            status = "重连成功" if client.reconnection_successful else "正常"
            print(f"  客户端{client.id:02d}: 发送{client.sent_count:4d}, 接收{client.recv_count:4d}, "
                  f"重连尝试: {client.reconnect_attempts}, 状态: {status}")
        elif stats.test_mode == 'mixed_load':
            print(f"  客户端{client.id:02d}: 间隔{client.message_interval}s, "
                  f"大小{client.message_size}字节, 发送{client.sent_count:4d}, "
                  f"接收{client.recv_count:4d}, 吞吐{throughput/1024:.1f}KB/s")
        elif stats.test_mode == 'high_freq':
            print(f"  客户端{client.id:02d}: 发送{client.sent_count:6d}, "
                  f"接收{client.recv_count:6d}, 速率{send_rate:.1f}条/秒")
        elif stats.test_mode == 'large_packet':
            print(f"  客户端{client.id:02d}: 发送{client.sent_count:4d}, "
                  f"接收{client.recv_count:4d}, 吞吐{throughput/1024:.1f}KB/s")
        else:
            print(f"  客户端{client.id:02d}: 发送{client.sent_count:4d}, "
                  f"接收{client.recv_count:4d}, 间隔{avg_interval:.2f}秒")
    
    # 计算均匀性
    if client_stats and stats.test_mode not in ['mixed_load', 'recovery', 'long_term']:
        sent_counts = [s['sent'] for s in client_stats]
        max_sent = max(sent_counts)
        min_sent = min(sent_counts)
        avg_sent = sum(sent_counts) / len(sent_counts)
        
        if avg_sent > 0:
            uniformity = (1 - (max_sent - min_sent) / avg_sent) * 100
            print(f"\n消息分发均匀性: {uniformity:.1f}%")
            print(f"  最多: {max_sent}, 最少: {min_sent}, 平均: {avg_sent:.1f}")
    
    print("="*60)
    
    # 保存测试结果
    save_test_results_v35(clients, test_config, duration)

def save_test_results_v35(clients, test_config, duration):
    """保存测试结果到文件"""
    try:
        result = {
            'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'test_mode': test_config['test_name'],
            'duration': duration,
            'clients': len(clients),
            'total_sent': sum(c.sent_count for c in clients),
            'total_received': sum(c.recv_count for c in clients),
            'total_bytes_sent': stats.total_bytes_sent,
            'total_bytes_received': stats.total_bytes_received,
            'connection_errors': stats.connection_errors,
            'send_errors': stats.send_errors,
            'receive_errors': stats.receive_errors,
            'disconnections': stats.disconnections,
            'reconnections': stats.reconnections,
            'client_details': []
        }
        
        if CONFIG['test_mode'] == 'mixed_load':
            result['clients_config'] = TEST_MODES['mixed_load']['clients_config']
        elif CONFIG['test_mode'] == 'recovery':
            result['test_phases'] = TEST_MODES['recovery']['test_phases']
        elif CONFIG['test_mode'] == 'long_term':
            result['monitoring'] = TEST_MODES['long_term']['monitoring']
            result['interval_range'] = TEST_MODES['long_term']['interval_range']
            result['size_range'] = TEST_MODES['long_term']['size_range']
            result['long_term_stats'] = stats.long_term_stats
        
        for client in clients:
            if len(client.send_times) > 1:
                total_time = client.send_times[-1] - client.send_times[0]
                send_rate = len(client.send_times) / total_time if total_time > 0 else 0
                throughput = client.bytes_sent / total_time if total_time > 0 else 0
            else:
                send_rate = 0
                throughput = 0
                
            client_detail = {
                'id': client.id,
                'sent': client.sent_count,
                'received': client.recv_count,
                'bytes_sent': client.bytes_sent,
                'send_rate': send_rate,
                'throughput': throughput
            }
            
            if CONFIG['test_mode'] == 'mixed_load':
                client_detail['interval'] = client.message_interval
                client_detail['message_size'] = client.message_size
            elif CONFIG['test_mode'] in ['recovery', 'long_term']:
                client_detail['reconnect_attempts'] = client.reconnect_attempts
                client_detail['reconnection_successful'] = client.reconnection_successful
                if CONFIG['test_mode'] == 'long_term':
                    client_detail['final_interval'] = client.message_interval
                    client_detail['final_message_size'] = client.message_size
            
            result['client_details'].append(client_detail)
        
        filename = f"w5500_{test_config['test_name']}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(result, f, indent=2, ensure_ascii=False)
        
        print(f"\n测试结果已保存到: {filename}")
        
    except Exception as e:
        print(f"保存测试结果失败: {e}")

if __name__ == "__main__":
    main_v35()

W5500 TCP压力测试服务器 - V3.5 (增强socket回收版) socket回收仍有小问题需修改

"""
W5500 TCP压力测试服务器 - V3.5 (增强socket回收版)
基于V3.2成功经验,简化逻辑
"""

import time
import gc
import wiznet5k_socket as socket_module
try:
    import uasyncio as asyncio
except ImportError:
    import asyncio

# ==================== 服务器配置 ====================
SERVER_CONFIG = {
    'port': 9050,           # 服务器端口
    'max_clients': 8,       # 真正的8路并发
    'test_duration': 120,   # 测试时长
    'backlog': 10,          # 监听队列大小
    'heartbeat_interval': 8.0,
    'client_timeout': 10.0,  # 10秒超时
    'max_packet_size': 2048, # 支持最大2KB数据包
}

# ==================== 全局状态 ====================
class ServerStatsV35:
    """服务器统计信息"""
    def __init__(self):
        
        self.reset()
        self._lock = asyncio.Lock()

        self.start_time = 0
        self.connections = 0
        self.messages_received = 0
        self.messages_sent = 0
        self.bytes_received = 0
        self.bytes_sent = 0
        self.connection_errors = 0
        self.socket_alloc_errors = 0
        self.eighth_client_success = 0  # 第8路成功次数
        self.listen_socket_consumed = 0  # 监听socket被使用次数
        self.active_clients = 0
        self.client_handlers = {}
        self.bye_messages_received = 0  # 收到的BYE消息数
        self.large_packets_received = 0  # 大数据包计数
        self._lock = asyncio.Lock()

    def reset(self):
        """重置所有统计信息(服务器重启时调用)"""
        self.start_time = 0
        self.connections = 0
        self.messages_received = 0
        self.messages_sent = 0
        self.bytes_received = 0
        self.bytes_sent = 0
        self.connection_errors = 0
        self.socket_alloc_errors = 0
        self.eighth_client_success = 0  # 第8路成功次数
        self.listen_socket_consumed = 0  # 监听socket被使用次数
        self.active_clients = 0
        self.client_handlers = {}
        self.bye_messages_received = 0  # 收到的BYE消息数
        self.large_packets_received = 0  # 大数据包计数

    async def add_connection(self):
        async with self._lock:
            self.connections += 1
    
    async def add_message_received(self, bytes_count):
        async with self._lock:
            self.messages_received += 1
            self.bytes_received += bytes_count
            
            # 统计大数据包
            if bytes_count >= 512:  # 512字节以上算大数据包
                self.large_packets_received += 1
    
    async def add_message_sent(self, bytes_count):
        async with self._lock:
            self.messages_sent += 1
            self.bytes_sent += bytes_count
    
    async def add_connection_error(self):
        async with self._lock:
            self.connection_errors += 1
    
    async def add_socket_alloc_error(self):
        async with self._lock:
            self.socket_alloc_errors += 1
    
    async def add_eighth_client_success(self):
        async with self._lock:
            self.eighth_client_success += 1
    
    async def add_listen_socket_consumed(self):
        async with self._lock:
            self.listen_socket_consumed += 1
    
    async def add_bye_message(self):
        async with self._lock:
            self.bye_messages_received += 1
    
    async def client_connected(self, client_id, is_eighth_client=False):
        async with self._lock:
            self.active_clients += 1
            self.client_handlers[client_id] = {
                'connected_time': time.time(),
                'message_count': 0,
                'bytes_received': 0,
                'last_activity': time.time(),
                'is_eighth_client': is_eighth_client
            }
            
            if is_eighth_client:
                self.eighth_client_success += 1
    
    async def client_disconnected(self, client_id):
        async with self._lock:
            if client_id in self.client_handlers:
                self.active_clients -= 1
                # 保留记录用于统计,但标记为已断开
                self.client_handlers[client_id]['disconnected'] = True
                self.client_handlers[client_id]['disconnect_time'] = time.time()
    
    async def update_client_activity(self, client_id, message_size=0):
        async with self._lock:
            if client_id in self.client_handlers:
                self.client_handlers[client_id]['last_activity'] = time.time()
                if message_size > 0:
                    self.client_handlers[client_id]['message_count'] += 1
                    self.client_handlers[client_id]['bytes_received'] += message_size
    
    async def get_stats(self):
        async with self._lock:
            client_details = {}
            for client_id, info in self.client_handlers.items():
                client_details[client_id] = {
                    'uptime': time.time() - info['connected_time'],
                    'message_count': info['message_count'],
                    'bytes_received': info['bytes_received'],
                    'idle_time': time.time() - info['last_activity'],
                    'is_eighth_client': info.get('is_eighth_client', False),
                    'disconnected': info.get('disconnected', False)
                }
            
            return {
                'uptime': time.time() - self.start_time if self.start_time > 0 else 0,
                'connections': self.connections,
                'messages_received': self.messages_received,
                'messages_sent': self.messages_sent,
                'bytes_received': self.bytes_received,
                'bytes_sent': self.bytes_sent,
                'connection_errors': self.connection_errors,
                'socket_alloc_errors': self.socket_alloc_errors,
                'eighth_client_success': self.eighth_client_success,
                'listen_socket_consumed': self.listen_socket_consumed,
                'bye_messages_received': self.bye_messages_received,
                'large_packets_received': self.large_packets_received,
                'active_clients': self.active_clients,
                'client_details': client_details
            }

# 全局统计实例
stats = ServerStatsV35()

# 服务器运行状态
server_running = False

# W5500硬件限制:只有8个socket
MAX_W5500_SOCKETS = 8

# ==================== 优化的服务器协程 ====================
async def tcp_server_v35():
    """
    TCP服务器主协程(V3.5版 - 增强socket回收)
    基于V3.2成功经验
    """
    global server_running
    
    print("🚀 TCP压力测试服务器启动 (V3.5 - 增强socket回收版)")
    print("="*60)
    print(f"⚠️  W5500硬件限制: 最多{MAX_W5500_SOCKETS}个socket")
    print(f"🔥  核心功能: 第8路连接建立后,监听socket转为客户端socket")
    print("="*60)
    
    # 获取网络接口
    try:
        from main import global_vars
        nic = global_vars.get('nic')
    except:
        print("❌ 无法获取网络接口")
        return
    
    if not nic:
        print("❌ 网络接口不可用")
        return
    
    try:
        # 设置socket接口
        socket_module.set_interface(nic)
        server_ip = nic.pretty_ip(nic.ip_address)
        
        # 创建服务器socket
        server = socket_module.socket()
        server.settimeout(0.5)
        
        # 绑定端口
        port = SERVER_CONFIG['port']
        server.bind((server_ip, port))
        server.listen(SERVER_CONFIG['max_clients'])
        
        # 初始化统计
        stats.start_time = time.time()
        server_running = True
        
        print(f"✅ 服务器: {server_ip}:{port}")
        print(f"📡 最大连接: {SERVER_CONFIG['max_clients']}路 (真正的8路并发)")
        print(f"📦 最大包大小: {SERVER_CONFIG['max_packet_size']}字节")
        print(f"⏱️  测试时长: {SERVER_CONFIG['test_duration']}秒")
        print(f"初始监听socket编号: {server.socknum}")
        print("="*60)
        print("等待客户端连接...\n")
        
        # 启动监控协程
        asyncio.create_task(stats_monitor_v35())
        
        # 启动socket状态监控
        async def socket_recycle_monitor():
            """监控socket回收状态"""
            print("🔄 Socket回收监控启动")
            
            while server_running:
                await asyncio.sleep(10)  # 每10秒检查一次
                
                try:
                    # 打印当前socket使用情况
                    from main import global_vars
                    nic = global_vars.get('nic')
                    if nic and hasattr(nic, '_debug_sockets'):
                        print(f"🔄 Socket状态检查 ({time.time() - stats.start_time:.0f}秒):")
                        nic._debug_sockets()
                except:
                    pass

        asyncio.create_task(socket_recycle_monitor())
                
        client_id = 1
        listen_socket_active = True
        listen_socket_used_as_client = False
        
        # 主服务器循环
        while server_running:
            try:
                # 检查当前连接数
                stat_data = await stats.get_stats()
                active_clients = stat_data['active_clients']
                
                # 检查是否达到第8路情况
                if active_clients >= MAX_W5500_SOCKETS - 1:
                    if listen_socket_active and not listen_socket_used_as_client:
                        print(f"🔥 第8路情况:当前连接 {active_clients}/{MAX_W5500_SOCKETS}")
                
                # 只在需要监听时才监听
                if not listen_socket_active:
                    await asyncio.sleep(1.0)
                    continue
                
                try:
                    # 尝试接受新连接
                    result = server.accept()
                    
                    if result is not None:
                        client_sock, addr = result
                        
                        # 检查socket是否有效
                        if not hasattr(client_sock, 'connected') or not client_sock.connected:
                            print(f"[{client_id}] Socket无效,关闭")
                            try:
                                client_sock.close()
                            except:
                                pass
                            continue
                        
                        # 检查并处理第8路客户端
                        is_eighth_client = False
                        if active_clients >= MAX_W5500_SOCKETS - 1:
                            print(f"🎯 第{client_id}路客户端激活!")
                            
                            if active_clients == MAX_W5500_SOCKETS - 1:
                                is_eighth_client = True
                                listen_socket_active = False
                                listen_socket_used_as_client = True
                                await stats.add_listen_socket_consumed()
                                print(f"   ⚠️  监听socket已用于第{client_id}路客户端")
                        
                        # 创建客户端处理协程
                        asyncio.create_task(
                            handle_client_v35(client_id, client_sock, addr, is_eighth_client)
                        )
                        
                        print(f"[{client_id}] 已连接 (socket {client_sock.socknum}), " +
                              f"当前连接: {active_clients + 1}/{MAX_W5500_SOCKETS}")
                        
                        client_id += 1
                        
                        await asyncio.sleep(0.05)
                
                except Exception as e:
                    err_str = str(e)
                    
                    if "Failed to allocate socket" in err_str:
                        await stats.add_socket_alloc_error()
                        
                        print(f"⛔ W5500 socket分配失败: 所有{MAX_W5500_SOCKETS}个socket都在使用中")
                        print(f"   当前连接数: {active_clients}/{MAX_W5500_SOCKETS}")
                        
                        # 关键修复:等待并重试机制
                        print(f"   ⏳ 等待socket资源释放...")
                        for retry_count in range(3):  # 最多重试3次
                            try:
                                # 等待一段时间让socket完全释放
                                await asyncio.sleep(2.0)  # 每次等待2秒
                                
                                # 检查当前连接数
                                stat_data = await stats.get_stats()
                                current_active = stat_data['active_clients']
                                print(f"   重试 {retry_count+1}/3: 当前活动连接 {current_active}/{MAX_W5500_SOCKETS}")
                                
                                # 如果有socket释放了,尝试重新接受连接
                                if current_active < MAX_W5500_SOCKETS:
                                    print(f"   ✅ 检测到socket资源已释放,继续接受连接")
                                    break
                                    
                            except Exception as retry_err:
                                print(f"   重试过程中出错: {retry_err}")
                        
                        try:
                            status = server.status
                            print(f"   监听socket状态: 0x{status:02X}")
                            
                            if status == 0x17:  # 已建立连接
                                print(f"   ✅ 监听socket已建立连接(第{client_id}个客户端)")
                                
                                try:
                                    remote_ip = server.getpeername()
                                    print(f"   客户端地址: {remote_ip}")
                                    addr = remote_ip
                                except:
                                    print(f"   无法获取客户端地址,使用默认地址")
                                    addr = ("0.0.0.0", 0)
                                
                                listen_socket_active = False
                                listen_socket_used_as_client = True
                                await stats.add_listen_socket_consumed()
                                
                                is_eighth_client = (active_clients == MAX_W5500_SOCKETS - 1)
                                
                                asyncio.create_task(
                                    handle_client_v35(client_id, server, addr, is_eighth_client)
                                )
                                
                                if is_eighth_client:
                                    print(f"🎯 [{client_id}] 第8路客户端已连接 (使用监听socket)")
                                    await stats.add_eighth_client_success()
                                else:
                                    print(f"[{client_id}] 客户端已连接 (使用监听socket)")
                                
                                print(f"   ⚠️  监听socket已转为客户端socket")
                                
                                client_id += 1
                                
                            elif status == 0x13:  # 监听状态
                                print(f"   ⏳ 监听socket仍在监听状态")
                                await asyncio.sleep(0.5)
                                
                            else:
                                print(f"   ❓ 监听socket状态异常: 0x{status:02X}")
                                await asyncio.sleep(1.0)
                                
                        except Exception as status_e:
                            print(f"   状态检查失败: {status_e}")
                            await asyncio.sleep(1.0)
                    
                    elif "timeout" not in err_str:
                        print(f"接受连接错误: {e}")
                        await stats.add_connection_error()
                        await asyncio.sleep(0.5)
                
                await asyncio.sleep(0.01)
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"服务器循环错误: {e}")
                await asyncio.sleep(0.5)
        
        print("服务器主循环结束")
    
    except Exception as e:
        print(f"❌ 服务器启动失败: {e}")
        import sys
        sys.print_exception(e)
    
    finally:
        if server_running:
            await stop_server_v35()


# ==================== V3.5客户端处理器 ====================
async def handle_client_v35(client_id, client_sock, addr, is_eighth_client=False):
    """处理客户端连接(V3.5版)"""
    client_ip, client_port = addr if isinstance(addr, tuple) and len(addr) == 2 else ("0.0.0.0", 0)
    
    if is_eighth_client:
        print(f"🎯 [{client_id}] 第8路客户端连接: {client_ip}:{client_port} (使用原监听socket)")
    else:
        print(f"[{client_id}] 客户端连接: {client_ip}:{client_port}")
    
    await stats.client_connected(client_id, is_eighth_client)
    await stats.add_connection()
    
    try:
        # 发送欢迎消息
        if is_eighth_client:
            welcome = f"服务器客户端 #{client_id} (第8路特殊连接)\r\n>\r\n"
        else:
            welcome = f"服务器客户端 #{client_id} (8路压力测试)\r\n>\r\n"
        
        try:
            client_sock.settimeout(1.0)
            sent = client_sock.send(welcome.encode())
            if sent > 0:
                await stats.add_message_sent(sent)
        except Exception as send_err:
            print(f"[{client_id}] 发送欢迎消息失败: {send_err}")
        
        buffer = bytearray()
        last_heartbeat = time.time()
        message_count = 0
        client_active = True
        last_successful_recv = time.time()
        consecutive_timeouts = 0
        bye_received = False  # 标记是否收到BYE消息
        
        while server_running and client_active:
            current_time = time.time()
            
            # 更新客户端活动时间
            await stats.update_client_activity(client_id)
            
            # 检查客户端超时
            stat_data = await stats.get_stats()
            client_info = stat_data['client_details'].get(client_id, {})
            idle_time = client_info.get('idle_time', 0)
            
            if idle_time > SERVER_CONFIG['client_timeout']:
                print(f"[{client_id}] 客户端超时 (空闲{idle_time:.1f}秒)")
                break
            
            # 发送心跳
            if current_time - last_heartbeat > SERVER_CONFIG['heartbeat_interval']:
                try:
                    # 发送前检查socket状态
                    if hasattr(client_sock, 'status'):
                        heartbeat_status = client_sock.status
                        if heartbeat_status != 0x17:
                            print(f"[{client_id}] 发送心跳前socket状态异常: 0x{heartbeat_status:02X}")
                            client_active = False
                            break
                    
                    heartbeat = f"PING {time.ticks_ms()}\r\n"
                    client_sock.settimeout(0.5)
                    sent = client_sock.send(heartbeat.encode())
                    if sent > 0:
                        await stats.add_message_sent(sent)
                        print(f"💓 [{client_id}] 发送心跳包")
                    last_heartbeat = current_time
                except Exception as heartbeat_err:
                    print(f"[{client_id}] 发送心跳失败: {heartbeat_err}")
                    break

            # 接收数据
            try:
                client_sock.settimeout(0.5)
                
                # 新增:检查socket状态 BEFORE 尝试接收
                if hasattr(client_sock, 'status'):
                    status = client_sock.status
                    if status != 0x17:  # 不是ESTABLISHED状态
                        print(f"[{client_id}] ⚠️ 接收前检查: socket状态异常 0x{status:02X}")
                        
                        # 如果是可恢复状态,尝试修复
                        if status in [0x1C, 0x22]:  # UDP或CLOSE_WAIT状态
                            try:
                                print(f"[{client_id}] 尝试重置异常socket状态...")
                                # 先尝试优雅关闭
                                client_sock.close()
                                await asyncio.sleep(0.1)
                                # 然后立即重新创建socket(如果可能)
                                # 但这里我们先标记断开
                                client_active = False
                                break
                            except:
                                client_active = False
                                break
                
                if hasattr(client_sock, 'available') and client_sock.available() > 0:
                    # 动态调整接收大小 - 增加限制
                    available_bytes = client_sock.available()
                    
                    # 关键修复:限制单次接收大小,避免缓冲区溢出
                    max_safe_recv = min(
                        SERVER_CONFIG['max_packet_size'] * 2,
                        available_bytes,
                        1024  # 新增:硬性限制,避免过大读取
                    )
                    
                    if max_safe_recv > 0:
                        data = client_sock.recv(max_safe_recv)
                        
                        if data:
                            buffer.extend(data)
                            message_count += 1
                            consecutive_timeouts = 0
                            last_successful_recv = current_time
                            
                            await stats.add_message_received(len(data))
                            await stats.update_client_activity(client_id, len(data))
                            
                            # 处理完整的消息
                            while b'\r\n' in buffer:
                                idx = buffer.index(b'\r\n')
                                message = bytes(buffer[:idx])
                                buffer = buffer[idx+2:]
                                
                                if message:
                                    try:
                                        msg_str = message.decode('utf-8', errors='ignore').strip()
                                        
                                        # 检查是否是心跳响应
                                        if msg_str.startswith("PONG") or msg_str.startswith("PING"):
                                            print(f"💓 [{client_id}] 收到心跳响应: {msg_str[:30]}...")
                                            continue
                                        
                                        # 检查是否是BYE消息
                                        bye_keywords = ["BYE", "bye", "Bye", "goodbye", "Goodbye"]
                                        if any(keyword in msg_str for keyword in bye_keywords):
                                            print(f"👋 [{client_id}] 收到断开通知: {msg_str}")
                                            await stats.add_bye_message()
                                            bye_received = True
                                            client_active = False
                                            break
                                        
                                        # 检查是否是ACK响应
                                        if msg_str.startswith("ACK"):
                                            print(f"✓ [{client_id}] 收到ACK: {msg_str[:30]}...")
                                            continue
                                        
                                        # 普通消息显示
                                        if len(message) >= 512:
                                            print(f"[{client_id}] 收到#{message_count}: {len(message)}字节 (大数据包)")
                                        elif message_count % 10 == 0:
                                            if len(msg_str) > 30:
                                                msg_str = msg_str[:30] + "..."
                                            print(f"[{client_id}] 收到#{message_count}: {msg_str}")
                                        
                                    except:
                                        # 二进制数据,只显示大小
                                        if len(message) >= 512:
                                            print(f"[{client_id}] 收到#{message_count}: {len(message)}字节 (二进制大数据包)")
                                        elif message_count % 10 == 0:
                                            print(f"[{client_id}] 收到#{message_count}: {len(message)}字节 (二进制数据)")
                                    
                                    # 发送ACK响应 - 增加错误处理
                                    try:
                                        response = f"ACK {len(message)} {time.ticks_ms()}\r\n".encode()
                                        # 发送前检查socket状态
                                        if hasattr(client_sock, 'status'):
                                            send_status = client_sock.status
                                            if send_status == 0x17:
                                                client_sock.send(response)
                                                await stats.add_message_sent(len(response))
                                            else:
                                                print(f"[{client_id}] 发送ACK时socket状态异常: 0x{send_status:02X}")
                                                client_active = False
                                                break
                                        else:
                                            client_sock.send(response)
                                            await stats.add_message_sent(len(response))
                                    except Exception as ack_err:
                                        print(f"[{client_id}] 发送ACK失败: {ack_err}")
                                        client_active = False
                                        break
                    else:
                        # 没有数据可读
                        consecutive_timeouts += 1
                        
                        # 新增:长时间无数据时检查socket健康状态
                        if consecutive_timeouts > 10:
                            try:
                                if hasattr(client_sock, 'status'):
                                    status = client_sock.status
                                    if status != 0x17:
                                        print(f"[{client_id}] 长时间无数据且状态异常: 0x{status:02X}")
                                        client_active = False
                                        break
                            except:
                                pass
                        
                        if consecutive_timeouts > 30:  # 15秒没有数据
                            print(f"[{client_id}] 连续超时次数过多,断开连接")
                            client_active = False
                            break
                            
                        await asyncio.sleep(0.1)
                        
                else:
                    # 没有可用数据
                    consecutive_timeouts += 1
                    if consecutive_timeouts > 10:
                        await asyncio.sleep(0.2)  # 增加等待时间
                    else:
                        await asyncio.sleep(0.1)
                    
            except Exception as recv_err:
                err_str = str(recv_err)
                if "timeout" not in err_str:
                    print(f"[{client_id}] 接收错误: {recv_err}")
                    client_active = False
            
            await asyncio.sleep(0.01)
        
        # 客户端断开连接
        if bye_received:
            if is_eighth_client:
                print(f"🎯 [{client_id}] 第8路客户端正常断开 (收到BYE),接收消息: {message_count}条")
            else:
                print(f"[{client_id}] 客户端正常断开 (收到BYE),接收消息: {message_count}条")
        else:
            if is_eighth_client:
                print(f"🎯 [{client_id}] 第8路客户端异常断开,接收消息: {message_count}条")
            else:
                print(f"[{client_id}] 客户端异常断开,接收消息: {message_count}条")
        
    except Exception as e:
        print(f"[{client_id}] 客户端处理异常: {e}")
        import sys
        sys.print_exception(e)

    finally:
        # 清理资源 - 增强版socket回收
        try:
            if hasattr(client_sock, 'socknum'):
                socknum = client_sock.socknum
                print(f"[{client_id}] 开始深度清理socket {socknum}...")
                
                # 第一步:获取当前状态
                if hasattr(client_sock, 'status'):
                    status = client_sock.status
                    print(f"[{client_id}] socket {socknum} 当前状态: 0x{status:02X}")
                
                # 第二步:尝试优雅关闭
                try:
                    # 先尝试shutdown(如果支持)
                    if hasattr(client_sock, 'shutdown'):
                        client_sock.shutdown(2)  # SHUT_RDWR
                        await asyncio.sleep(0.05)
                except:
                    pass
                
                # 第三步:正式关闭socket
                for attempt in range(3):  # 尝试3次确保关闭
                    try:
                        client_sock.close()
                        await asyncio.sleep(0.1)  # 关键:给W5500硬件足够时间
                        
                        # 检查是否真的关闭了
                        if hasattr(client_sock, 'connected'):
                            if not client_sock.connected:
                                print(f"[{client_id}] socket {socknum} 第{attempt+1}次关闭成功")
                                break
                            else:
                                print(f"[{client_id}] socket {socknum} 仍显示连接中,继续关闭...")
                        else:
                            print(f"[{client_id}] socket {socknum} 关闭完成")
                            break
                            
                    except Exception as close_err:
                        print(f"[{client_id}] socket {socknum} 关闭出错: {close_err}")
                
                # 第四步:强制释放资源(如果可用)
                try:
                    # 如果socket有reset方法,调用它
                    if hasattr(client_sock, 'reset'):
                        client_sock.reset()
                        print(f"[{client_id}] socket {socknum} 已重置")
                except:
                    pass
                
                print(f"[{client_id}] socket {socknum} 深度清理完成")
                
        except Exception as final_err:
            print(f"[{client_id}] 清理过程中出错: {final_err}")
        
        await stats.client_disconnected(client_id)

# ==================== 统计监控协程 ====================
async def stats_monitor_v35():
    """统计信息监控协程(V3.5版)"""
    print("📊 统计监控启动 (V3.5)")
    
    last_print_time = 0
    
    while server_running:
        current_time = time.time()
        
        if current_time - last_print_time >= 15.0:
            stat_data = await stats.get_stats()
            uptime = stat_data['uptime']
            
            print("\n" + "="*60)
            print("服务器统计 (V3.5 - 增强socket回收版)")
            print("="*60)
            print(f"运行时间: {uptime:.1f}秒")
            print(f"连接总数: {stat_data['connections']}")
            print(f"活动连接: {stat_data['active_clients']}/{MAX_W5500_SOCKETS}")
            print(f"接收消息: {stat_data['messages_received']}")
            print(f"发送消息: {stat_data['messages_sent']}")
            print(f"接收字节: {stat_data['bytes_received']:,}")
            print(f"发送字节: {stat_data['bytes_sent']:,}")
            
            if stat_data['bytes_received'] > 0 and uptime > 0:
                throughput = stat_data['bytes_received'] / uptime
                print(f"接收吞吐: {throughput/1024:.1f} KB/s")
            
            print(f"大数据包: {stat_data['large_packets_received']}")
            print(f"第8路成功: {stat_data['eighth_client_success']}次")
            print(f"监听socket被用: {stat_data['listen_socket_consumed']}次")
            print(f"BYE消息收到: {stat_data['bye_messages_received']}")
            
            if stat_data['client_details']:
                print("\n客户端详情:")
                for client_id, details in sorted(stat_data['client_details'].items()):
                    eighth_mark = "🎯 " if details.get('is_eighth_client') else "  "
                    status_mark = "❌" if details.get('disconnected') else "✅"
                    bytes_per_msg = details['bytes_received'] / details['message_count'] if details['message_count'] > 0 else 0
                    print(f"  {eighth_mark}{status_mark}客户端{client_id}: " +
                          f"运行{details['uptime']:.0f}秒, " +
                          f"消息{details['message_count']}条, " +
                          f"{details['bytes_received']}字节, " +
                          f"平均{bytes_per_msg:.0f}字节/条")
            
            try:
                free_mem = gc.mem_free()
                alloc_mem = gc.mem_alloc()
                total_mem = free_mem + alloc_mem
                print(f"\n内存使用: {alloc_mem/(1024):.1f}KB / {total_mem/(1024):.1f}KB " +
                      f"({alloc_mem/total_mem*100:.1f}%)")
            except:
                pass
            
            print("="*60)
            
            last_print_time = current_time
        
        await asyncio.sleep(2)

# ==================== 服务器控制函数 ====================
async def stop_server_v35():
    """停止服务器(V3.5版)"""
    global server_running
    
    if not server_running:
        return
    
    print("\n🛑 停止服务器...")
    print("开始清理资源...")
    
    # 标记服务器停止
    server_running = False
    
    # 给客户端处理协程时间完成
    await asyncio.sleep(3)  # 增加到3秒,确保所有socket都关闭
    
    # 强制垃圾回收
    gc.collect()
    
    # 关键修复:等待W5500硬件资源完全释放
    print(f"⏳ 等待W5500硬件资源释放...")
    await asyncio.sleep(2.0)  # 额外等待2秒
    
    print(f"✅ 内存清理完成: {gc.mem_free()/1024:.1f}KB 可用")
    
    stat_data = await stats.get_stats()
    
    print("\n" + "="*60)
    print("最终统计 (V3.5测试)")
    print("="*60)
    print(f"运行时间: {stat_data['uptime']:.1f}秒")
    print(f"连接总数: {stat_data['connections']}")
    print(f"最大并发: {MAX_W5500_SOCKETS}路")
    print(f"接收消息: {stat_data['messages_received']:,}")
    print(f"发送消息: {stat_data['messages_sent']:,}")
    print(f"接收字节: {stat_data['bytes_received']:,}")
    print(f"发送字节: {stat_data['bytes_sent']:,}")
    
    if stat_data['bytes_received'] > 0 and stat_data['uptime'] > 0:
        throughput = stat_data['bytes_received'] / stat_data['uptime']
        print(f"平均吞吐: {throughput/1024:.1f} KB/s")
    
    print(f"大数据包: {stat_data['large_packets_received']}")
    print(f"第8路成功: {stat_data['eighth_client_success']}次")
    print(f"监听socket被用: {stat_data['listen_socket_consumed']}次")
    print(f"BYE消息收到: {stat_data['bye_messages_received']}")
    
    if stat_data['eighth_client_success'] > 0:
        print(f"\n✅ 成功实现了真正的8路并发!")
        print(f"   第8个客户端使用了监听socket")
    else:
        print(f"\n⚠️  未触发第8路情况")
    
    print("="*60)
    print("\nV3.5测试完成!")

# ==================== 主入口函数 ====================
def main_v35():
    """主函数(V3.5版)"""
    global server_running
    
    print("TCP压力测试服务器 - V3.5 (增强socket回收版)")
    print("="*60)
    print("修复内容:")
    print("1. 手动Ctrl+C退出(无时间限制)")
    print("2. 修复BYE消息统计")
    print("3. 增强socket关闭和回收机制")
    print("4. 增加连接失败重试")
    print("="*60)
    
    # 重置所有状态
    stats.reset()
    server_running = False
    
    # 强制垃圾回收
    gc.collect()
    
    # 关键修复:等待之前的socket完全释放
    print(f"⏳ 等待任何未释放的socket资源...")
    import time as sys_time
    sys_time.sleep(1.0)  # 同步等待1秒
    
    print(f"✅ 统计信息已重置")
    print(f"✅ 内存清理完成: {gc.mem_free()/1024:.1f}KB 可用")
    print("="*60)
    
    try:
        loop = asyncio.get_event_loop()
        
        # 启动服务器协程
        loop.create_task(tcp_server_v35())
        
        print("✅ 服务器已启动")
        print("⏰ 运行时间: 无限制 (Ctrl+C退出)")
        print("="*60)
        
        # 移除自动停止功能,改为无限运行
        loop.run_forever()
        
    except KeyboardInterrupt:
        print("\n👤 用户中断 (Ctrl+C)")
    except Exception as e:
        print(f"❌ 错误: {e}")
        import sys
        sys.print_exception(e)
    finally:
        if server_running:
            try:
                loop.run_until_complete(stop_server_v35())
            except:
                pass
        
        # 关键修复:服务器完全停止后等待
        print(f"\n⏳ 服务器停止完成,等待3秒确保资源释放...")
        import time as sys_time
        sys_time.sleep(3.0)

if __name__ == "__main__":
    main_v35()

测试结果:

[Client07] 收到ACK #1700
[Client03] 配置变化: 间隔0.8s, 大小242字节
运行: 1:59 | 进度: 98.3% | 发送: 10848 | 接收: 14006 | 活动: 8/8 | 吞吐: 1.6KB/s[Client06] 已发送 1300 条, 运行3541秒, 速率: 0.4条/秒
[Client06] 收到ACK #1700
[Client01] 收到ACK #1750
运行: 1:59 | 进度: 98.5% | 发送: 10900 | 接收: 14065 | 活动: 8/8 | 吞吐: 1.6KB/s[Client05] 收到ACK #1650
[Client04] 配置变化: 间隔2.5s, 大小462字节
运行: 1:59 | 进度: 98.8% | 发送: 10949 | 接收: 14125 | 活动: 8/8 | 吞吐: 1.6KB/s[Client05] 配置变化: 间隔3.8s, 大
小335字节
[Client01] 配置变化: 间隔4.9s, 大小807字节
[Client02] 配置变化: 间隔1.7s, 大小256字节
[Client06] 配置变化: 间隔4.9s, 大小289字节
[Client07] 配置变化: 间隔0.6s, 大小262字节
[Client08] 配置变化: 间隔1.3s, 大小393字节
[Client03] 配置变化: 间隔0.8s, 大小344字节
运行: 1:60 | 进度: 99.4% | 发送: 11026 | 接收: 14216 | 活动: 8/8 | 吞吐: 1.6KB/s[Client03] 已发送 1400 条, 运行3583秒, 速率: 0.4条/秒
[Client03] 收到ACK #1800
[Client04] 配置变化: 间隔3.0s, 大小845字节
[Client04] 已发送 1400 条, 运行3584秒, 速率: 0.4条/秒
[Client04] 收到ACK #1800
运行: 1:60 | 进度: 99.7% | 发送: 11060 | 接收: 14260 | 活动: 8/8 | 吞吐: 1.6KB/s[Client05] 配置变化: 间隔3.4s, 大
小566字节
[Client02] 配置变化: 间隔4.9s, 大小981字节
[Client01] 配置变化: 间隔4.2s, 大小904字节
[Client06] 配置变化: 间隔0.7s, 大小817字节
[Client07] 配置变化: 间隔2.3s, 大小428字节
[Client08] 配置变化: 间隔2.5s, 大小653字节
[Client02] 收到ACK #1900
[Client03] 配置变化: 间隔2.2s, 大小560字节
运行: 1:60 | 进度: 99.9% | 发送: 11094 | 接收: 14302 | 活动: 8/8 | 吞吐: 1.6KB/s

测试完成!

停止客户端...

============================================================
测试结果 - 长时间稳定性测试
============================================================
测试模式: 长时间稳定性测试
测试时长: 3601.0秒 (1.00小时)
客户端数: 8
成功连接: 8
发送消息: 11100
接收消息: 14312
发送字节: 5,990,242
接收字节: 247,866
响应率: 128.9%

速率统计:
  平均发送速率: 3.1 条/秒
  平均接收速率: 4.0 条/秒
  发送吞吐量: 1.6 KB/s
  接收吞吐量: 0.1 KB/s

连接稳定性:
  最小连接数: 8
  最大连接数: 8
  平均连接数: 8.0

吞吐量稳定性:
  最小吞吐量: 1.6 KB/s
  最大吞吐量: 2.1 KB/s
  平均吞吐量: 1.7 KB/s

错误统计:
  连接错误: 0
  发送错误: 0
连接稳定性:
  最小连接数: 8
  最大连接数: 8
  平均连接数: 8.0

吞吐量稳定性:
  最小吞吐量: 1.6 KB/s
  最大吞吐量: 2.1 KB/s
  平均吞吐量: 1.7 KB/s

错误统计:
  连接错误: 0
  发送错误: 0

吞吐量稳定性:
  最小吞吐量: 1.6 KB/s
  最大吞吐量: 2.1 KB/s
  平均吞吐量: 1.7 KB/s

错误统计:
  连接错误: 0
  发送错误: 0
  平均吞吐量: 1.7 KB/s

错误统计:
  连接错误: 0
  发送错误: 0
错误统计:
  连接错误: 0
  发送错误: 0
  发送错误: 0
  接收错误: 0

客户端详情:
  客户端01: 运行3597秒, 发送  1373, 接收  1775, 平均间隔2.62s, 平均大小554字节
  客户端02: 运行3596秒, 发送  1499, 接收  1901, 平均间隔2.40s, 平均大小504字节
  客户端03: 运行3598秒, 发送  1412, 接收  1814, 平均间隔2.55s, 平均大小535字节
  客户端04: 运行3597秒, 发送  1404, 接收  1805, 平均间隔2.56s, 平均大小508字节
  客户端05: 运行3597秒, 发送  1271, 接收  1672, 平均间隔2.83s, 平均大小549字节
  客户端06: 运行3599秒, 发送  1327, 接收  1729, 平均间隔2.71s, 平均大小568字节
  客户端07: 运行3599秒, 发送  1347, 接收  1748, 平均间隔2.67s, 平均大小547字节
  客户端08: 运行3598秒, 发送  1467, 接收  1868, 平均间隔2.45s, 平均大小557字节
============================================================

测试结果已保存到: w5500_长时间稳定性测试_20260203_152849.json
(.venv) PS D:\python-ljs\.venv>

============================================================
服务器统计 (V3.5 - 增强socket回收版)
============================================================
运行时间: 4222.0秒
连接总数: 8
活动连接: 0/8
接收消息: 11100
发送消息: 14307
接收字节: 5,990,242
发送字节: 247,911
接收吞吐: 1.4 KB/s
大数据包: 5664
第8路成功: 2次
监听socket被用: 1次
BYE消息收到: 0

客户端详情:
    ❌客户端1: 运行4200秒, 消息1373条, 760840字节, 平均554字节/条
    ❌客户端2: 运行4200秒, 消息1499条, 755658字节, 平均504字节/条
    ❌客户端3: 运行4200秒, 消息1412条, 755826字节, 平均535字节/条
    ❌客户端4: 运行4200秒, 消息1271条, 697871字节, 平均549字节/条
    ❌客户端5: 运行4199秒, 消息1347条, 736391字节, 平均547字节/条
    ❌客户端6: 运行4199秒, 消息1404条, 712799字节, 平均508字节/条
    ❌客户端7: 运行4199秒, 消息1327条, 753507字节, 平均568字节/条
  🎯 ❌客户端8: 运行4197秒, 消息1467条, 817350字节, 平均557字节/条

内存使用: 133.8KB / 8058.0KB (1.7%)
============================================================

============================================================
服务器统计 (V3.5 - 增强socket回收版)
============================================================
运行时间: 4238.0秒
连接总数: 8
活动连接: 0/8
接收消息: 11100
发送消息: 14307
接收字节: 5,990,242
发送字节: 247,911
接收吞吐: 1.4 KB/s
大数据包: 5664
第8路成功: 2次
监听socket被用: 1次
BYE消息收到: 0

客户端详情:
    ❌客户端1: 运行4216秒, 消息1373条, 760840字节, 平均554字节/条
    ❌客户端2: 运行4216秒, 消息1499条, 755658字节, 平均504字节/条
    ❌客户端3: 运行4216秒, 消息1412条, 755826字节, 平均535字节/条
    ❌客户端4: 运行4216秒, 消息1271条, 697871字节, 平均549字节/条
    ❌客户端5: 运行4215秒, 消息1347条, 736391字节, 平均547字节/条
    ❌客户端6: 运行4215秒, 消息1404条, 712799字节, 平均508字节/条
    ❌客户端7: 运行4215秒, 消息1327条, 753507字节, 平均568字节/条
  🎯 ❌客户端8: 运行4213秒, 消息1467条, 817350字节, 平均557字节/条

内存使用: 133.8KB / 8058.0KB (1.7%)
============================================================
截图不及时导致显示的客户端都离线了。


第四部分:一些概念的反思
W5500硬件特殊性:

8个物理Socket硬件实现。W5500芯片内置8个独立的物理Socket(编号0-7) 每个Socket独立工作,有自己的寄存器组。硬件限制:最大并发连接数 = 8(包括监听Socket)。

Socket状态机严格 # W5500 Socket状态编码 SNSR_SOCK_CLOSED = 0x00 # 关闭 SNSR_SOCK_INIT = 0x13 # 初始化 SNSR_SOCK_LISTEN = 0x14 # 监听 SNSR_SOCK_ESTABLISHED = 0x17 # 已连接 SNSR_SOCK_UDP = 0x22 # UDP模式

监听与数据Socket分离 监听Socket(LISTEN状态)专门用于接受连接 数据Socket(ESTABLISHED状态)专门用于数据传输 accept()返回的是新的数据Socket

三次握手正确完成:CLOSED → INIT → LISTEN → SYNRECV → ESTABLISHED

Socket编号交换机制:监听socket和数据socket正确分离 多端口监听支持:可同时监听多个端口(受限于8个物理socket) Socket管理相关注意事项 Socket编号交换机制 accept() 方法会交换socket编号

监听socket使用新分配的编号继续监听 数据socket使用原来的编号进行通信

如果多次调用accept(),socket编号会不断变化

思考:后面一段的“Socket管理相关注意事项”与前面“监听与数据Socket分离”不同,到底哪种是正确的?
 

实际情况分析

1. W5500硬件限制(正确)

  • 8个物理Socket硬件实现(Socket 0-7)

  • 每个Socket在同一时刻只能处于一种状态

  • 无法同时既是LISTEN又是ESTABLISHED

2. 代码中的实现机制

在您的代码中(accept()方法部分):

python

def accept(self):
    # ... 等待连接建立 ...
    
    # 关键代码:
    new_listen_socknum, addr = _the_interface.socket_accept(self.socknum)
    current_socknum = self.socknum
    
    # 创建新socket对象
    client_sock = socket()
    client_sock._socknum = current_socknum  # 原来的监听socket编号给客户端
    self._socknum = new_listen_socknum      # 新分配的socket编号给自己(继续监听)
    
    # 重新绑定和监听
    self.bind((None, self._listen_port))
    self.listen()

3. 两种描述的对比

描述1("监听与数据Socket分离") ❌ 部分正确但表述不准确

  • 正确点:确实有监听Socket(LISTEN状态)和数据Socket(ESTABLISHED状态)

  • 不准确点:不是"分离",而是状态转换和编号交换

描述2("Socket编号交换机制") ✅ 更准确

  • 精确描述了代码的实际行为

  • accept()确实执行了socket编号交换

  • 原来的监听socket变成了数据socket给客户端使用

  • 分配的新socket用于继续监听

4. 实际的工作流程

text

初始状态:
服务器: Socket 2 (状态: LISTEN, 端口: 8080)

客户端连接 -> 
W5500硬件: 分配新的Socket 3来处理连接
           Socket 3进入SYNRECV/ESTABLISHED状态

accept()执行:
1. 获取新Socket编号(3)和客户端地址
2. 创建client_sock对象,使用原来的Socket 2编号
3. 服务器socket对象使用新的Socket 3编号
4. Socket 3重新绑定并进入LISTEN状态
5. 客户端通过Socket 2进行通信

结果:
client_sock: Socket 2 (状态: ESTABLISHED) -> 与客户端通信
服务器socket: Socket 3 (状态: LISTEN) -> 继续接受新连接

5. 总结

  • 硬件层面:W5500每个Socket在同一时刻只能有一种状态

  • 代码层面:通过编号交换实现了"一个socket对象支持持续监听"

  • 实际效果:用户感知上是同一个socket在持续监听,但底层硬件socket编号已经发生了变化

最准确的说法应该是:
"W5500的监听机制通过socket编号交换实现,accept()方法会将当前监听socket转为客户端连接socket,并分配一个新的socket用于继续监听。"

Logo

智能硬件社区聚焦AI智能硬件技术生态,汇聚嵌入式AI、物联网硬件开发者,打造交流分享平台,同步全国赛事资讯、开展 OPC 核心人才招募,助力技术落地与开发者成长。

更多推荐