From cd89e836a12cb147c00d8385f4c1a5fb7f202349 Mon Sep 17 00:00:00 2001 From: Azareal Date: Thu, 13 Sep 2018 17:41:01 +1000 Subject: [PATCH] Some resource management work and performance improvements. Added the DeallocOverflow and BulkRemove methods to UserCache. Added the HasUser method to WsHubImpl. Added a hot path for the common case of a single uncached item in BulkGetMap in DefaultUserStore. --- common/null_user_cache.go | 2 ++ common/topic_list.go | 2 ++ common/topic_store.go | 2 +- common/user_cache.go | 50 ++++++++++++++++++++++++++++++++++++++- common/user_store.go | 10 ++++++++ common/ws_hub.go | 14 +++++++++++ main.go | 23 ++++++++++++++++++ 7 files changed, 101 insertions(+), 2 deletions(-) diff --git a/common/null_user_cache.go b/common/null_user_cache.go index 057244a6..04cc9203 100644 --- a/common/null_user_cache.go +++ b/common/null_user_cache.go @@ -10,6 +10,7 @@ func NewNullUserCache() *NullUserCache { } // nolint +func (mus *NullUserCache) DeallocOverflow() {} func (mus *NullUserCache) Get(id int) (*User, error) { return nil, ErrNoRows } @@ -34,6 +35,7 @@ func (mus *NullUserCache) Remove(id int) error { func (mus *NullUserCache) RemoveUnsafe(id int) error { return nil } +func (mus *NullUserCache) BulkRemove(ids []int) {} func (mus *NullUserCache) Flush() { } func (mus *NullUserCache) Length() int { diff --git a/common/topic_list.go b/common/topic_list.go index 565fa765..3e41ae2a 100644 --- a/common/topic_list.go +++ b/common/topic_list.go @@ -216,6 +216,7 @@ func (tList *DefaultTopicList) getList(page int, argList []interface{}, qlist st } offset, page, lastPage := PageOffset(topicCount, page, Config.ItemsPerPage) + // TODO: Prepare common qlist lengths to speed this up in common cases, prepared statements are prepared lazily anyway, so it probably doesn't matter if we do ten or so stmt, err := qgen.Builder.SimpleSelect("topics", "tid, title, content, createdBy, is_closed, sticky, createdAt, lastReplyAt, lastReplyBy, parentID, views, postCount, likeCount", "parentID IN("+qlist+")", "sticky DESC, lastReplyAt DESC, createdBy DESC", "?,?") if err != nil { return nil, Paginator{nil, 1, 1}, err @@ -233,6 +234,7 @@ func (tList *DefaultTopicList) getList(page int, argList []interface{}, qlist st var reqUserList = make(map[int]bool) for rows.Next() { + // TODO: Embed Topic structs in TopicsRow to make it easier for us to reuse this work in the topic cache topicItem := TopicsRow{ID: 0} err := rows.Scan(&topicItem.ID, &topicItem.Title, &topicItem.Content, &topicItem.CreatedBy, &topicItem.IsClosed, &topicItem.Sticky, &topicItem.CreatedAt, &topicItem.LastReplyAt, &topicItem.LastReplyBy, &topicItem.ParentID, &topicItem.ViewCount, &topicItem.PostCount, &topicItem.LikeCount) if err != nil { diff --git a/common/topic_store.go b/common/topic_store.go index e089bce9..96b3f9d2 100644 --- a/common/topic_store.go +++ b/common/topic_store.go @@ -1,7 +1,7 @@ /* * * Gosora Topic Store -* Copyright Azareal 2017 - 2018 +* Copyright Azareal 2017 - 2019 * */ package common diff --git a/common/user_cache.go b/common/user_cache.go index f738a184..8678908c 100644 --- a/common/user_cache.go +++ b/common/user_cache.go @@ -7,6 +7,7 @@ import ( // UserCache is an interface which spits out users from a fast cache rather than the database, whether from memory or from an application like Redis. Users may not be present in the cache but may be in the database type UserCache interface { + DeallocOverflow() // May cause thread contention, looks for items to evict Get(id int) (*User, error) GetUnsafe(id int) (*User, error) BulkGet(ids []int) (list []*User) @@ -23,7 +24,7 @@ type UserCache interface { // MemoryUserCache stores and pulls users out of the current process' memory type MemoryUserCache struct { - items map[int]*User + items map[int]*User // TODO: Shard this into two? length int64 capacity int @@ -38,6 +39,39 @@ func NewMemoryUserCache(capacity int) *MemoryUserCache { } } +// TODO: Avoid deallocating topic list users +func (mus *MemoryUserCache) DeallocOverflow() { + var toEvict = make([]int, 10) + var evIndex = 0 + mus.RLock() + for _, user := range mus.items { + if /*user.LastActiveAt < lastActiveCutoff && */ user.Score == 0 && !user.IsMod { + if EnableWebsockets && WsHub.HasUser(user.ID) { + continue + } + toEvict[evIndex] = user.ID + evIndex++ + if evIndex == 10 { + break + } + } + } + mus.RUnlock() + + // Remove zero IDs from the evictable list, so we don't waste precious cycles locked for those + var lastZero = -1 + for i, uid := range toEvict { + if uid == 0 { + lastZero = i + } + } + if lastZero != -1 { + toEvict = toEvict[:lastZero] + } + + mus.BulkRemove(toEvict) +} + // Get fetches a user by ID. Returns ErrNoRows if not present. func (mus *MemoryUserCache) Get(id int) (*User, error) { mus.RLock() @@ -136,6 +170,20 @@ func (mus *MemoryUserCache) RemoveUnsafe(id int) error { return nil } +func (mus *MemoryUserCache) BulkRemove(ids []int) { + var rCount int64 + mus.Lock() + for _, id := range ids { + _, ok := mus.items[id] + if ok { + delete(mus.items, id) + rCount++ + } + } + mus.Unlock() + atomic.AddInt64(&mus.length, -rCount) +} + // Flush removes all the users from the cache, useful for tests. func (mus *MemoryUserCache) Flush() { mus.Lock() diff --git a/common/user_store.go b/common/user_store.go index 485368f3..46d23640 100644 --- a/common/user_store.go +++ b/common/user_store.go @@ -66,6 +66,9 @@ func (mus *DefaultUserStore) DirtyGet(id int) *User { if err == nil { return user } + /*if mus.OutOfBounds(id) { + return BlankUser() + }*/ user = &User{ID: id, Loggedin: true} err = mus.get.QueryRow(id).Scan(&user.Name, &user.Group, &user.Active, &user.IsSuperAdmin, &user.Session, &user.Email, &user.RawAvatar, &user.Message, &user.URLPrefix, &user.URLName, &user.Level, &user.Score, &user.Liked, &user.LastIP, &user.TempGroup) @@ -143,6 +146,13 @@ func (mus *DefaultUserStore) BulkGetMap(ids []int) (list map[int]*User, err erro // If every user is in the cache, then return immediately if len(ids) == 0 { return list, nil + } else if len(ids) == 1 { + topic, err := mus.Get(ids[0]) + if err != nil { + return list, err + } + list[topic.ID] = topic + return list, nil } // TODO: Add a function for the qlist stuff diff --git a/common/ws_hub.go b/common/ws_hub.go index 498686b8..d8bd6159 100644 --- a/common/ws_hub.go +++ b/common/ws_hub.go @@ -12,6 +12,7 @@ import ( var WsHub WsHubImpl // TODO: Make this an interface? +// TODO: Write tests for this type WsHubImpl struct { // TODO: Implement some form of generics so we don't write as much odd-even sharding code evenOnlineUsers map[int]*WSUser @@ -198,6 +199,19 @@ func (hub *WsHubImpl) UserCount() (count int) { return count } +func (hub *WsHubImpl) HasUser(uid int) (exists bool) { + hub.evenUserLock.RLock() + _, exists = hub.evenOnlineUsers[uid] + hub.evenUserLock.RUnlock() + if exists { + return exists + } + hub.oddUserLock.RLock() + _, exists = hub.oddOnlineUsers[uid] + hub.oddUserLock.RUnlock() + return exists +} + func (hub *WsHubImpl) broadcastMessage(msg string) error { var userLoop = func(users map[int]*WSUser, mutex *sync.RWMutex) error { defer mutex.RUnlock() diff --git a/main.go b/main.go index b37e4d46..45d2a159 100644 --- a/main.go +++ b/main.go @@ -372,6 +372,29 @@ func main() { hourTicker := time.NewTicker(time.Hour) go tickLoop(thumbChan, halfSecondTicker, secondTicker, fifteenMinuteTicker, hourTicker) + // Resource Management Goroutine + go func() { + ucache := common.Users.GetCache() + tcache := common.Topics.GetCache() + if ucache == nil && tcache == nil { + return + } + + for { + select { + case <-secondTicker.C: + // TODO: Add a LastRequested field to cached User structs to avoid evicting the same things which wind up getting loaded again anyway? + if ucache != nil { + ucap := ucache.GetCapacity() + if ucache.Length() <= ucap || common.Users.GlobalCount() <= ucap { + continue + } + //ucache.DeallocOverflow() + } + } + } + }() + log.Print("Initialising the router") router, err = NewGenRouter(http.FileServer(http.Dir("./uploads"))) if err != nil {