Some resource management work and performance improvements.
Added the DeallocOverflow and BulkRemove methods to UserCache. Added the HasUser method to WsHubImpl. Added a hot path for the common case of a single uncached item in BulkGetMap in DefaultUserStore.
This commit is contained in:
parent
1854010840
commit
cd89e836a1
|
@ -10,6 +10,7 @@ func NewNullUserCache() *NullUserCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
// nolint
|
// nolint
|
||||||
|
func (mus *NullUserCache) DeallocOverflow() {}
|
||||||
func (mus *NullUserCache) Get(id int) (*User, error) {
|
func (mus *NullUserCache) Get(id int) (*User, error) {
|
||||||
return nil, ErrNoRows
|
return nil, ErrNoRows
|
||||||
}
|
}
|
||||||
|
@ -34,6 +35,7 @@ func (mus *NullUserCache) Remove(id int) error {
|
||||||
func (mus *NullUserCache) RemoveUnsafe(id int) error {
|
func (mus *NullUserCache) RemoveUnsafe(id int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (mus *NullUserCache) BulkRemove(ids []int) {}
|
||||||
func (mus *NullUserCache) Flush() {
|
func (mus *NullUserCache) Flush() {
|
||||||
}
|
}
|
||||||
func (mus *NullUserCache) Length() int {
|
func (mus *NullUserCache) Length() int {
|
||||||
|
|
|
@ -216,6 +216,7 @@ func (tList *DefaultTopicList) getList(page int, argList []interface{}, qlist st
|
||||||
}
|
}
|
||||||
offset, page, lastPage := PageOffset(topicCount, page, Config.ItemsPerPage)
|
offset, page, lastPage := PageOffset(topicCount, page, Config.ItemsPerPage)
|
||||||
|
|
||||||
|
// TODO: Prepare common qlist lengths to speed this up in common cases, prepared statements are prepared lazily anyway, so it probably doesn't matter if we do ten or so
|
||||||
stmt, err := qgen.Builder.SimpleSelect("topics", "tid, title, content, createdBy, is_closed, sticky, createdAt, lastReplyAt, lastReplyBy, parentID, views, postCount, likeCount", "parentID IN("+qlist+")", "sticky DESC, lastReplyAt DESC, createdBy DESC", "?,?")
|
stmt, err := qgen.Builder.SimpleSelect("topics", "tid, title, content, createdBy, is_closed, sticky, createdAt, lastReplyAt, lastReplyBy, parentID, views, postCount, likeCount", "parentID IN("+qlist+")", "sticky DESC, lastReplyAt DESC, createdBy DESC", "?,?")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, Paginator{nil, 1, 1}, err
|
return nil, Paginator{nil, 1, 1}, err
|
||||||
|
@ -233,6 +234,7 @@ func (tList *DefaultTopicList) getList(page int, argList []interface{}, qlist st
|
||||||
|
|
||||||
var reqUserList = make(map[int]bool)
|
var reqUserList = make(map[int]bool)
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
|
// TODO: Embed Topic structs in TopicsRow to make it easier for us to reuse this work in the topic cache
|
||||||
topicItem := TopicsRow{ID: 0}
|
topicItem := TopicsRow{ID: 0}
|
||||||
err := rows.Scan(&topicItem.ID, &topicItem.Title, &topicItem.Content, &topicItem.CreatedBy, &topicItem.IsClosed, &topicItem.Sticky, &topicItem.CreatedAt, &topicItem.LastReplyAt, &topicItem.LastReplyBy, &topicItem.ParentID, &topicItem.ViewCount, &topicItem.PostCount, &topicItem.LikeCount)
|
err := rows.Scan(&topicItem.ID, &topicItem.Title, &topicItem.Content, &topicItem.CreatedBy, &topicItem.IsClosed, &topicItem.Sticky, &topicItem.CreatedAt, &topicItem.LastReplyAt, &topicItem.LastReplyBy, &topicItem.ParentID, &topicItem.ViewCount, &topicItem.PostCount, &topicItem.LikeCount)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
/*
|
/*
|
||||||
*
|
*
|
||||||
* Gosora Topic Store
|
* Gosora Topic Store
|
||||||
* Copyright Azareal 2017 - 2018
|
* Copyright Azareal 2017 - 2019
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
package common
|
package common
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
// UserCache is an interface which spits out users from a fast cache rather than the database, whether from memory or from an application like Redis. Users may not be present in the cache but may be in the database
|
// UserCache is an interface which spits out users from a fast cache rather than the database, whether from memory or from an application like Redis. Users may not be present in the cache but may be in the database
|
||||||
type UserCache interface {
|
type UserCache interface {
|
||||||
|
DeallocOverflow() // May cause thread contention, looks for items to evict
|
||||||
Get(id int) (*User, error)
|
Get(id int) (*User, error)
|
||||||
GetUnsafe(id int) (*User, error)
|
GetUnsafe(id int) (*User, error)
|
||||||
BulkGet(ids []int) (list []*User)
|
BulkGet(ids []int) (list []*User)
|
||||||
|
@ -23,7 +24,7 @@ type UserCache interface {
|
||||||
|
|
||||||
// MemoryUserCache stores and pulls users out of the current process' memory
|
// MemoryUserCache stores and pulls users out of the current process' memory
|
||||||
type MemoryUserCache struct {
|
type MemoryUserCache struct {
|
||||||
items map[int]*User
|
items map[int]*User // TODO: Shard this into two?
|
||||||
length int64
|
length int64
|
||||||
capacity int
|
capacity int
|
||||||
|
|
||||||
|
@ -38,6 +39,39 @@ func NewMemoryUserCache(capacity int) *MemoryUserCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Avoid deallocating topic list users
|
||||||
|
func (mus *MemoryUserCache) DeallocOverflow() {
|
||||||
|
var toEvict = make([]int, 10)
|
||||||
|
var evIndex = 0
|
||||||
|
mus.RLock()
|
||||||
|
for _, user := range mus.items {
|
||||||
|
if /*user.LastActiveAt < lastActiveCutoff && */ user.Score == 0 && !user.IsMod {
|
||||||
|
if EnableWebsockets && WsHub.HasUser(user.ID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
toEvict[evIndex] = user.ID
|
||||||
|
evIndex++
|
||||||
|
if evIndex == 10 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mus.RUnlock()
|
||||||
|
|
||||||
|
// Remove zero IDs from the evictable list, so we don't waste precious cycles locked for those
|
||||||
|
var lastZero = -1
|
||||||
|
for i, uid := range toEvict {
|
||||||
|
if uid == 0 {
|
||||||
|
lastZero = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if lastZero != -1 {
|
||||||
|
toEvict = toEvict[:lastZero]
|
||||||
|
}
|
||||||
|
|
||||||
|
mus.BulkRemove(toEvict)
|
||||||
|
}
|
||||||
|
|
||||||
// Get fetches a user by ID. Returns ErrNoRows if not present.
|
// Get fetches a user by ID. Returns ErrNoRows if not present.
|
||||||
func (mus *MemoryUserCache) Get(id int) (*User, error) {
|
func (mus *MemoryUserCache) Get(id int) (*User, error) {
|
||||||
mus.RLock()
|
mus.RLock()
|
||||||
|
@ -136,6 +170,20 @@ func (mus *MemoryUserCache) RemoveUnsafe(id int) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mus *MemoryUserCache) BulkRemove(ids []int) {
|
||||||
|
var rCount int64
|
||||||
|
mus.Lock()
|
||||||
|
for _, id := range ids {
|
||||||
|
_, ok := mus.items[id]
|
||||||
|
if ok {
|
||||||
|
delete(mus.items, id)
|
||||||
|
rCount++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mus.Unlock()
|
||||||
|
atomic.AddInt64(&mus.length, -rCount)
|
||||||
|
}
|
||||||
|
|
||||||
// Flush removes all the users from the cache, useful for tests.
|
// Flush removes all the users from the cache, useful for tests.
|
||||||
func (mus *MemoryUserCache) Flush() {
|
func (mus *MemoryUserCache) Flush() {
|
||||||
mus.Lock()
|
mus.Lock()
|
||||||
|
|
|
@ -66,6 +66,9 @@ func (mus *DefaultUserStore) DirtyGet(id int) *User {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return user
|
return user
|
||||||
}
|
}
|
||||||
|
/*if mus.OutOfBounds(id) {
|
||||||
|
return BlankUser()
|
||||||
|
}*/
|
||||||
|
|
||||||
user = &User{ID: id, Loggedin: true}
|
user = &User{ID: id, Loggedin: true}
|
||||||
err = mus.get.QueryRow(id).Scan(&user.Name, &user.Group, &user.Active, &user.IsSuperAdmin, &user.Session, &user.Email, &user.RawAvatar, &user.Message, &user.URLPrefix, &user.URLName, &user.Level, &user.Score, &user.Liked, &user.LastIP, &user.TempGroup)
|
err = mus.get.QueryRow(id).Scan(&user.Name, &user.Group, &user.Active, &user.IsSuperAdmin, &user.Session, &user.Email, &user.RawAvatar, &user.Message, &user.URLPrefix, &user.URLName, &user.Level, &user.Score, &user.Liked, &user.LastIP, &user.TempGroup)
|
||||||
|
@ -143,6 +146,13 @@ func (mus *DefaultUserStore) BulkGetMap(ids []int) (list map[int]*User, err erro
|
||||||
// If every user is in the cache, then return immediately
|
// If every user is in the cache, then return immediately
|
||||||
if len(ids) == 0 {
|
if len(ids) == 0 {
|
||||||
return list, nil
|
return list, nil
|
||||||
|
} else if len(ids) == 1 {
|
||||||
|
topic, err := mus.Get(ids[0])
|
||||||
|
if err != nil {
|
||||||
|
return list, err
|
||||||
|
}
|
||||||
|
list[topic.ID] = topic
|
||||||
|
return list, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Add a function for the qlist stuff
|
// TODO: Add a function for the qlist stuff
|
||||||
|
|
|
@ -12,6 +12,7 @@ import (
|
||||||
var WsHub WsHubImpl
|
var WsHub WsHubImpl
|
||||||
|
|
||||||
// TODO: Make this an interface?
|
// TODO: Make this an interface?
|
||||||
|
// TODO: Write tests for this
|
||||||
type WsHubImpl struct {
|
type WsHubImpl struct {
|
||||||
// TODO: Implement some form of generics so we don't write as much odd-even sharding code
|
// TODO: Implement some form of generics so we don't write as much odd-even sharding code
|
||||||
evenOnlineUsers map[int]*WSUser
|
evenOnlineUsers map[int]*WSUser
|
||||||
|
@ -198,6 +199,19 @@ func (hub *WsHubImpl) UserCount() (count int) {
|
||||||
return count
|
return count
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (hub *WsHubImpl) HasUser(uid int) (exists bool) {
|
||||||
|
hub.evenUserLock.RLock()
|
||||||
|
_, exists = hub.evenOnlineUsers[uid]
|
||||||
|
hub.evenUserLock.RUnlock()
|
||||||
|
if exists {
|
||||||
|
return exists
|
||||||
|
}
|
||||||
|
hub.oddUserLock.RLock()
|
||||||
|
_, exists = hub.oddOnlineUsers[uid]
|
||||||
|
hub.oddUserLock.RUnlock()
|
||||||
|
return exists
|
||||||
|
}
|
||||||
|
|
||||||
func (hub *WsHubImpl) broadcastMessage(msg string) error {
|
func (hub *WsHubImpl) broadcastMessage(msg string) error {
|
||||||
var userLoop = func(users map[int]*WSUser, mutex *sync.RWMutex) error {
|
var userLoop = func(users map[int]*WSUser, mutex *sync.RWMutex) error {
|
||||||
defer mutex.RUnlock()
|
defer mutex.RUnlock()
|
||||||
|
|
23
main.go
23
main.go
|
@ -372,6 +372,29 @@ func main() {
|
||||||
hourTicker := time.NewTicker(time.Hour)
|
hourTicker := time.NewTicker(time.Hour)
|
||||||
go tickLoop(thumbChan, halfSecondTicker, secondTicker, fifteenMinuteTicker, hourTicker)
|
go tickLoop(thumbChan, halfSecondTicker, secondTicker, fifteenMinuteTicker, hourTicker)
|
||||||
|
|
||||||
|
// Resource Management Goroutine
|
||||||
|
go func() {
|
||||||
|
ucache := common.Users.GetCache()
|
||||||
|
tcache := common.Topics.GetCache()
|
||||||
|
if ucache == nil && tcache == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-secondTicker.C:
|
||||||
|
// TODO: Add a LastRequested field to cached User structs to avoid evicting the same things which wind up getting loaded again anyway?
|
||||||
|
if ucache != nil {
|
||||||
|
ucap := ucache.GetCapacity()
|
||||||
|
if ucache.Length() <= ucap || common.Users.GlobalCount() <= ucap {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
//ucache.DeallocOverflow()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
log.Print("Initialising the router")
|
log.Print("Initialising the router")
|
||||||
router, err = NewGenRouter(http.FileServer(http.Dir("./uploads")))
|
router, err = NewGenRouter(http.FileServer(http.Dir("./uploads")))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in New Issue