直播是当下很火的内容表现形式,而直播的实时的弹幕是其中的重要组成部分,弹幕可以用websocket
实现,同时弹幕是一个高并发的场景,试想一下如果一个直播间有10万人同时在线,每秒有100条新消息,那么弹幕系统的推送频率是100w*10/s = 1000w/s
1 拉模式和推模式区别
1.1 拉模式(客户端定时轮询)
-
服务端数据更新频率太低,大多查询都是无意义的
-
客户端增多,服务端查询负载很高
-
非实时,无法满足时效性需求
1.2 推模式(websocket)
- 有消息才会推送(有效)
- 需要维护大量在线的
长连接
- 有消息立即推送(及时)
2 websocket协议和通信过程
浏览器先发起一个http
请求,请求头中带有Upgrade: websocket
,请求升级为一个websocket
请求,服务端收到请求后,完成升级,之后websockt
使用message
来通信,底层将message
分成多个Data frame
进行传输
3 websocket简单实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package main
import (
"github.com/gorilla/websocket"
"log"
"net/http"
)
var (
//允许跨域
upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
)
func handleWs(w http.ResponseWriter, r *http.Request) {
var (
conn *websocket.Conn
err error
data []byte
)
//将一个http请求升级为websocket连接
if conn, err = upgrader.Upgrade(w, r, nil); err != nil {
log.Fatal(err)
return
}
// 读取 websocket 数据
for {
if _, data, err = conn.ReadMessage(); err != nil {
log.Println(err)
goto ERR
}
if err = conn.WriteMessage(websocket.TextMessage, data); err != nil {
log.Println(err)
goto ERR
}
}
ERR:
conn.Close()
}
func main() {
http.HandleFunc("/ws", handleWs)
http.ListenAndServe("0.0.0.0:7777", nil)
}
|
代码缺乏工程化的设计
- 其他代码模块无法直接操作
websocket
连接
websocket
连接非并发安全,并发读、写需同步手段。 ReadMessage、WriteMessage
同一时刻只能有一个调用者
4 封装websocket
隐藏细节,封装API
- 封装
Connection
结构,隐藏websocket
底层连接
- 封装
Connection
的API
,提供 Read
/Close
/send
等线程安全接口
4.1 API原理
SendMessage
将消息投递到out channel
ReadMessge
从in channel
读取消息
4.2 内部实现
4.3 代码封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
package impl
import (
"errors"
"github.com/gorilla/websocket"
"log"
"sync"
)
type Connection struct {
webConn *websocket.Conn
inChan chan [] byte
outChan chan [] byte
closeChan chan byte
mutex sync.Mutex
isClosed bool
}
func InitConnection(webconn *websocket.Conn) (*Connection, error) {
conn := &Connection{
webConn: webconn,
inChan: make(chan [] byte, 1000),
outChan: make(chan [] byte, 1000),
closeChan: make(chan byte),
}
// 启动读协程
go conn.readLoop()
// 启动写协程
go conn.writeLoop()
return conn, nil
}
// API
func (conn *Connection) ReadMessage() (data []byte, err error) {
select {
case data = <-conn.inChan:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return data, err
}
func (conn *Connection) WriteMessage(data []byte) (err error) {
select {
case conn.outChan <- data:
case <-conn.closeChan:
err = errors.New("connection is closed")
}
return err
}
func (conn *Connection) Close() {
// 并发安全,可重入(多次调用)
conn.webConn.Close()
conn.mutex.Lock()
defer conn.mutex.Unlock()
if !conn.isClosed {
conn.isClosed = true
}
close(conn.closeChan)
}
//内部实现
func (conn *Connection) readLoop() {
var (
data []byte
err error
)
for {
if _, data, err = conn.webConn.ReadMessage(); err != nil {
log.Println(err)
goto ERR
}
select {
case conn.inChan <- data:
case <-conn.closeChan:
goto ERR
}
}
ERR:
conn.Close()
}
func (conn *Connection) writeLoop() {
var (
data []byte
err error
)
for {
select {
case data = <-conn.outChan:
case <-conn.closeChan:
goto ERR
}
if err = conn.webConn.WriteMessage(websocket.TextMessage, data); err != nil {
log.Println(err)
goto ERR
}
}
ERR:
conn.Close()
}
|
5 技术瓶颈&解决方案
应用场景:100万人同时在线,每秒10条新消息,推送频率是1000w/s
5.1 内核瓶颈
- 100w * 10/s = 1000w/s,每秒1000万的推送频率
linux
内核发送tcp
的极限包帧是100w/s
优化方案
:
- 将同一秒内的N条消息合并为一条
- 合并后,每秒的推送次数等于在线用户数
5.2 锁瓶颈
优化方案`:
- 连接分散到多个集合中,每个集合都有自己的锁(大锁拆小)
- 多个线程并发的遍历这些集合,避免锁竞争
- 用读写锁代替互斥锁,多个推送任务可以并发的读取同一个小集合
5.3 CPU瓶颈
- 浏览器和服务端通常是
json
通信
json
编码耗费CPU
资源
- 100万次
json
编码
优化方案
:
- 1次编码,100w次推送
- N条消息合并后,只需要编码一次
6 其他
- 技术只是一个工具,而工具会越用越简单
- 大量高质量的技术实践和规律总结,可以形成超越特定技术的思想
- 积累了大量思想,各种思想融汇贯通,最后形成了一种解决问题精神,而这种精神完全也可以应用到生活中