Python中可实现流式下发的 StreamingResponse 和 EventSourceResponse 异同对比,及各自适用场景
逐块发送二进制数据(如文件下载)或自定义文本流。
流式响应在大模型服务中可大大提高用户体验,在Python 中主要有两种方式实现流式响应,即fastapi 的 StreamingResponse 和 SEE 模块的 EventSourceResponse,既然两者都可以实现流式响应,那么我们在实际应用中应该如何选择呢?
接下来我们就来详细分析一下二者的异同及适用场景。
一、核心相同点
1. 流式传输能力
- 均支持异步生成器 (
async generator),实现数据分块(chunked)传输。 - 适用于需要实时、持续下发数据的场景(如实时监控、大文件下载、消息推送)。
2. 长连接特性
- 保持 HTTP 连接长时间开放,持续发送数据,而非一次性返回完整响应。
- 可在客户端和服务器之间建立持久化通信通道。
3. 异步兼容性
- 完美适配 FastAPI 异步框架,能够结合
await和异步 I/O 操作。 - 避免阻塞事件循环,支持高并发场景。
二、核心不同点
|
特性 |
|
|
|---|---|---|
|
协议规范/标准 |
通用 HTTP 流式传输 |
严格遵循 Server-Sent Events (SSE) 协议 |
|
数据格式要求 |
无格式限制(二进制/文本均可) |
必须符合 SSE 消息格式规范( |
|
客户端交互方式 |
需要手动处理分块数据(如 Fetch API 流式读取) |
浏览器原生支持( |
|
Content-Type |
需手动指定(如 |
固定为 |
| 消息结构 | 无结构化要求 |
支持 |
|
连接管理 |
传输完成即关闭 |
默认长连接,需手动终止或处理断开重连逻辑 |
| 典型应用场景 |
大文件下载、视频流、日志流 |
实时通知、股票行情、在线聊天消息推送 |
三、技术实现对比
1. 服务端数据格式示例
StreamingResponse:自定义格式
适用场景:逐块发送二进制数据(如文件下载)或自定义文本流。
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
# 示例1:实时日志流(纯文本流,无结构要求)
async def log_generator():
for i in range(5):
yield f"Log entry {i}\n"
await asyncio.sleep(1)
@app.get("/logs_stream")
async def stream_logs():
return StreamingResponse(
log_generator(),
media_type="text/plain"
)
# 示例2:大文件分块下载(二进制)
async def file_chunker(file_path: str):
with open(file_path, "rb") as f:
while chunk := f.read(1024*1024): # 1MB chunks
yield chunk
@app.get("/download-large-file")
async def download_large_file():
return StreamingResponse(
file_chunker("bigfile.zip"),
media_type="application/octet-stream",
headers={"Content-Disposition": "attachment; filename=bigfile.zip"}
)
EventSourceResponse:SSE 标准格式
适用场景:需要浏览器自动解析的实时事件推送。
from sse_starlette.sse import EventSourceResponse
import datetime
async def sse_news_generator():
news_items = ["News 1", "News 2", "Breaking News"]
for news in news_items:
# SSE 格式要求:data字段 + 空行分隔
yield {
"event": "update",
"data": json.dumps({
"time": datetime.datetime.now().isoformat(),
"content": news
}),
"retry": 3000 # 客户端重试时间(毫秒)
}
await asyncio.sleep(2)
@app.get("/news-stream")
async def news_stream():
return EventSourceResponse(sse_news_generator())
2. 客户端处理方式
StreamingResponse(需手动解析)
// 浏览器端使用 Fetch API 处理流
const response = await fetch('/logs_stream');
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
console.log(new TextDecoder().decode(value));
}
EventSourceResponse(自动解析)
// 浏览器端使用 EventSource API
## 方案一
const eventSource = new EventSource('/news-stream');
eventSource.onmessage = (e) => {
console.log("Received:", e.data); // 自动解析 data 字段
};
##方案二
const eventSource = new EventSource('/news-stream');
eventSource.addEventListener('update', (e) => {
const data = JSON.parse(e.data);
console.log('Received news:', data.content);
});
四、使用场景差异
StreamingResponse 适用场景
-
文件下载/上传:
- 分块传输大型文件(如视频、数据集)。
- 支持断点续传(通过
Range请求头)。
-
媒体流传输:
- 实时音视频流(如监控摄像头数据)。
- 动态生成的二进制数据流(如实时录屏)。
-
日志流式输出:
- 服务器实时日志推送(如 CI/CD 构建日志)。
- 命令行工具实时输出。
EventSourceResponse 适用场景
-
实时事件推送:
- 股票行情更新、体育比赛比分实时推送。
- 用户通知(如邮件到达提醒、系统报警)。
-
在线聊天/协作:
- 聊天消息实时广播。
- 文档协同编辑(如多人同时编辑时的状态同步)。
-
长轮询替代方案:
- 需要服务器主动推送数据的场景(替代 HTTP 轮询)。
- 客户端自动重连支持(浏览器内置机制)。
五、高级特性对比
|
高级功能 |
|
|
|---|---|---|
|
断点续传支持 |
✅ (需自定义 |
❌ (SSE 协议不支持) |
|
消息重连机制 |
❌ (需手动实现) |
✅ (浏览器自动重连,支持 |
|
事件类型过滤 |
❌ |
✅ (通过 |
|
心跳保活 |
需手动发送空数据 |
支持发送注释行(如 |
六、高级配置与注意事项
StreamingResponse 高级配置
- 分块编码控制:默认启用
Transfer-Encoding: chunked,可关闭:StreamingResponse(..., chunked=False) - 自定义响应头:如实现断点续传(
Content-Range):headers={"Content-Range": "bytes 0-1023/2048"}
2. EventSourceResponse 高级功能
- 事件类型过滤:客户端可监听特定事件:
python:
yield {"event": "alert", "data": "Critical update!"}
javascript
eventSource.addEventListener('alert', (e) => { ... });
- 连接保活:发送注释行保持连接:
yield ": keep-alive\n\n"
3. 通用注意事项
- 生成器退出逻辑:确保在客户端断开时清理资源:
async def generator(): try: while True: yield data await asyncio.sleep(1) except asyncio.CancelledError: print("客户端断开连接") # 执行清理操作 - 错误处理:捕获生成器内的异常,返回错误信息
async def safe_generator():
try:
async for chunk in risky_source():
yield chunk
except Exception as e:
yield f"Error occurred: {str(e)}"
七、性能优化建议
StreamingResponse
-
分块大小调优:
- 二进制传输时,根据带宽调整块大小(如 256KB ~ 1MB)。
- 文本流可适当减少块大小(如 4KB)。
-
启用压缩:对文本流启用gzip 传输:
from fastapi.middleware.gzip import GZipMiddleware app.add_middleware(GZipMiddleware, minimum_size=1024) # 对大于 1KB 的数据启用压缩
EventSourceResponse
1.减少序列化开销:
直接生成符合 SSE 格式的字符串,避免多次序列化:
# 优化前(低效)
yield {"data": json.dumps(message)}
# 优化后(高效)
yield f"data: {json.dumps(message)}\n\n"
2.心跳保活:防止代理服务器超时断开:
async def generator():
while True:
yield ":ping\n\n" # 发送心跳注释行
await asyncio.sleep(15)
八、总结与选型建议
- StreamingResponse:万能流式传输工具,适用于非结构化数据流,需手动处理协议细节。
- EventSourceResponse:专为 SSE 设计,简化实时事件推送,自动处理协议格式,与浏览器 EventSource API 深度集成。
-
选择
StreamingResponse:- 需要传输二进制数据或自定义文本流。
- 协议无格式要求,需高度灵活性。
- 客户端需手动处理流(如前端实现分片下载)。
-
选择
EventSourceResponse:- 需浏览器原生支持实时事件推送。
- 需自动重连、消息结构化和多事件类型。
- 符合 SSE 协议规范的服务端主动推送场景。
根据具体需求的数据格式、协议兼容性和客户端实现复杂度,合理选择以优化开发效率和性能。
更多推荐



所有评论(0)