Server-Sent Events简介及简单使用

TrumanWong
6/1/2023
TrumanWong

背景

此前在使用chatgpt的时候,发现chatgpt的聊天请求是一个字一个字地输出,开始以为是WebsocketF12看了之后发现Server-Sent Events,遂对此做个小小的总结。

原理

Server-Sent Events,简称SSE,是一种服务器推送技术,使客户端能够通过HTTP连接从服务器接收自动更新,通常用于向浏览器客户端发送消息更新或连续数据流。

此外,SSE技术还有一些特殊功能,如自动重连,Event IDs及发送自定义事件。SSE可以看作是一个客户端去从服务端订阅一条事件流Evemt Stream,之后服务端可以发送订阅特定时间的消息给客户端,直到服务端或客户端关闭该连接。

SSE通常被拿来与Websocket相比较:

事件流EventStream格式

事件流EventStream是一个简单的文本数据流,文本必须是UTF-8格式的编码。事件流中的消息由一堆换行符分开。以冒号开头的行为注释行,会被忽略。

注释行可以用来防止连接超时,服务器可以定期发送一条消息注释行,以保持连接不断。

每条消息由一行或多行文字组成,列出该消息的字段。每个字段由字段名表示,后面是冒号,然后是该字段值的文本数据。

每条消息中都有以下字段的某种组合:

所有其他的字段名都会被忽略。

如果一行不包含冒号,则整行将被视为具有空值字符串的字段名称。

示例

纯数据消息

在下面的示例中,发送了三个消息。第一个只是注释,因为它以冒号字符开头。如上所述,如果不能定期发送消息,这可以用作保持活动机制。

第二条消息包含一个值为some text的数据字段。第三条消息包含一个值为another message\nwith two lines的数据字段。请注意值中的换行特殊字符。

: this is a test stream

data: some text

data: another message
data: with two lines

命名事件

此示例发送命名事件。每个都有一个由字段指定的事件名称event,以及一个data字段,data可以包含任何字符串数据,通常是一个 JSON 字符串。

event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

event: userdisconnect
data: {"username": "bobby", "time": "02:34:23"}

event: usermessage
data: {"username": "sean", "time": "02:34:36", "text": "Bye, bobby."}

混合搭配

event: userconnect
data: {"username": "bobby", "time": "02:33:48"}

data: Here's a system message of some kind that will get used
data: to accomplish some task.

event: usermessage
data: {"username": "bobby", "time": "02:34:11", "text": "Hi everyone."}

使用Go语言实现SSE

服务端:

package main

import (
	"fmt"
	"io"
	"log"
	"time"

	"github.com/gin-gonic/gin"
)

// Event It keeps a list of clients those are currently attached
// and broadcasting events to those clients.
type Event struct {
	// Events are pushed to this channel by the main events-gathering routine
	Message chan string

	// New client connections
	NewClients chan chan string

	// Closed client connections
	ClosedClients chan chan string

	// Total client connections
	TotalClients map[chan string]bool
}

// ClientChan New event messages are broadcast to all registered client connection channels
type ClientChan chan string

func main() {
	router := gin.New()

	// Initialize new streaming server
	stream := NewServer()

	// We are streaming current time to clients in the interval 10 seconds
	go func() {
		for {
			time.Sleep(time.Second * 10)
			now := time.Now().Format("2006-01-02 15:04:05")
			currentTime := fmt.Sprintf("The Current Time Is %v", now)

			// Send current time to clients message channel
			stream.Message <- currentTime
		}
	}()

	// Add event-streaming headers
	api := router.Group("/api")
	api.GET("/stream", HeadersMiddleware(), stream.serveHTTP(), func(c *gin.Context) {
		v, ok := c.Get("clientChan")
		if !ok {
			return
		}
		clientChan, ok := v.(ClientChan)
		if !ok {
			return
		}
		c.Stream(func(w io.Writer) bool {
			// Stream message to client from message channel
			if msg, ok := <-clientChan; ok {
				c.SSEvent("message", msg)
				return true
			}
			return false
		})
	})

	// Parse Static files
	router.StaticFile("/", "./public/index.html")

	router.Run(":8085")
}

// NewServer Initialize event and Start procnteessing requests
func NewServer() (event *Event) {
	event = &Event{
		Message:       make(chan string),
		NewClients:    make(chan chan string),
		ClosedClients: make(chan chan string),
		TotalClients:  make(map[chan string]bool),
	}

	go event.listen()

	return
}

// It Listens all incoming requests from clients.
// Handles addition and removal of clients and broadcast messages to clients.
func (stream *Event) listen() {
	for {
		select {
		// Add new available client
		case client := <-stream.NewClients:
			stream.TotalClients[client] = true
			log.Printf("Client added. %d registered clients", len(stream.TotalClients))

		// Remove closed client
		case client := <-stream.ClosedClients:
			delete(stream.TotalClients, client)
			close(client)
			log.Printf("Removed client. %d registered clients", len(stream.TotalClients))

		// Broadcast message to client
		case eventMsg := <-stream.Message:
			for clientMessageChan := range stream.TotalClients {
				clientMessageChan <- eventMsg
			}
		}
	}
}

func (stream *Event) serveHTTP() gin.HandlerFunc {
	return func(c *gin.Context) {
		// Initialize client channel
		clientChan := make(ClientChan)

		// Send new connection to event server
		stream.NewClients <- clientChan

		defer func() {
			// Send closed connection to event server
			stream.ClosedClients <- clientChan
		}()

		c.Set("clientChan", clientChan)

		c.Next()
	}
}

func HeadersMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		c.Writer.Header().Set("Content-Type", "text/event-stream")
		c.Writer.Header().Set("Cache-Control", "no-cache")
		c.Writer.Header().Set("Connection", "keep-alive")
		c.Writer.Header().Set("Transfer-Encoding", "chunked")
		c.Next()
	}
}

客户端

<!doctype html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <title>Server Sent Event</title>
</head>

<body>
<div class="event-data"></div>
</body>

<script src="https://code.jquery.com/jquery-3.7.0.min.js"></script>
<script>
    // EventSource object of javascript listens the streaming events from our go server and prints the message.
    let stream = new EventSource("/api/stream");
    stream.addEventListener("message", function(e){
        $('.event-data').append(e.data + "</br>")
    });
</script>

</html>

文中代码已上传github