目录

Websocket千万级并发实现思路

直播是当下很火的内容表现形式,而直播的实时的弹幕是其中的重要组成部分,弹幕可以用websocket实现,同时弹幕是一个高并发的场景,试想一下如果一个直播间有10万人同时在线,每秒有100条新消息,那么弹幕系统的推送频率是100w*10/s = 1000w/s

1 拉模式和推模式区别

1.1 拉模式(客户端定时轮询)
  • 服务端数据更新频率太低,大多查询都是无意义的

  • 客户端增多,服务端查询负载很高

  • 非实时,无法满足时效性需求

1.2 推模式(websocket)
  • 有消息才会推送(有效)
  • 需要维护大量在线的长连接
  • 有消息立即推送(及时)

2 websocket协议和通信过程

浏览器先发起一个http请求,请求头中带有Upgrade: websocket,请求升级为一个websocket请求,服务端收到请求后,完成升级,之后websockt使用message来通信,底层将message分成多个Data frame进行传输

https://s1.ax1x.com/2020/10/13/0fXpBF.png

3 websocket简单实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package main

import (
	"github.com/gorilla/websocket"
	"log"
	"net/http"
)

var (
	//允许跨域
	upgrader = websocket.Upgrader{
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
	}
)

func handleWs(w http.ResponseWriter, r *http.Request) {

	var (
		conn *websocket.Conn
		err  error
		data []byte
	)
	//将一个http请求升级为websocket连接
	if conn, err = upgrader.Upgrade(w, r, nil); err != nil {
		log.Fatal(err)
		return
	}

	// 读取 websocket 数据

	for {
		if _, data, err = conn.ReadMessage(); err != nil {
			log.Println(err)
			goto ERR
		}

		if err = conn.WriteMessage(websocket.TextMessage, data); err != nil {
			log.Println(err)
			goto ERR
		}
	}
ERR:
	conn.Close()
}


func main() {
	http.HandleFunc("/ws", handleWs)

	http.ListenAndServe("0.0.0.0:7777", nil)
}



代码缺乏工程化的设计

  • 其他代码模块无法直接操作websocket连接
  • websocket连接非并发安全,并发读、写需同步手段。 ReadMessage、WriteMessage同一时刻只能有一个调用者

4 封装websocket

隐藏细节,封装API

  • 封装Connection结构,隐藏websocket底层连接
  • 封装ConnectionAPI,提供 Read/Close/send等线程安全接口
4.1 API原理
  • SendMessage 将消息投递到out channel
  • ReadMessgein channel读取消息
4.2 内部实现
  • 读协程,循环读取websocket,将消息投递到in channel

  • 写协程,循环读取out channel,将消息写给websocket

https://s1.ax1x.com/2020/10/12/0RQ7y8.png

4.3 代码封装
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
package impl

import (
	"errors"
	"github.com/gorilla/websocket"
	"log"
	"sync"
)

type Connection struct {
	webConn   *websocket.Conn
	inChan    chan [] byte
	outChan   chan [] byte
	closeChan chan byte
	mutex     sync.Mutex
	isClosed  bool
}

func InitConnection(webconn *websocket.Conn) (*Connection, error) {
	conn := &Connection{
		webConn:   webconn,
		inChan:    make(chan [] byte, 1000),
		outChan:   make(chan [] byte, 1000),
		closeChan: make(chan byte),
	}

	// 启动读协程
	go conn.readLoop()

	// 启动写协程
	go conn.writeLoop()

	return conn, nil
}

// API
func (conn *Connection) ReadMessage() (data []byte, err error) {
	select {
	case data = <-conn.inChan:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}
	return data, err
}

func (conn *Connection) WriteMessage(data []byte) (err error) {
	select {
	case conn.outChan <- data:
	case <-conn.closeChan:
		err = errors.New("connection is closed")
	}

	return err
}

func (conn *Connection) Close() {
	// 并发安全,可重入(多次调用)
	conn.webConn.Close()

	conn.mutex.Lock()
	defer conn.mutex.Unlock()
	if !conn.isClosed {
		conn.isClosed = true
	}
	close(conn.closeChan)
}

//内部实现
func (conn *Connection) readLoop() {
	var (
		data []byte
		err  error
	)
	for {
		if _, data, err = conn.webConn.ReadMessage(); err != nil {
			log.Println(err)
			goto ERR
		}

		select {
		case conn.inChan <- data:
		case <-conn.closeChan:
			goto ERR
		}

	}
ERR:
	conn.Close()
}

func (conn *Connection) writeLoop() {
	var (
		data []byte
		err  error
	)
	for {
		select {
		case data = <-conn.outChan:
		case <-conn.closeChan:
			goto ERR
		}
		if err = conn.webConn.WriteMessage(websocket.TextMessage, data); err != nil {
			log.Println(err)
			goto ERR
		}

	}
ERR:
	conn.Close()
}

5 技术瓶颈&解决方案

应用场景:100万人同时在线,每秒10条新消息,推送频率是1000w/s

5.1 内核瓶颈
  • 100w * 10/s = 1000w/s,每秒1000万的推送频率
  • linux 内核发送tcp的极限包帧是100w/s

优化方案:

  • 将同一秒内的N条消息合并为一条
  • 合并后,每秒的推送次数等于在线用户数
5.2 锁瓶颈
  • 需要维护一个在线用户列表(100w),通常是一个字典结构

  • 需要遍历这个集合,然后顺序推送消息,耗时极长

  • 推送过程中,客户端仍旧在上、下线,所以需要对集合加锁

优化方案`:

  • 连接分散到多个集合中,每个集合都有自己的锁(大锁拆小)
  • 多个线程并发的遍历这些集合,避免锁竞争
  • 用读写锁代替互斥锁,多个推送任务可以并发的读取同一个小集合
5.3 CPU瓶颈
  • 浏览器和服务端通常是json通信
  • json编码耗费CPU资源
  • 100万次json编码

优化方案

  • 1次编码,100w次推送
  • N条消息合并后,只需要编码一次

6 其他

  • 技术只是一个工具,而工具会越用越简单
  • 大量高质量的技术实践和规律总结,可以形成超越特定技术的思想
  • 积累了大量思想,各种思想融汇贯通,最后形成了一种解决问题精神,而这种精神完全也可以应用到生活中