SSE协议
目录
- SSE协议
- 协议实现
- 传输格式
- data 字段
- id 字段
- event 字段
- retry 字段
- 前后端实现
- 使用案例
- FastAPI + SSE-STARLETTE 模拟大模型推理流
- 🖥 代码:FastAPI + SSE-STARLETTE 模拟大模型推理流
SSE协议
SSE,全称是 Server-Sent Events,是一种 服务器主动推送消息 到浏览器(客户端)的一种通信协议,基于 HTTP 单向流。
简单理解就是:
-
客户端发起一个普通的 HTTP 请求(通常是 GET)。
-
服务器保持这个连接不断开,持续地、实时地往客户端推送数据(类似实时通知、消息推送)。
-
客户端收到数据后可以及时处理显示。
主要特点
-
单向通信:服务器 → 客户端(客户端不能主动通过这个连接回传数据,只能发起新请求)。
-
基于文本格式,数据流以 text/event-stream 的 MIME 类型传输。
-
轻量、简单,不需要像 WebSocket 那样升级协议。
-
自动重连(浏览器原生支持,断了会自动重连)。
-
有序(服务器推送的消息默认是顺序到达的)。
同为浏览器推送技术,相较于 WebSocket 而言,Server-Sent Events (简称SSE)更少被人知晓,具体实践也较少。
原因有两点:
- WebSocket 比 SSE 更强大,Websocket 在客户端和服务器之间建立了双向的实时通信。而 SSE 只支持从服务器到客户端的单向实时通信。
- WebSocket 在浏览器方面支持更广(详见下图),IE / Edge 几乎根本不支持 SSE。
然而,就第一点而言,与 WebSocket 相比,SSE 也有独特的优势。
- SSE 的浏览器端实现内置断线重连和消息追踪的功能,WebSocket 也能实现,但是不在协议设计范围内,需要手动处理。
- SSE 实现简单,完全复用现有的 HTTP 协议,而 WebSocket 是相对独立于 HTTP 的一套标准,跨平台实现较为复杂。
协议实现
SSE 协议很简单,本质上是一个客户端发起的 HTTP Get 请求,服务器在接到该请求后,返回 200 OK 状态,同时附带以下 Headers:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
- SSE 的 MIME Type 规定为 text/event-stream
- SSE 肯定不允许缓存
- SSE 是一个一直打开的 TCP 连接,所以 Connection 为 Keep-Alive
传输格式
每一条消息长这样:
data: 这是推送的数据
id: 1234
event: message
retry: 10000
- data: 是发送的内容(可以多行)
- id: 是消息 ID(浏览器会记下来,用于断线续传)
- event: 可以指定事件类型(配合前端 addEventListener 监听不同事件)
每条消息 以两个换行符(\n\n)结尾,标志一条消息结束。
data 字段
数据内容用data字段表示。
data: message\n\n
如果数据很长,可以分成多行,最后一行用\n\n结尾,前面行都用\n结尾。
data: begin message\n
data: continue message\n\n
下面是一个发送 JSON 数据的例子。
data: {\n
data: "foo": "bar",\n
data: "baz", 555\n
data: }\n\n
id 字段
数据标识符用id字段表示,相当于每一条数据的编号。
id: msg1\n
data: message\n\n
浏览器用lastEventId属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。
event 字段
event字段表示自定义的事件类型,默认是message事件。浏览器可以用addEventListener()监听该事件。
event: foo\n
data: a foo event\n\ndata: an unnamed event\n\nevent: bar\n
data: a bar event\n\n
上面的代码创造了三条信息。第一条的名字是foo,触发浏览器的foo事件;第二条未取名,表示默认类型,触发浏览器的message事件;第三条是bar,触发浏览器的bar事件。
- 在 SSE 协议规范(WHATWG EventSource spec)里,
event:
是一个合法的标准字段。 - 但
event:
后面的内容(事件名)是可以随便取的,你可以叫update
、new-message
、heartbeat
、anything-you-want
。 - 客户端可以通过
addEventListener("事件名", handler)
来分别监听不同类型的事件。
也就是说,SSE协议只规定了格式,但没有限制你具体的 event 名字。
服务器发送的数据:
event: user-message
data: {"user":"Alice","msg":"Hi there"}event: system-alert
data: {"level":"warning","message":"Server is hot"}event: heartbeat
data: ping
前端可以这样监听不同的事件:
const es = new EventSource("/stream");// 监听普通消息
es.addEventListener("user-message", e => {console.log("收到用户消息:", e.data);
});// 监听系统警告
es.addEventListener("system-alert", e => {console.warn("系统警告:", e.data);
});// 监听心跳
es.addEventListener("heartbeat", e => {console.log("心跳包:", e.data);
});// 监听默认事件(没有 event: 字段时)
es.onmessage = e => {console.log("默认消息:", e.data);
};
- 如果服务器没有指定
event: xxx
,那浏览器默认就是message
事件(onmessage
触发)。 - 如果有指定
event: xxx
,就要用addEventListener("xxx", handler)
来监听。 event:
字段必须在data:
字段前面,否则它只对下一条消息生效。
retry 字段
服务器可以用retry字段,指定浏览器重新发起连接的时间间隔。
retry: 10000\n
两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。
前后端实现
前端代码,使用浏览器原生提供的方法即可:
const url = '/xx/xxx'
// 1. 创建实例
var source = new EventSource(url)// 2. 事件监听
// 建立连接后,触发`open` 事件
source.addEventListener('open', (e) => {console.log('open', e)
})
// 收到消息,触发`message` 事件
source.addEventListener('message', (e) => {console.log('message', e)
})
// 发生错误,触发`error` 事件
source.addEventListener('error', (e) => {console.log('error', e)
})
// 自定义事件
source.addEventListener('eventName', (e) => {// ...
}, false)// 3. 关闭链接
source.close()
上面的url可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开withCredentials属性,表示是否一起发送 Cookie。
var source = new EventSource(url, { withCredentials: true });
EventSource实例的readyState属性,表明连接的当前状态。该属性只读,可以取以下值。
0:相当于常量EventSource.CONNECTING,表示连接还未建立,或者断线正在重连。
1:相当于常量EventSource.OPEN,表示连接已经建立,可以接受数据。
2:相当于常量EventSource.CLOSED,表示连接已断,且不会重连。
后端相对简单:
package mainimport ("fmt""net/http""time"
)func sseHandler(w http.ResponseWriter, r *http.Request) {// 设置必要的 Headerw.Header().Set("Content-Type", "text/event-stream")w.Header().Set("Cache-Control", "no-cache")w.Header().Set("Connection", "keep-alive")w.Header().Set("Access-Control-Allow-Origin", "*") // 跨域支持(如果有需要)// 确保支持 Flushflusher, ok := w.(http.Flusher)if !ok {http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)return}// 这里是你的推送逻辑:每秒发一条消息ticker := time.NewTicker(1 * time.Second)defer ticker.Stop()// 如果客户端断开,这里可以检测ctx := r.Context()for {select {case <-ctx.Done():fmt.Println("客户端断开连接")returncase t := <-ticker.C:// 注意!每条消息需要以两个换行符 \n\n 结尾fmt.Fprintf(w, "event: tick\n")fmt.Fprintf(w, "data: %s\n\n", t.Format(time.RFC3339))// 刷新到客户端flusher.Flush()}}
}func main() {http.HandleFunc("/sse", sseHandler)fmt.Println("SSE 服务启动在 http://localhost:8080/sse")http.ListenAndServe(":8080", nil)
}
对于go也有对应的三方包,
使用非常简单:
import "github.com/gin-contrib/sse"func httpHandler(w http.ResponseWriter, req *http.Request) {// data can be a primitive like a string, an integer or a floatsse.Encode(w, sse.Event{Event: "message",Data: "some data\nmore data",})// also a complex type, like a map, a struct or a slicesse.Encode(w, sse.Event{Id: "124",Event: "message",Data: map[string]interface{}{"user": "manu","date": time.Now().Unix(),"content": "hi!",},})
}
为什么选择 gin-contrib/sse
?
-
与 Gin 的无缝集成
gin-contrib/sse
是 Gin 官方维护的中间件之一,专为与 Gin 框架协作而设计。它与 Gin 的上下文 (*gin.Context
) 紧密结合,简化了 SSE 的实现过程。 -
简化的 API
该库提供了c.SSEvent(event string, data interface{})
方法,允许开发者轻松发送事件数据,无需手动设置响应头或处理连接管理。 -
自动处理连接生命周期
gin-contrib/sse
自动处理连接的打开和关闭,减少了开发者需要关注的细节。 -
支持事件 ID 和重连机制
该库支持设置事件 ID 和重连时间,符合 SSE 的标准规范,增强了消息的可靠性和客户端的容错能力。 -
广泛的社区支持
作为 Gin 官方提供的中间件之一,gin-contrib/sse
拥有广泛的社区支持和文档资源,易于学习和使用。
以下是使用 gin-contrib/sse
实现 SSE 的示例:
package mainimport ("github.com/gin-gonic/gin""github.com/gin-contrib/sse"
)func main() {r := gin.Default()r.GET("/events", func(c *gin.Context) {c.Stream(func(w io.Writer) bool {sse.Encode(w, sse.Event{Event: "message",Data: "Hello, SSE!",})return true})})r.Run(":8080")
}
与原生 Go SSE 的对比
特性 | 原生 Go SSE 实现 | gin-contrib/sse |
---|---|---|
与框架集成 | 需要手动设置响应头和连接管理 | 与 Gin 无缝集成,简化实现 |
API 简洁性 | 需要手动编码事件格式 | 提供 c.SSEvent 等简洁方法 |
连接生命周期管理 | 需要手动管理连接的打开和关闭 | 自动处理连接的打开和关闭 |
事件 ID 和重连 | 需要手动实现 | 内置支持事件 ID 和重连机制 |
社区支持 | 取决于使用的库 | 作为 Gin 官方中间件,拥有广泛的社区支持 |
使用案例
FastAPI + SSE-STARLETTE 模拟大模型推理流
在使用 ChatGPT 时,发现输入 prompt 后,页面是逐步给出回复的,起初以为使用了 WebSckets 持久化连接协议,查看其网络请求,发现这个接口的通信方式并非传统的 http 接口或者 WebSockets,而是基于 EventStream 的事件流,像打字机一样,一段一段的返回答案。
ChatGPT 是一个基于深度学习的大型语言模型,处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的读数据库要慢的多,普通 http 接口等待时间过长,显然并不合适。对于这种单项对话场景,ChagtGPT 将先计算出的数据“推送”给用户,边计算边返回,避免用户因为等待时间过长关闭页面。而这,可以采用 SSE 技术。
而现在很多大模型 API 服务(像 OpenAI 的 ChatGPT-API、各种 LLMs 推理服务)基本上都是:
- 后端 Python
- 框架FastAPI(因为支持 ASGI,可以异步高效处理流式返回)
- 返回SSE流
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import datetimeapp = FastAPI()async def event_generator():while True:# 等待1秒await asyncio.sleep(1)# 当前时间now = datetime.datetime.now().isoformat()# 注意SSE格式要求两个 \n\n 结尾yield f"event: tick\ndata: {now}\n\n"@app.get("/sse")
async def sse_endpoint(request: Request):# 检测客户端断开连接async def server_sent_events():async for event in event_generator():if await request.is_disconnected():print("客户端断开连接")breakyield eventreturn StreamingResponse(server_sent_events(),media_type="text/event-stream")
Python 生态里还提供了专门的 SSE 辅助库,最有名的是:
库名 | 说明 |
---|---|
sse-starlette | 基于 Starlette(FastAPI的底层框架),专门为 FastAPI/FastASGI 写的 SSE 工具。 |
flask-sse | 专门给 Flask 用户用的,封装了 Redis PubSub,适合广播场景。 |
python-sse | 一个小型独立库,纯粹处理 SSE 协议格式,不依赖具体 Web 框架。 |
🔥 举个 sse-starlette
用法示范(适合 FastAPI)
首先安装:
pip install sse-starlette
然后代码很简单:
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import datetimeapp = FastAPI()async def event_publisher():while True:await asyncio.sleep(1)yield {"event": "tick","data": datetime.datetime.now().isoformat()}@app.get("/sse")
async def sse():return EventSourceResponse(event_publisher())
🔥 这个库帮你自动做了什么?
功能 | 原本要手动做的事 |
---|---|
帮你正确格式化 event: 、data: 、\n\n | 你自己就不用手动 yield f"event: xx\ndata: yy\n\n" |
自动设置 Content-Type: text/event-stream | |
支持 request.is_disconnected() 检测 | 防止死循环 |
支持传 retry: 字段(控制断线重连时间) | |
支持 id: 字段(让前端从上次断开位置继续接收) |
基本就是 开箱即用,专门为 SSE 而生,而且和 FastAPI 非常搭配。✨
🖥 代码:FastAPI + SSE-STARLETTE 模拟大模型推理流
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import randomapp = FastAPI()# 模拟的 LLM 推理,每次产出一个 "token"
async def fake_llm_stream(prompt: str):fake_tokens = ["Hello", ",", " this", " is", " a", " simulated", " response", ".", " Thank", " you", " for", " using", " our", " AI", " model", "!"]for token in fake_tokens:# 每隔随机 100~400ms 推一个 tokenawait asyncio.sleep(random.uniform(0.1, 0.4))yield {"event": "token", # 自定义事件名:token"data": token}# 推送一个结束标志(可以不推)yield {"event": "end","data": "[DONE]"}@app.get("/chat/stream")
async def chat_stream(prompt: str):# 每次访问 /chat/stream?prompt=xxx,就返回一个 Streaming Responsereturn EventSourceResponse(fake_llm_stream(prompt))
🛠 前端测试 HTML
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>LLM Stream Demo</title>
</head>
<body><h1>LLM 流式输出 Demo</h1><div id="output" style="white-space: pre-wrap; font-family: monospace;"></div><script>const prompt = "你好,请介绍一下自己";const es = new EventSource(`http://localhost:8000/chat/stream?prompt=${encodeURIComponent(prompt)}`);es.addEventListener("token", (e) => {document.getElementById("output").textContent += e.data;});es.addEventListener("end", (e) => {console.log("流式输出结束:", e.data);es.close();});es.onerror = (e) => {console.error("连接出错", e);es.close();};</script>
</body>
</html>
🔥 效果
- 你一打开页面,它就调用
/chat/stream
。 - 后端像大模型那样一块块流式返回 Token。
- 浏览器前端实时接收、一字字拼出来!
- 最后收到
event: end
,自动关闭连接。
就跟你用 OpenAI ChatCompletion stream=True
一模一样的体验!🎯
📦 依赖安装
别忘了装必要依赖哦:
pip install fastapi sse-starlette uvicorn
然后运行:
uvicorn your_file_name:app --reload
(记得把 your_file_name
换成你保存的 Python 文件名)