此前在使用chatgpt
的时候,发现chatgpt
的聊天请求是一个字一个字地输出,开始以为是Websocket
,F12
看了之后发现Server-Sent Events
,遂对此做个小小的总结。
Server-Sent Events
,简称SSE
,是一种服务器推送技术,使客户端能够通过HTTP
连接从服务器接收自动更新,通常用于向浏览器客户端发送消息更新或连续数据流。
此外,SSE
技术还有一些特殊功能,如自动重连,Event IDs
及发送自定义事件。SSE
可以看作是一个客户端去从服务端订阅一条事件流Evemt Stream
,之后服务端可以发送订阅特定时间的消息给客户端,直到服务端或客户端关闭该连接。
SSE
通常被拿来与Websocket
相比较:
EventStream
格式事件流EventStream
是一个简单的文本数据流,文本必须是UTF-8
格式的编码。事件流中的消息由一堆换行符分开。以冒号开头的行为注释行,会被忽略。
每条消息由一行或多行文字组成,列出该消息的字段。每个字段由字段名表示,后面是冒号,然后是该字段值的文本数据。
每条消息中都有以下字段的某种组合:
所有其他的字段名都会被忽略。
在下面的示例中,发送了三个消息。第一个只是注释,因为它以冒号字符开头。如上所述,如果不能定期发送消息,这可以用作保持活动机制。
第二条消息包含一个值为some text
的数据字段。第三条消息包含一个值为another message\nwith two lines
的数据字段。请注意值中的换行特殊字符。
: this is a test stream
data: some text
data: another message
data: with two lines
此示例发送命名事件。每个都有一个由字段指定的事件名称event
,以及一个data
字段,data
可以包含任何字符串数据,通常是一个 JSON
字符串。
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}
event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}
event: userdisconnect
data: {"username": "bobby", "time": "02:34:23"}
event: usermessage
data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."}
event: userconnect
data: {"username": "bobby", "time": "02:33:48"}
data: Here's a system message of some kind that will get used
data: to accomplish some task.
event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}
Go
语言实现SSE
服务端:
package main
import (
"fmt"
"io"
"log"
"time"
"github.com/gin-gonic/gin"
)
// Event It keeps a list of clients those are currently attached
// and broadcasting events to those clients.
type Event struct {
// Events are pushed to this channel by the main events-gathering routine
Message chan string
// New client connections
NewClients chan chan string
// Closed client connections
ClosedClients chan chan string
// Total client connections
TotalClients map[chan string]bool
}
// ClientChan New event messages are broadcast to all registered client connection channels
type ClientChan chan string
func main() {
router := gin.New()
// Initialize new streaming server
stream := NewServer()
// We are streaming current time to clients in the interval 10 seconds
go func() {
for {
time.Sleep(time.Second * 10)
now := time.Now().Format("2006-01-02 15:04:05")
currentTime := fmt.Sprintf("The Current Time Is %v", now)
// Send current time to clients message channel
stream.Message <- currentTime
}
}()
// Add event-streaming headers
api := router.Group("/api")
api.GET("/stream", HeadersMiddleware(), stream.serveHTTP(), func(c *gin.Context) {
v, ok := c.Get("clientChan")
if !ok {
return
}
clientChan, ok := v.(ClientChan)
if !ok {
return
}
c.Stream(func(w io.Writer) bool {
// Stream message to client from message channel
if msg, ok := <-clientChan; ok {
c.SSEvent("message", msg)
return true
}
return false
})
})
// Parse Static files
router.StaticFile("/", "./public/index.html")
router.Run(":8085")
}
// NewServer Initialize event and Start procnteessing requests
func NewServer() (event *Event) {
event = &Event{
Message: make(chan string),
NewClients: make(chan chan string),
ClosedClients: make(chan chan string),
TotalClients: make(map[chan string]bool),
}
go event.listen()
return
}
// It Listens all incoming requests from clients.
// Handles addition and removal of clients and broadcast messages to clients.
func (stream *Event) listen() {
for {
select {
// Add new available client
case client := <-stream.NewClients:
stream.TotalClients[client] = true
log.Printf("Client added. %d registered clients", len(stream.TotalClients))
// Remove closed client
case client := <-stream.ClosedClients:
delete(stream.TotalClients, client)
close(client)
log.Printf("Removed client. %d registered clients", len(stream.TotalClients))
// Broadcast message to client
case eventMsg := <-stream.Message:
for clientMessageChan := range stream.TotalClients {
clientMessageChan <- eventMsg
}
}
}
}
func (stream *Event) serveHTTP() gin.HandlerFunc {
return func(c *gin.Context) {
// Initialize client channel
clientChan := make(ClientChan)
// Send new connection to event server
stream.NewClients <- clientChan
defer func() {
// Send closed connection to event server
stream.ClosedClients <- clientChan
}()
c.Set("clientChan", clientChan)
c.Next()
}
}
func HeadersMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
c.Writer.Header().Set("Content-Type", "text/event-stream")
c.Writer.Header().Set("Cache-Control", "no-cache")
c.Writer.Header().Set("Connection", "keep-alive")
c.Writer.Header().Set("Transfer-Encoding", "chunked")
c.Next()
}
}
客户端
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Server Sent Event</title>
</head>
<body>
<div class="event-data"></div>
</body>
<script src="https://code.jquery.com/jquery-3.7.0.min.js"></script>
<script>
// EventSource object of javascript listens the streaming events from our go server and prints the message.
let stream = new EventSource("/api/stream");
stream.addEventListener("message", function(e){
$('.event-data').append(e.data + "</br>")
});
</script>
</html>
文中代码已上传github。