2018-09-08 05:12:48 +00:00
package common
import (
"encoding/json"
2019-10-01 21:06:22 +00:00
"log"
2018-09-08 05:12:48 +00:00
"sync"
"time"
"github.com/gorilla/websocket"
)
// WsHub is the package-level WebSocket hub. Its maps are allocated by init() in this file.
// TODO: Rename this to WebSockets?
var WsHub WsHubImpl
// TODO: Make this an interface?
2018-09-13 07:41:01 +00:00
// TODO: Write tests for this
2018-09-08 05:12:48 +00:00
type WsHubImpl struct {
// TODO: Implement some form of generics so we don't write as much odd-even sharding code
evenOnlineUsers map [ int ] * WSUser
oddOnlineUsers map [ int ] * WSUser
evenUserLock sync . RWMutex
oddUserLock sync . RWMutex
// TODO: Add sharding for this too?
OnlineGuests map [ * WSUser ] bool
GuestLock sync . RWMutex
lastTick time . Time
lastTopicList [ ] * TopicsRow
}
// init replaces the package-level WsHub with a hub whose three session maps are allocated and empty.
func init() {
	// TODO: Do we really want to initialise this here instead of in main.go / general_test.go like the other things?
	WsHub = WsHubImpl{
		evenOnlineUsers: map[int]*WSUser{},
		oddOnlineUsers:  map[int]*WSUser{},
		OnlineGuests:    map[*WSUser]bool{},
	}
}
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) Start ( ) {
2019-06-19 03:16:03 +00:00
log . Print ( "Setting up the WebSocket ticks" )
2019-02-28 07:28:17 +00:00
ticker := time . NewTicker ( time . Minute * 5 )
defer func ( ) {
ticker . Stop ( )
} ( )
go func ( ) {
for {
2019-10-01 21:06:22 +00:00
item := func ( l * sync . RWMutex , userMap map [ int ] * WSUser ) {
l . RLock ( )
defer l . RUnlock ( )
2019-02-28 07:28:17 +00:00
// TODO: Copy to temporary slice for less contention?
2021-04-28 09:29:15 +00:00
for _ , u := range userMap {
u . Ping ( )
2019-02-28 07:28:17 +00:00
}
}
select {
case <- ticker . C :
2019-10-01 21:06:22 +00:00
item ( & h . evenUserLock , h . evenOnlineUsers )
item ( & h . oddUserLock , h . oddOnlineUsers )
2019-02-28 07:28:17 +00:00
}
}
} ( )
2018-09-08 05:12:48 +00:00
if Config . DisableLiveTopicList {
return
}
2019-10-01 21:06:22 +00:00
h . lastTick = time . Now ( )
AddScheduledSecondTask ( h . Tick )
2018-09-08 05:12:48 +00:00
}
2018-12-27 05:42:41 +00:00
// This Tick is separate from the admin one, as we want to process that in parallel with this due to the blocking calls to gopsutil
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) Tick ( ) error {
return wsTopicListTick ( h )
2019-02-10 05:52:26 +00:00
}
2019-10-01 21:06:22 +00:00
func wsTopicListTick ( h * WsHubImpl ) error {
2019-04-19 23:15:17 +00:00
// Avoid hitting GetList when the topic list hasn't changed
2019-10-01 21:06:22 +00:00
if ! TopicListThaw . Thawed ( ) && h . lastTopicList != nil {
2019-04-19 23:15:17 +00:00
return nil
}
2019-05-09 06:58:55 +00:00
tickStart := time . Now ( )
2019-10-01 21:06:22 +00:00
2018-09-08 05:12:48 +00:00
// Don't waste CPU time if nothing has happened
// TODO: Get a topic list method which strips stickies?
2020-03-03 04:25:18 +00:00
tList , _ , _ , err := TopicList . GetList ( 1 , 0 , nil )
2018-09-08 05:12:48 +00:00
if err != nil {
2019-10-01 21:06:22 +00:00
h . lastTick = tickStart
2018-09-08 05:12:48 +00:00
return err // TODO: Do we get ErrNoRows here?
}
defer func ( ) {
2019-10-01 21:06:22 +00:00
h . lastTick = tickStart
h . lastTopicList = tList
2018-09-08 05:12:48 +00:00
} ( )
if len ( tList ) == 0 {
return nil
}
// TODO: Optimise this by only sniffing the top non-sticky
// TODO: Optimise this by getting back an unsorted list so we don't have to hop around the stickies
// TODO: Add support for new stickies / replies to them
2019-10-01 21:06:22 +00:00
if len ( tList ) == len ( h . lastTopicList ) {
hasItem := false
2018-09-08 05:12:48 +00:00
for j , tItem := range tList {
if ! tItem . Sticky {
2019-10-01 21:06:22 +00:00
if tItem . ID != h . lastTopicList [ j ] . ID || ! tItem . LastReplyAt . Equal ( h . lastTopicList [ j ] . LastReplyAt ) {
2018-09-08 05:12:48 +00:00
hasItem = true
2019-11-04 07:00:12 +00:00
break
2018-09-08 05:12:48 +00:00
}
}
}
if ! hasItem {
return nil
}
}
// TODO: Implement this for guests too? Should be able to optimise it far better there due to them sharing the same permission set
// TODO: Be less aggressive with the locking, maybe use an array of sorts instead of hitting the main map every-time
topicListMutex . RLock ( )
if len ( topicListWatchers ) == 0 {
topicListMutex . RUnlock ( )
return nil
}
// Copy these over so we close this loop as fast as possible so we can release the read lock, especially if the group gets are backed by calls to the database
2019-10-01 21:06:22 +00:00
groupIDs := make ( map [ int ] bool )
currentWatchers := make ( [ ] * WSUser , len ( topicListWatchers ) )
i := 0
2018-09-08 05:12:48 +00:00
for wsUser , _ := range topicListWatchers {
currentWatchers [ i ] = wsUser
groupIDs [ wsUser . User . Group ] = true
i ++
}
topicListMutex . RUnlock ( )
2019-10-01 21:06:22 +00:00
groups := make ( map [ int ] * Group )
canSeeMap := make ( map [ string ] [ ] int )
2021-04-28 09:29:15 +00:00
for gid , _ := range groupIDs {
g , err := Groups . Get ( gid )
2018-09-08 05:12:48 +00:00
if err != nil {
// TODO: Do we really want to halt all pushes for what is possibly just one user?
return err
}
2021-04-28 09:29:15 +00:00
groups [ g . ID ] = g
2018-09-08 05:12:48 +00:00
2021-04-28 09:29:15 +00:00
canSee := make ( [ ] byte , len ( g . CanSee ) )
for i , item := range g . CanSee {
2018-09-08 05:12:48 +00:00
canSee [ i ] = byte ( item )
}
2021-04-28 09:29:15 +00:00
canSeeMap [ string ( canSee ) ] = g . CanSee
2018-09-08 05:12:48 +00:00
}
2019-10-01 21:06:22 +00:00
canSeeRenders := make ( map [ string ] [ ] byte )
2020-07-19 03:26:56 +00:00
canSeeLists := make ( map [ string ] [ ] * WsTopicsRow )
2018-09-08 05:12:48 +00:00
for name , canSee := range canSeeMap {
2020-03-03 04:25:18 +00:00
topicList , forumList , _ , err := TopicList . GetListByCanSee ( canSee , 1 , 0 , nil )
2018-09-08 05:12:48 +00:00
if err != nil {
return err // TODO: Do we get ErrNoRows here?
}
if len ( topicList ) == 0 {
continue
}
_ = forumList // Might use this later after we get the base feature working
if topicList [ 0 ] . Sticky {
2019-10-01 21:06:22 +00:00
lastSticky := 0
2018-09-08 05:12:48 +00:00
for i , row := range topicList {
if ! row . Sticky {
lastSticky = i
break
}
}
if lastSticky == 0 {
continue
}
topicList = topicList [ lastSticky : ]
}
// TODO: Compare to previous tick to eliminate unnecessary work and data
2019-10-01 21:06:22 +00:00
wsTopicList := make ( [ ] * WsTopicsRow , len ( topicList ) )
2018-09-08 05:12:48 +00:00
for i , topicRow := range topicList {
wsTopicList [ i ] = topicRow . WebSockets ( )
}
2020-07-19 03:26:56 +00:00
canSeeLists [ name ] = wsTopicList
2018-09-08 05:12:48 +00:00
2019-05-09 06:58:55 +00:00
outBytes , err := json . Marshal ( & WsTopicList { wsTopicList , 0 , tickStart . Unix ( ) } )
2018-09-08 05:12:48 +00:00
if err != nil {
return err
}
canSeeRenders [ name ] = outBytes
}
// TODO: Use MessagePack for additional speed?
//fmt.Println("writing to the clients")
for _ , wsUser := range currentWatchers {
2020-07-19 03:26:56 +00:00
u := wsUser . User
group := groups [ u . Group ]
2019-10-01 21:06:22 +00:00
canSee := make ( [ ] byte , len ( group . CanSee ) )
2018-09-08 05:12:48 +00:00
for i , item := range group . CanSee {
canSee [ i ] = byte ( item )
}
2020-07-19 03:26:56 +00:00
sCanSee := string ( canSee )
l := canSeeLists [ sCanSee ]
// TODO: Optimise this away for guests?
anyMod , anyLock , anyMove , allMod := false , false , false , true
var modSet map [ int ] int
if u . IsSuperAdmin {
anyMod = true
anyLock = true
anyMove = true
} else {
modSet = make ( map [ int ] int , len ( l ) )
for i , t := range l {
// TODO: Abstract this?
2021-04-28 09:29:15 +00:00
fp , e := FPStore . Get ( t . ParentID , u . Group )
if e == ErrNoRows {
2020-07-19 03:26:56 +00:00
fp = BlankForumPerms ( )
2021-04-28 09:29:15 +00:00
} else if e != nil {
return e
2020-07-19 03:26:56 +00:00
}
var ccanMod , ccanLock , ccanMove bool
if fp . Overrides {
ccanLock = fp . CloseTopic
ccanMove = fp . MoveTopic
ccanMod = t . CreatedBy == u . ID || fp . DeleteTopic || ccanLock || ccanMove
} else {
ccanLock = u . Perms . CloseTopic
ccanMove = u . Perms . MoveTopic
ccanMod = t . CreatedBy == u . ID || u . Perms . DeleteTopic || ccanLock || ccanMove
}
if ccanLock {
anyLock = true
}
if ccanMove {
anyMove = true
}
if ccanMod {
anyMod = true
} else {
allMod = false
}
var v int
if ccanMod {
v = 1
}
modSet [ i ] = v
}
}
2018-09-08 05:12:48 +00:00
//fmt.Println("writing to user #", wsUser.User.ID)
2020-07-19 03:26:56 +00:00
outBytes := canSeeRenders [ sCanSee ]
2018-09-08 05:12:48 +00:00
//fmt.Println("outBytes: ", string(outBytes))
2020-07-19 03:26:56 +00:00
//fmt.Println("outBytes[:len(outBytes)-1]: ", string(outBytes[:len(outBytes)-1]))
//e := wsUser.WriteToPageBytes(outBytes, "/topics/")
//e := wsUser.WriteToPageBytesMulti([][]byte{outBytes[:len(outBytes)-1], []byte(`,"mod":1}`)}, "/topics/")
var e error
if ! anyMod {
e = wsUser . WriteToPageBytes ( outBytes , "/topics/" )
} else {
var lm [ ] byte
if anyLock && anyMove {
lm = [ ] byte ( ` ,"lock":1,"move":1} ` )
} else if anyLock {
lm = [ ] byte ( ` ,"lock":1} ` )
} else if anyMove {
lm = [ ] byte ( ` ,"move":1} ` )
} else {
lm = [ ] byte ( "}" )
}
if allMod {
e = wsUser . WriteToPageBytesMulti ( [ ] [ ] byte { outBytes [ : len ( outBytes ) - 1 ] , [ ] byte ( ` ,"mod":1 ` ) , lm } , "/topics/" )
} else {
// TODO: Temporary and inefficient
mBytes , err := json . Marshal ( modSet )
if err != nil {
return err
}
e = wsUser . WriteToPageBytesMulti ( [ ] [ ] byte { outBytes [ : len ( outBytes ) - 1 ] , [ ] byte ( ` ,"mod": ` ) , mBytes , lm } , "/topics/" )
}
}
if e == ErrNoneOnPage {
2018-09-08 05:12:48 +00:00
//fmt.Printf("werr for #%d: %s\n", wsUser.User.ID, err)
wsUser . FinalizePage ( "/topics/" , func ( ) {
topicListMutex . Lock ( )
delete ( topicListWatchers , wsUser )
topicListMutex . Unlock ( )
} )
continue
}
}
return nil
}
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) GuestCount ( ) int {
h . GuestLock . RLock ( )
defer h . GuestLock . RUnlock ( )
return len ( h . OnlineGuests )
2018-09-08 05:12:48 +00:00
}
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) UserCount ( ) ( count int ) {
h . evenUserLock . RLock ( )
count += len ( h . evenOnlineUsers )
h . evenUserLock . RUnlock ( )
2019-10-27 23:13:24 +00:00
2019-10-01 21:06:22 +00:00
h . oddUserLock . RLock ( )
count += len ( h . oddOnlineUsers )
h . oddUserLock . RUnlock ( )
2018-09-08 05:12:48 +00:00
return count
}
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) HasUser ( uid int ) ( exists bool ) {
h . evenUserLock . RLock ( )
_ , exists = h . evenOnlineUsers [ uid ]
h . evenUserLock . RUnlock ( )
2018-09-13 07:41:01 +00:00
if exists {
return exists
}
2019-11-04 07:00:12 +00:00
2019-10-01 21:06:22 +00:00
h . oddUserLock . RLock ( )
_ , exists = h . oddOnlineUsers [ uid ]
h . oddUserLock . RUnlock ( )
2018-09-13 07:41:01 +00:00
return exists
}
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) broadcastMessage ( msg string ) error {
userLoop := func ( users map [ int ] * WSUser , m * sync . RWMutex ) error {
m . RLock ( )
defer m . RUnlock ( )
2018-09-08 05:12:48 +00:00
for _ , wsUser := range users {
2021-04-28 09:29:15 +00:00
e := wsUser . WriteAll ( msg )
if e != nil {
return e
2018-09-08 05:12:48 +00:00
}
}
return nil
}
// TODO: Can we move this RLock inside the closure safely?
2021-04-28 09:29:15 +00:00
e := userLoop ( h . evenOnlineUsers , & h . evenUserLock )
if e != nil {
return e
2018-09-08 05:12:48 +00:00
}
2019-10-01 21:06:22 +00:00
return userLoop ( h . oddOnlineUsers , & h . oddUserLock )
2018-09-08 05:12:48 +00:00
}
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) getUser ( uid int ) ( wsUser * WSUser , err error ) {
2018-09-08 05:12:48 +00:00
var ok bool
if uid % 2 == 0 {
2019-10-01 21:06:22 +00:00
h . evenUserLock . RLock ( )
wsUser , ok = h . evenOnlineUsers [ uid ]
h . evenUserLock . RUnlock ( )
2018-09-08 05:12:48 +00:00
} else {
2019-10-01 21:06:22 +00:00
h . oddUserLock . RLock ( )
wsUser , ok = h . oddOnlineUsers [ uid ]
h . oddUserLock . RUnlock ( )
2018-09-08 05:12:48 +00:00
}
if ! ok {
return nil , errWsNouser
}
return wsUser , nil
}
// Warning: For efficiency, some of the *WSUsers may be nil pointers, DO NOT EXPORT
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) getUsers ( uids [ ] int ) ( wsUsers [ ] * WSUser , err error ) {
2018-09-08 05:12:48 +00:00
if len ( uids ) == 0 {
return nil , errWsNouser
}
2021-04-28 09:29:15 +00:00
wsUsers = make ( [ ] * WSUser , len ( uids ) )
i := 0
2019-10-01 21:06:22 +00:00
appender := func ( l * sync . RWMutex , users map [ int ] * WSUser ) {
l . RLock ( )
defer l . RUnlock ( )
2019-01-21 12:27:59 +00:00
// We don't want to keep a lock on this for too long, so we'll accept some nil pointers
for _ , uid := range uids {
2021-04-28 09:29:15 +00:00
wsUsers [ i ] = users [ uid ]
i ++
2019-01-21 12:27:59 +00:00
}
2018-09-08 05:12:48 +00:00
}
2019-10-01 21:06:22 +00:00
appender ( & h . evenUserLock , h . evenOnlineUsers )
appender ( & h . oddUserLock , h . oddOnlineUsers )
2018-09-08 05:12:48 +00:00
if len ( wsUsers ) == 0 {
return nil , errWsNouser
}
return wsUsers , nil
}
2019-01-21 12:27:59 +00:00
// For Widget WOL, please avoid using this as it might wind up being really long and slow without the right safeguards
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) AllUsers ( ) ( users [ ] * User ) {
appender := func ( l * sync . RWMutex , userMap map [ int ] * WSUser ) {
l . RLock ( )
defer l . RUnlock ( )
2021-04-28 09:29:15 +00:00
for _ , u := range userMap {
users = append ( users , u . User )
2019-01-21 12:27:59 +00:00
}
}
2019-10-01 21:06:22 +00:00
appender ( & h . evenUserLock , h . evenOnlineUsers )
appender ( & h . oddUserLock , h . oddOnlineUsers )
2019-01-21 12:27:59 +00:00
return users
}
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) removeUser ( uid int ) {
2018-09-08 05:12:48 +00:00
if uid % 2 == 0 {
2019-10-01 21:06:22 +00:00
h . evenUserLock . Lock ( )
delete ( h . evenOnlineUsers , uid )
h . evenUserLock . Unlock ( )
2018-09-08 05:12:48 +00:00
} else {
2019-10-01 21:06:22 +00:00
h . oddUserLock . Lock ( )
delete ( h . oddOnlineUsers , uid )
h . oddUserLock . Unlock ( )
2018-09-08 05:12:48 +00:00
}
}
2020-03-18 09:21:34 +00:00
func ( h * WsHubImpl ) AddConn ( user * User , conn * websocket . Conn ) ( * WSUser , error ) {
2018-09-08 05:12:48 +00:00
if user . ID == 0 {
wsUser := new ( WSUser )
2019-02-28 07:28:17 +00:00
wsUser . User = new ( User )
2020-03-18 09:21:34 +00:00
* wsUser . User = * user
2018-09-08 05:12:48 +00:00
wsUser . AddSocket ( conn , "" )
WsHub . GuestLock . Lock ( )
WsHub . OnlineGuests [ wsUser ] = true
WsHub . GuestLock . Unlock ( )
return wsUser , nil
}
2019-02-28 07:28:17 +00:00
// TODO: How should we handle user state changes if we're holding a pointer which never changes?
userptr , err := Users . Get ( user . ID )
if err != nil && err != ErrStoreCapacityOverflow {
return nil , err
}
2018-09-08 05:12:48 +00:00
var mutex * sync . RWMutex
var theMap map [ int ] * WSUser
if user . ID % 2 == 0 {
2019-10-01 21:06:22 +00:00
mutex = & h . evenUserLock
theMap = h . evenOnlineUsers
2018-09-08 05:12:48 +00:00
} else {
2019-10-01 21:06:22 +00:00
mutex = & h . oddUserLock
theMap = h . oddOnlineUsers
2018-09-08 05:12:48 +00:00
}
mutex . Lock ( )
wsUser , ok := theMap [ user . ID ]
if ! ok {
wsUser = new ( WSUser )
wsUser . User = userptr
2021-01-06 06:41:08 +00:00
wsUser . Sockets = [ ] * WSUserSocket { { conn , "" } }
2018-09-08 05:12:48 +00:00
theMap [ user . ID ] = wsUser
mutex . Unlock ( )
return wsUser , nil
}
mutex . Unlock ( )
wsUser . AddSocket ( conn , "" )
return wsUser , nil
}
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) RemoveConn ( wsUser * WSUser , conn * websocket . Conn ) {
2018-09-08 05:12:48 +00:00
wsUser . RemoveSocket ( conn )
wsUser . Lock ( )
if len ( wsUser . Sockets ) == 0 {
2019-10-01 21:06:22 +00:00
h . removeUser ( wsUser . User . ID )
2018-09-08 05:12:48 +00:00
}
wsUser . Unlock ( )
}
2019-10-01 21:06:22 +00:00
func ( h * WsHubImpl ) PushMessage ( targetUser int , msg string ) error {
2021-04-28 09:29:15 +00:00
wsUser , e := h . getUser ( targetUser )
if e != nil {
return e
2018-09-08 05:12:48 +00:00
}
return wsUser . WriteAll ( msg )
}
2021-04-28 09:29:15 +00:00
func ( h * WsHubImpl ) pushAlert ( targetUser int , a Alert ) error {
wsUser , e := h . getUser ( targetUser )
if e != nil {
return e
2018-09-08 05:12:48 +00:00
}
2021-04-28 09:29:15 +00:00
astr , e := BuildAlert ( a , * wsUser . User )
if e != nil {
return e
2018-09-08 05:12:48 +00:00
}
2018-11-22 07:21:43 +00:00
return wsUser . WriteAll ( astr )
2018-09-08 05:12:48 +00:00
}
2021-04-28 09:29:15 +00:00
func ( h * WsHubImpl ) pushAlerts ( users [ ] int , a Alert ) error {
2019-10-01 21:06:22 +00:00
wsUsers , err := h . getUsers ( users )
2018-09-08 05:12:48 +00:00
if err != nil {
return err
}
var errs [ ] error
for _ , wsUser := range wsUsers {
if wsUser == nil {
continue
}
2021-04-28 09:29:15 +00:00
alert , err := BuildAlert ( a , * wsUser . User )
2018-09-08 05:12:48 +00:00
if err != nil {
errs = append ( errs , err )
}
err = wsUser . WriteAll ( alert )
if err != nil {
errs = append ( errs , err )
}
}
// Return the first error
if len ( errs ) != 0 {
2021-04-28 09:29:15 +00:00
for _ , e := range errs {
return e
2018-09-08 05:12:48 +00:00
}
}
return nil
}