当前位置: 首页 > news >正文

SSE协议

目录

  • SSE协议
    • 协议实现
    • 传输格式
      • data 字段
      • id 字段
      • event 字段
      • retry 字段
    • 前后端实现
    • 使用案例
      • FastAPI + SSE-STARLETTE 模拟大模型推理流
        • 🖥 代码:FastAPI + SSE-STARLETTE 模拟大模型推理流

SSE协议

在这里插入图片描述

SSE,全称是 Server-Sent Events,是一种 服务器主动推送消息 到浏览器(客户端)的一种通信协议,基于 HTTP 单向流。

简单理解就是:

  • 客户端发起一个普通的 HTTP 请求(通常是 GET)。

  • 服务器保持这个连接不断开,持续地、实时地往客户端推送数据(类似实时通知、消息推送)。

  • 客户端收到数据后可以及时处理显示。

主要特点

  • 单向通信:服务器 → 客户端(客户端不能主动通过这个连接回传数据,只能发起新请求)。

  • 基于文本格式,数据流以 text/event-stream 的 MIME 类型传输。

  • 轻量、简单,不需要像 WebSocket 那样升级协议。

  • 自动重连(浏览器原生支持,断了会自动重连)。

  • 有序(服务器推送的消息默认是顺序到达的)。

同为浏览器推送技术,相较于 WebSocket 而言,Server-Sent Events (简称SSE)更少被人知晓,具体实践也较少。

原因有两点:

  • WebSocket 比 SSE 更强大,Websocket 在客户端和服务器之间建立了双向的实时通信。而 SSE 只支持从服务器到客户端的单向实时通信。
  • WebSocket 在浏览器方面支持更广(详见下图),IE / Edge 几乎根本不支持 SSE。

然而,就第一点而言,与 WebSocket 相比,SSE 也有独特的优势。

  • SSE 的浏览器端实现内置断线重连和消息追踪的功能,WebSocket 也能实现,但是不在协议设计范围内,需要手动处理。
  • SSE 实现简单,完全复用现有的 HTTP 协议,而 WebSocket 是相对独立于 HTTP 的一套标准,跨平台实现较为复杂。

协议实现

SSE 协议很简单,本质上是一个客户端发起的 HTTP Get 请求,服务器在接到该请求后,返回 200 OK 状态,同时附带以下 Headers:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
  • SSE 的 MIME Type 规定为 text/event-stream
  • SSE 肯定不允许缓存
  • SSE 是一个一直打开的 TCP 连接,所以 Connection 为 Keep-Alive

传输格式

每一条消息长这样:

data: 这是推送的数据
id: 1234
event: message
retry: 10000
  • data: 是发送的内容(可以多行)
  • id: 是消息 ID(浏览器会记下来,用于断线续传)
  • event: 可以指定事件类型(配合前端 addEventListener 监听不同事件)

每条消息 以两个换行符(\n\n)结尾,标志一条消息结束。

data 字段

数据内容用data字段表示。

data:  message\n\n

如果数据很长,可以分成多行,最后一行用\n\n结尾,前面行都用\n结尾。

data: begin message\n
data: continue message\n\n

下面是一个发送 JSON 数据的例子。

data: {\n
data: "foo": "bar",\n
data: "baz", 555\n
data: }\n\n

id 字段

数据标识符用id字段表示,相当于每一条数据的编号。

id: msg1\n
data: message\n\n

浏览器用lastEventId属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。

event 字段

event字段表示自定义的事件类型,默认是message事件。浏览器可以用addEventListener()监听该事件。

event: foo\n
data: a foo event\n\ndata: an unnamed event\n\nevent: bar\n
data: a bar event\n\n

上面的代码创造了三条信息。第一条的名字是foo,触发浏览器的foo事件;第二条未取名,表示默认类型,触发浏览器的message事件;第三条是bar,触发浏览器的bar事件。

  • 在 SSE 协议规范(WHATWG EventSource spec)里,event: 是一个合法的标准字段。
  • event: 后面的内容(事件名)是可以随便取的,你可以叫 updatenew-messageheartbeatanything-you-want
  • 客户端可以通过 addEventListener("事件名", handler) 来分别监听不同类型的事件。

也就是说,SSE协议只规定了格式,但没有限制你具体的 event 名字

服务器发送的数据:

event: user-message
data: {"user":"Alice","msg":"Hi there"}event: system-alert
data: {"level":"warning","message":"Server is hot"}event: heartbeat
data: ping

前端可以这样监听不同的事件:

const es = new EventSource("/stream");// 监听普通消息
es.addEventListener("user-message", e => {console.log("收到用户消息:", e.data);
});// 监听系统警告
es.addEventListener("system-alert", e => {console.warn("系统警告:", e.data);
});// 监听心跳
es.addEventListener("heartbeat", e => {console.log("心跳包:", e.data);
});// 监听默认事件(没有 event: 字段时)
es.onmessage = e => {console.log("默认消息:", e.data);
};
  • 如果服务器没有指定 event: xxx,那浏览器默认就是 message 事件(onmessage 触发)。
  • 如果有指定 event: xxx,就要用 addEventListener("xxx", handler) 来监听。
  • event: 字段必须在 data: 字段前面,否则它只对下一条消息生效。

retry 字段

服务器可以用retry字段,指定浏览器重新发起连接的时间间隔。

retry: 10000\n

两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。

前后端实现

前端代码,使用浏览器原生提供的方法即可:

const url = '/xx/xxx'
// 1. 创建实例
var source = new EventSource(url)// 2. 事件监听
// 建立连接后,触发`open` 事件
source.addEventListener('open', (e) => {console.log('open', e)
})
// 收到消息,触发`message` 事件
source.addEventListener('message', (e) => {console.log('message', e)
})
// 发生错误,触发`error` 事件
source.addEventListener('error', (e) => {console.log('error', e)
})
// 自定义事件
source.addEventListener('eventName', (e) => {// ...
}, false)// 3. 关闭链接
source.close()

上面的url可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开withCredentials属性,表示是否一起发送 Cookie。

var source = new EventSource(url, { withCredentials: true });

EventSource实例的readyState属性,表明连接的当前状态。该属性只读,可以取以下值。

0:相当于常量EventSource.CONNECTING,表示连接还未建立,或者断线正在重连。
1:相当于常量EventSource.OPEN,表示连接已经建立,可以接受数据。
2:相当于常量EventSource.CLOSED,表示连接已断,且不会重连。

后端相对简单:

package mainimport ("fmt""net/http""time"
)func sseHandler(w http.ResponseWriter, r *http.Request) {// 设置必要的 Headerw.Header().Set("Content-Type", "text/event-stream")w.Header().Set("Cache-Control", "no-cache")w.Header().Set("Connection", "keep-alive")w.Header().Set("Access-Control-Allow-Origin", "*") // 跨域支持(如果有需要)// 确保支持 Flushflusher, ok := w.(http.Flusher)if !ok {http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)return}// 这里是你的推送逻辑:每秒发一条消息ticker := time.NewTicker(1 * time.Second)defer ticker.Stop()// 如果客户端断开,这里可以检测ctx := r.Context()for {select {case <-ctx.Done():fmt.Println("客户端断开连接")returncase t := <-ticker.C:// 注意!每条消息需要以两个换行符 \n\n 结尾fmt.Fprintf(w, "event: tick\n")fmt.Fprintf(w, "data: %s\n\n", t.Format(time.RFC3339))// 刷新到客户端flusher.Flush()}}
}func main() {http.HandleFunc("/sse", sseHandler)fmt.Println("SSE 服务启动在 http://localhost:8080/sse")http.ListenAndServe(":8080", nil)
}

对于go也有对应的三方包,

在这里插入图片描述
使用非常简单:

import "github.com/gin-contrib/sse"func httpHandler(w http.ResponseWriter, req *http.Request) {// data can be a primitive like a string, an integer or a floatsse.Encode(w, sse.Event{Event: "message",Data:  "some data\nmore data",})// also a complex type, like a map, a struct or a slicesse.Encode(w, sse.Event{Id:    "124",Event: "message",Data: map[string]interface{}{"user":    "manu","date":    time.Now().Unix(),"content": "hi!",},})
}

为什么选择 gin-contrib/sse

  1. 与 Gin 的无缝集成
    gin-contrib/sse 是 Gin 官方维护的中间件之一,专为与 Gin 框架协作而设计。它与 Gin 的上下文 (*gin.Context) 紧密结合,简化了 SSE 的实现过程。

  2. 简化的 API
    该库提供了 c.SSEvent(event string, data interface{}) 方法,允许开发者轻松发送事件数据,无需手动设置响应头或处理连接管理。

  3. 自动处理连接生命周期
    gin-contrib/sse 自动处理连接的打开和关闭,减少了开发者需要关注的细节。

  4. 支持事件 ID 和重连机制
    该库支持设置事件 ID 和重连时间,符合 SSE 的标准规范,增强了消息的可靠性和客户端的容错能力。

  5. 广泛的社区支持
    作为 Gin 官方提供的中间件之一,gin-contrib/sse 拥有广泛的社区支持和文档资源,易于学习和使用。

以下是使用 gin-contrib/sse 实现 SSE 的示例:

package mainimport ("github.com/gin-gonic/gin""github.com/gin-contrib/sse"
)func main() {r := gin.Default()r.GET("/events", func(c *gin.Context) {c.Stream(func(w io.Writer) bool {sse.Encode(w, sse.Event{Event: "message",Data:  "Hello, SSE!",})return true})})r.Run(":8080")
}

与原生 Go SSE 的对比

特性原生 Go SSE 实现gin-contrib/sse
与框架集成需要手动设置响应头和连接管理与 Gin 无缝集成,简化实现
API 简洁性需要手动编码事件格式提供 c.SSEvent 等简洁方法
连接生命周期管理需要手动管理连接的打开和关闭自动处理连接的打开和关闭
事件 ID 和重连需要手动实现内置支持事件 ID 和重连机制
社区支持取决于使用的库作为 Gin 官方中间件,拥有广泛的社区支持

使用案例

FastAPI + SSE-STARLETTE 模拟大模型推理流

在使用 ChatGPT 时,发现输入 prompt 后,页面是逐步给出回复的,起初以为使用了 WebSckets 持久化连接协议,查看其网络请求,发现这个接口的通信方式并非传统的 http 接口或者 WebSockets,而是基于 EventStream 的事件流,像打字机一样,一段一段的返回答案。

ChatGPT 是一个基于深度学习的大型语言模型,处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的读数据库要慢的多,普通 http 接口等待时间过长,显然并不合适。对于这种单项对话场景,ChagtGPT 将先计算出的数据“推送”给用户,边计算边返回,避免用户因为等待时间过长关闭页面。而这,可以采用 SSE 技术。

在这里插入图片描述

而现在很多大模型 API 服务(像 OpenAI 的 ChatGPT-API、各种 LLMs 推理服务)基本上都是:

  • 后端 Python
  • 框架FastAPI(因为支持 ASGI,可以异步高效处理流式返回)
  • 返回SSE流
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import datetimeapp = FastAPI()async def event_generator():while True:# 等待1秒await asyncio.sleep(1)# 当前时间now = datetime.datetime.now().isoformat()# 注意SSE格式要求两个 \n\n 结尾yield f"event: tick\ndata: {now}\n\n"@app.get("/sse")
async def sse_endpoint(request: Request):# 检测客户端断开连接async def server_sent_events():async for event in event_generator():if await request.is_disconnected():print("客户端断开连接")breakyield eventreturn StreamingResponse(server_sent_events(),media_type="text/event-stream")

Python 生态里还提供了专门的 SSE 辅助库,最有名的是:

库名说明
sse-starlette基于 Starlette(FastAPI的底层框架),专门为 FastAPI/FastASGI 写的 SSE 工具。
flask-sse专门给 Flask 用户用的,封装了 Redis PubSub,适合广播场景。
python-sse一个小型独立库,纯粹处理 SSE 协议格式,不依赖具体 Web 框架。

🔥 举个 sse-starlette 用法示范(适合 FastAPI)

首先安装:

pip install sse-starlette

然后代码很简单:

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import datetimeapp = FastAPI()async def event_publisher():while True:await asyncio.sleep(1)yield {"event": "tick","data": datetime.datetime.now().isoformat()}@app.get("/sse")
async def sse():return EventSourceResponse(event_publisher())

🔥 这个库帮你自动做了什么?

功能原本要手动做的事
帮你正确格式化 event:data:\n\n你自己就不用手动 yield f"event: xx\ndata: yy\n\n"
自动设置 Content-Type: text/event-stream
支持 request.is_disconnected() 检测防止死循环
支持传 retry: 字段(控制断线重连时间)
支持 id: 字段(让前端从上次断开位置继续接收)

基本就是 开箱即用,专门为 SSE 而生,而且和 FastAPI 非常搭配。✨

🖥 代码:FastAPI + SSE-STARLETTE 模拟大模型推理流
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import randomapp = FastAPI()# 模拟的 LLM 推理,每次产出一个 "token"
async def fake_llm_stream(prompt: str):fake_tokens = ["Hello", ",", " this", " is", " a", " simulated", " response", ".", " Thank", " you", " for", " using", " our", " AI", " model", "!"]for token in fake_tokens:# 每隔随机 100~400ms 推一个 tokenawait asyncio.sleep(random.uniform(0.1, 0.4))yield {"event": "token",              # 自定义事件名:token"data": token}# 推送一个结束标志(可以不推)yield {"event": "end","data": "[DONE]"}@app.get("/chat/stream")
async def chat_stream(prompt: str):# 每次访问 /chat/stream?prompt=xxx,就返回一个 Streaming Responsereturn EventSourceResponse(fake_llm_stream(prompt))

🛠 前端测试 HTML

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>LLM Stream Demo</title>
</head>
<body><h1>LLM 流式输出 Demo</h1><div id="output" style="white-space: pre-wrap; font-family: monospace;"></div><script>const prompt = "你好,请介绍一下自己";const es = new EventSource(`http://localhost:8000/chat/stream?prompt=${encodeURIComponent(prompt)}`);es.addEventListener("token", (e) => {document.getElementById("output").textContent += e.data;});es.addEventListener("end", (e) => {console.log("流式输出结束:", e.data);es.close();});es.onerror = (e) => {console.error("连接出错", e);es.close();};</script>
</body>
</html>

🔥 效果

  • 你一打开页面,它就调用 /chat/stream
  • 后端像大模型那样一块块流式返回 Token。
  • 浏览器前端实时接收、一字字拼出来
  • 最后收到 event: end,自动关闭连接。

就跟你用 OpenAI ChatCompletion stream=True 一模一样的体验!🎯

📦 依赖安装

别忘了装必要依赖哦:

pip install fastapi sse-starlette uvicorn

然后运行:

uvicorn your_file_name:app --reload

(记得把 your_file_name 换成你保存的 Python 文件名)

相关文章:

  • 《数字图像处理(面向新工科的电工电子信息基础课程系列教材)》图4-2
  • 数据资产价值及其实现路径-简答题回顾
  • 什么是WebSocket?NGINX如何支持WebSocket协议?
  • 2025春季NC:3.1TheTrapeziumRule
  • 第十一章 多态
  • Linux下编译并打包MNN项目迁移至其他设备
  • RTMP 协议解析 1
  • 摸鱼屏保神器工具软件下载及使用教程
  • AIGC在游戏开发中的革命:自动化生成3A级游戏内容
  • Vue3 组件通信与插槽
  • Qt开发:如何加载样式文件
  • 玩转Pygame绘图:从简单图形到炫酷精灵
  • 【开源飞控】调试
  • maven打包时配置多环境参数
  • 设置右键打开VSCode
  • MCP协议:AI与数据世界的“万能连接器“
  • [创业之路-390]:人力资源 - 社会性生命系统的解构与重构:人的角色嬗变与组织进化论
  • SpringBoot技术概述与应用实践
  • GPT系列模型-20250426
  • 《软件设计师》复习笔记(6.1)——信息安全及技术
  • 这些被低估的降血压运动,每天几分钟就管用
  • 南阳市委原书记朱是西被“双开”:搞劳民伤财的“政绩工程”
  • 持续更新丨伊朗港口爆炸事件已致406人受伤
  • 邮轮、无人机、水上运动……上海多区推动文旅商体展融合发展
  • 【社论】以“法治之盾”护航每一份创新
  • “今日海上”对话“今日维也纳”,东西方艺术在上海碰撞