simplify the agents, forums, memory and referrer counters
move DirSize into it's own file
This commit is contained in:
parent
01ccdb2087
commit
a465850adb
@ -4,7 +4,8 @@ import (
|
||||
"database/sql"
|
||||
|
||||
c "github.com/Azareal/Gosora/common"
|
||||
"github.com/Azareal/Gosora/query_gen"
|
||||
qgen "github.com/Azareal/Gosora/query_gen"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var AgentViewCounter *DefaultAgentViewCounter
|
||||
@ -19,49 +20,49 @@ func NewDefaultAgentViewCounter(acc *qgen.Accumulator) (*DefaultAgentViewCounter
|
||||
for bucketID, _ := range agentBuckets {
|
||||
agentBuckets[bucketID] = &RWMutexCounterBucket{counter: 0}
|
||||
}
|
||||
counter := &DefaultAgentViewCounter{
|
||||
co := &DefaultAgentViewCounter{
|
||||
agentBuckets: agentBuckets,
|
||||
insert: acc.Insert("viewchunks_agents").Columns("count, createdAt, browser").Fields("?,UTC_TIMESTAMP(),?").Prepare(),
|
||||
}
|
||||
c.AddScheduledFifteenMinuteTask(counter.Tick)
|
||||
//c.AddScheduledSecondTask(counter.Tick)
|
||||
c.AddShutdownTask(counter.Tick)
|
||||
return counter, acc.FirstError()
|
||||
c.AddScheduledFifteenMinuteTask(co.Tick)
|
||||
//c.AddScheduledSecondTask(co.Tick)
|
||||
c.AddShutdownTask(co.Tick)
|
||||
return co, acc.FirstError()
|
||||
}
|
||||
|
||||
func (counter *DefaultAgentViewCounter) Tick() error {
|
||||
for agentID, agentBucket := range counter.agentBuckets {
|
||||
func (co *DefaultAgentViewCounter) Tick() error {
|
||||
for agentID, agentBucket := range co.agentBuckets {
|
||||
var count int
|
||||
agentBucket.RLock()
|
||||
count = agentBucket.counter
|
||||
agentBucket.counter = 0
|
||||
agentBucket.RUnlock()
|
||||
|
||||
err := counter.insertChunk(count, agentID) // TODO: Bulk insert for speed?
|
||||
err := co.insertChunk(count, agentID) // TODO: Bulk insert for speed?
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(errors.WithStack(err), "agent counter")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (counter *DefaultAgentViewCounter) insertChunk(count int, agent int) error {
|
||||
func (co *DefaultAgentViewCounter) insertChunk(count int, agent int) error {
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
var agentName = reverseAgentMapEnum[agent]
|
||||
c.DebugLogf("Inserting a viewchunk with a count of %d for agent %s (%d)", count, agentName, agent)
|
||||
_, err := counter.insert.Exec(count, agentName)
|
||||
agentName := reverseAgentMapEnum[agent]
|
||||
c.DebugLogf("Inserting a vchunk with a count of %d for agent %s (%d)", count, agentName, agent)
|
||||
_, err := co.insert.Exec(count, agentName)
|
||||
return err
|
||||
}
|
||||
|
||||
func (counter *DefaultAgentViewCounter) Bump(agent int) {
|
||||
func (co *DefaultAgentViewCounter) Bump(agent int) {
|
||||
// TODO: Test this check
|
||||
c.DebugDetail("counter.agentBuckets[", agent, "]: ", counter.agentBuckets[agent])
|
||||
if len(counter.agentBuckets) <= agent || agent < 0 {
|
||||
c.DebugDetail("co.agentBuckets[", agent, "]: ", co.agentBuckets[agent])
|
||||
if len(co.agentBuckets) <= agent || agent < 0 {
|
||||
return
|
||||
}
|
||||
counter.agentBuckets[agent].Lock()
|
||||
counter.agentBuckets[agent].counter++
|
||||
counter.agentBuckets[agent].Unlock()
|
||||
co.agentBuckets[agent].Lock()
|
||||
co.agentBuckets[agent].counter++
|
||||
co.agentBuckets[agent].Unlock()
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
|
||||
c "github.com/Azareal/Gosora/common"
|
||||
"github.com/Azareal/Gosora/query_gen"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var ForumViewCounter *DefaultForumViewCounter
|
||||
@ -23,95 +24,84 @@ type DefaultForumViewCounter struct {
|
||||
|
||||
func NewDefaultForumViewCounter() (*DefaultForumViewCounter, error) {
|
||||
acc := qgen.NewAcc()
|
||||
counter := &DefaultForumViewCounter{
|
||||
co := &DefaultForumViewCounter{
|
||||
oddMap: make(map[int]*RWMutexCounterBucket),
|
||||
evenMap: make(map[int]*RWMutexCounterBucket),
|
||||
insert: acc.Insert("viewchunks_forums").Columns("count, createdAt, forum").Fields("?,UTC_TIMESTAMP(),?").Prepare(),
|
||||
}
|
||||
c.AddScheduledFifteenMinuteTask(counter.Tick) // There could be a lot of routes, so we don't want to be running this every second
|
||||
//c.AddScheduledSecondTask(counter.Tick)
|
||||
c.AddShutdownTask(counter.Tick)
|
||||
return counter, acc.FirstError()
|
||||
c.AddScheduledFifteenMinuteTask(co.Tick) // There could be a lot of routes, so we don't want to be running this every second
|
||||
//c.AddScheduledSecondTask(co.Tick)
|
||||
c.AddShutdownTask(co.Tick)
|
||||
return co, acc.FirstError()
|
||||
}
|
||||
|
||||
func (counter *DefaultForumViewCounter) Tick() error {
|
||||
counter.oddLock.RLock()
|
||||
oddMap := counter.oddMap
|
||||
counter.oddLock.RUnlock()
|
||||
for forumID, forum := range oddMap {
|
||||
var count int
|
||||
forum.RLock()
|
||||
count = forum.counter
|
||||
forum.RUnlock()
|
||||
// TODO: Only delete the bucket when it's zero to avoid hitting popular forums?
|
||||
counter.oddLock.Lock()
|
||||
delete(counter.oddMap, forumID)
|
||||
counter.oddLock.Unlock()
|
||||
err := counter.insertChunk(count, forumID)
|
||||
if err != nil {
|
||||
return err
|
||||
func (co *DefaultForumViewCounter) Tick() error {
|
||||
cLoop := func(l *sync.RWMutex, m map[int]*RWMutexCounterBucket) error {
|
||||
l.RLock()
|
||||
for forumID, forum := range m {
|
||||
l.RUnlock()
|
||||
var count int
|
||||
forum.RLock()
|
||||
count = forum.counter
|
||||
forum.RUnlock()
|
||||
// TODO: Only delete the bucket when it's zero to avoid hitting popular forums?
|
||||
l.Lock()
|
||||
delete(m, forumID)
|
||||
l.Unlock()
|
||||
err := co.insertChunk(count, forumID)
|
||||
if err != nil {
|
||||
return errors.Wrap(errors.WithStack(err),"forum counter")
|
||||
}
|
||||
l.RLock()
|
||||
}
|
||||
l.RUnlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
counter.evenLock.RLock()
|
||||
evenMap := counter.evenMap
|
||||
counter.evenLock.RUnlock()
|
||||
for forumID, forum := range evenMap {
|
||||
var count int
|
||||
forum.RLock()
|
||||
count = forum.counter
|
||||
forum.RUnlock()
|
||||
// TODO: Only delete the bucket when it's zero to avoid hitting popular forums?
|
||||
counter.evenLock.Lock()
|
||||
delete(counter.evenMap, forumID)
|
||||
counter.evenLock.Unlock()
|
||||
err := counter.insertChunk(count, forumID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err := cLoop(&co.oddLock,co.oddMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
return cLoop(&co.evenLock,co.evenMap)
|
||||
}
|
||||
|
||||
func (counter *DefaultForumViewCounter) insertChunk(count int, forum int) error {
|
||||
func (co *DefaultForumViewCounter) insertChunk(count int, forum int) error {
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
c.DebugLogf("Inserting a viewchunk with a count of %d for forum %d", count, forum)
|
||||
_, err := counter.insert.Exec(count, forum)
|
||||
c.DebugLogf("Inserting a vchunk with a count of %d for forum %d", count, forum)
|
||||
_, err := co.insert.Exec(count, forum)
|
||||
return err
|
||||
}
|
||||
|
||||
func (counter *DefaultForumViewCounter) Bump(forumID int) {
|
||||
func (co *DefaultForumViewCounter) Bump(forumID int) {
|
||||
// Is the ID even?
|
||||
if forumID%2 == 0 {
|
||||
counter.evenLock.RLock()
|
||||
forum, ok := counter.evenMap[forumID]
|
||||
counter.evenLock.RUnlock()
|
||||
co.evenLock.RLock()
|
||||
forum, ok := co.evenMap[forumID]
|
||||
co.evenLock.RUnlock()
|
||||
if ok {
|
||||
forum.Lock()
|
||||
forum.counter++
|
||||
forum.Unlock()
|
||||
} else {
|
||||
counter.evenLock.Lock()
|
||||
counter.evenMap[forumID] = &RWMutexCounterBucket{counter: 1}
|
||||
counter.evenLock.Unlock()
|
||||
co.evenLock.Lock()
|
||||
co.evenMap[forumID] = &RWMutexCounterBucket{counter: 1}
|
||||
co.evenLock.Unlock()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
counter.oddLock.RLock()
|
||||
forum, ok := counter.oddMap[forumID]
|
||||
counter.oddLock.RUnlock()
|
||||
co.oddLock.RLock()
|
||||
forum, ok := co.oddMap[forumID]
|
||||
co.oddLock.RUnlock()
|
||||
if ok {
|
||||
forum.Lock()
|
||||
forum.counter++
|
||||
forum.Unlock()
|
||||
} else {
|
||||
counter.oddLock.Lock()
|
||||
counter.oddMap[forumID] = &RWMutexCounterBucket{counter: 1}
|
||||
counter.oddLock.Unlock()
|
||||
co.oddLock.Lock()
|
||||
co.oddMap[forumID] = &RWMutexCounterBucket{counter: 1}
|
||||
co.oddLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,8 +1,12 @@
|
||||
package counters
|
||||
|
||||
import "database/sql"
|
||||
import c "github.com/Azareal/Gosora/common"
|
||||
import "github.com/Azareal/Gosora/query_gen"
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
c "github.com/Azareal/Gosora/common"
|
||||
qgen "github.com/Azareal/Gosora/query_gen"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var LangViewCounter *DefaultLangViewCounter
|
||||
|
||||
@ -101,56 +105,56 @@ type DefaultLangViewCounter struct {
|
||||
}
|
||||
|
||||
func NewDefaultLangViewCounter(acc *qgen.Accumulator) (*DefaultLangViewCounter, error) {
|
||||
var langBuckets = make([]*RWMutexCounterBucket, len(langCodes))
|
||||
langBuckets := make([]*RWMutexCounterBucket, len(langCodes))
|
||||
for bucketID, _ := range langBuckets {
|
||||
langBuckets[bucketID] = &RWMutexCounterBucket{counter: 0}
|
||||
}
|
||||
var codesToIndices = make(map[string]int)
|
||||
codesToIndices := make(map[string]int, len(langCodes))
|
||||
for index, code := range langCodes {
|
||||
codesToIndices[code] = index
|
||||
}
|
||||
|
||||
counter := &DefaultLangViewCounter{
|
||||
co := &DefaultLangViewCounter{
|
||||
buckets: langBuckets,
|
||||
codesToIndices: codesToIndices,
|
||||
insert: acc.Insert("viewchunks_langs").Columns("count, createdAt, lang").Fields("?,UTC_TIMESTAMP(),?").Prepare(),
|
||||
}
|
||||
|
||||
c.AddScheduledFifteenMinuteTask(counter.Tick)
|
||||
//c.AddScheduledSecondTask(counter.Tick)
|
||||
c.AddShutdownTask(counter.Tick)
|
||||
return counter, acc.FirstError()
|
||||
c.AddScheduledFifteenMinuteTask(co.Tick)
|
||||
//c.AddScheduledSecondTask(co.Tick)
|
||||
c.AddShutdownTask(co.Tick)
|
||||
return co, acc.FirstError()
|
||||
}
|
||||
|
||||
func (counter *DefaultLangViewCounter) Tick() error {
|
||||
for id, bucket := range counter.buckets {
|
||||
func (co *DefaultLangViewCounter) Tick() error {
|
||||
for id, bucket := range co.buckets {
|
||||
var count int
|
||||
bucket.RLock()
|
||||
count = bucket.counter
|
||||
bucket.counter = 0 // TODO: Add a SetZero method to reduce the amount of duplicate code between the OS and agent counters?
|
||||
bucket.RUnlock()
|
||||
|
||||
err := counter.insertChunk(count, id) // TODO: Bulk insert for speed?
|
||||
err := co.insertChunk(count, id) // TODO: Bulk insert for speed?
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(errors.WithStack(err), "langview counter")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (counter *DefaultLangViewCounter) insertChunk(count int, id int) error {
|
||||
func (co *DefaultLangViewCounter) insertChunk(count int, id int) error {
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
var langCode = langCodes[id]
|
||||
c.DebugLogf("Inserting a viewchunk with a count of %d for lang %s (%d)", count, langCode, id)
|
||||
_, err := counter.insert.Exec(count, langCode)
|
||||
langCode := langCodes[id]
|
||||
c.DebugLogf("Inserting a vchunk with a count of %d for lang %s (%d)", count, langCode, id)
|
||||
_, err := co.insert.Exec(count, langCode)
|
||||
return err
|
||||
}
|
||||
|
||||
func (counter *DefaultLangViewCounter) Bump(langCode string) (validCode bool) {
|
||||
func (co *DefaultLangViewCounter) Bump(langCode string) (validCode bool) {
|
||||
validCode = true
|
||||
id, ok := counter.codesToIndices[langCode]
|
||||
id, ok := co.codesToIndices[langCode]
|
||||
if !ok {
|
||||
// TODO: Tell the caller that the code's invalid
|
||||
id = 0 // Unknown
|
||||
@ -158,13 +162,13 @@ func (counter *DefaultLangViewCounter) Bump(langCode string) (validCode bool) {
|
||||
}
|
||||
|
||||
// TODO: Test this check
|
||||
c.DebugDetail("counter.buckets[", id, "]: ", counter.buckets[id])
|
||||
if len(counter.buckets) <= id || id < 0 {
|
||||
c.DebugDetail("co.buckets[", id, "]: ", co.buckets[id])
|
||||
if len(co.buckets) <= id || id < 0 {
|
||||
return validCode
|
||||
}
|
||||
counter.buckets[id].Lock()
|
||||
counter.buckets[id].counter++
|
||||
counter.buckets[id].Unlock()
|
||||
co.buckets[id].Lock()
|
||||
co.buckets[id].counter++
|
||||
co.buckets[id].Unlock()
|
||||
|
||||
return validCode
|
||||
}
|
||||
|
@ -1,13 +1,14 @@
|
||||
package counters
|
||||
|
||||
import (
|
||||
"time"
|
||||
"sync"
|
||||
"runtime"
|
||||
"database/sql"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
c "github.com/Azareal/Gosora/common"
|
||||
"github.com/Azareal/Gosora/query_gen"
|
||||
qgen "github.com/Azareal/Gosora/query_gen"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var MemoryCounter *DefaultMemoryCounter
|
||||
@ -15,19 +16,19 @@ var MemoryCounter *DefaultMemoryCounter
|
||||
type DefaultMemoryCounter struct {
|
||||
insert *sql.Stmt
|
||||
|
||||
totMem uint64
|
||||
totCount uint64
|
||||
stackMem uint64
|
||||
totMem uint64
|
||||
totCount uint64
|
||||
stackMem uint64
|
||||
stackCount uint64
|
||||
heapMem uint64
|
||||
heapCount uint64
|
||||
heapMem uint64
|
||||
heapCount uint64
|
||||
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
func NewMemoryCounter(acc *qgen.Accumulator) (*DefaultMemoryCounter, error) {
|
||||
co := &DefaultMemoryCounter{
|
||||
insert: acc.Insert("memchunks").Columns("count, stack, heap, createdAt").Fields("?,?,?,UTC_TIMESTAMP()").Prepare(),
|
||||
insert: acc.Insert("memchunks").Columns("count, stack, heap, createdAt").Fields("?,?,?,UTC_TIMESTAMP()").Prepare(),
|
||||
}
|
||||
c.AddScheduledFifteenMinuteTask(co.Tick)
|
||||
//c.AddScheduledSecondTask(co.Tick)
|
||||
@ -81,5 +82,8 @@ func (co *DefaultMemoryCounter) Tick() (err error) {
|
||||
|
||||
c.DebugLogf("Inserting a memchunk with a value of %d - %d - %d", avgMem, avgStack, avgHeap)
|
||||
_, err = co.insert.Exec(avgMem, avgStack, avgHeap)
|
||||
return err
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Wrap(errors.WithStack(err), "mem counter")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -5,8 +5,9 @@ import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/Azareal/Gosora/common"
|
||||
c "github.com/Azareal/Gosora/common"
|
||||
"github.com/Azareal/Gosora/query_gen"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var ReferrerTracker *DefaultReferrerTracker
|
||||
@ -36,9 +37,9 @@ func NewDefaultReferrerTracker() (*DefaultReferrerTracker, error) {
|
||||
even: make(map[string]*ReferrerItem),
|
||||
insert: acc.Insert("viewchunks_referrers").Columns("count, createdAt, domain").Fields("?,UTC_TIMESTAMP(),?").Prepare(), // TODO: Do something more efficient than doing a query for each referrer
|
||||
}
|
||||
common.AddScheduledFifteenMinuteTask(refTracker.Tick)
|
||||
//common.AddScheduledSecondTask(refTracker.Tick)
|
||||
common.AddShutdownTask(refTracker.Tick)
|
||||
c.AddScheduledFifteenMinuteTask(refTracker.Tick)
|
||||
//c.AddScheduledSecondTask(refTracker.Tick)
|
||||
c.AddShutdownTask(refTracker.Tick)
|
||||
return refTracker, acc.FirstError()
|
||||
}
|
||||
|
||||
@ -50,50 +51,41 @@ func (ref *DefaultReferrerTracker) Tick() (err error) {
|
||||
if count != 0 {
|
||||
err := ref.insertChunk(referrer, count) // TODO: Bulk insert for speed?
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.Wrap(errors.WithStack(err),"ref counter")
|
||||
}
|
||||
}
|
||||
|
||||
delete(referrersToDelete, referrer)
|
||||
}
|
||||
|
||||
// Run the queries and schedule zero view refs for deletion from memory
|
||||
ref.oddLock.Lock()
|
||||
for referrer, counter := range ref.odd {
|
||||
if counter.Count == 0 {
|
||||
referrersToDelete[referrer] = counter
|
||||
delete(ref.odd, referrer)
|
||||
}
|
||||
count := atomic.SwapInt64(&counter.Count, 0)
|
||||
err := ref.insertChunk(referrer, count) // TODO: Bulk insert for speed?
|
||||
if err != nil {
|
||||
return err
|
||||
refLoop := func(l *sync.RWMutex, m map[string]*ReferrerItem) error {
|
||||
l.Lock()
|
||||
defer l.Unlock()
|
||||
for referrer, counter := range m {
|
||||
if counter.Count == 0 {
|
||||
referrersToDelete[referrer] = counter
|
||||
delete(m, referrer)
|
||||
}
|
||||
count := atomic.SwapInt64(&counter.Count, 0)
|
||||
err := ref.insertChunk(referrer, count) // TODO: Bulk insert for speed?
|
||||
if err != nil {
|
||||
return errors.Wrap(errors.WithStack(err),"ref counter")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
ref.oddLock.Unlock()
|
||||
|
||||
ref.evenLock.Lock()
|
||||
for referrer, counter := range ref.even {
|
||||
if counter.Count == 0 {
|
||||
referrersToDelete[referrer] = counter
|
||||
delete(ref.even, referrer)
|
||||
}
|
||||
count := atomic.SwapInt64(&counter.Count, 0)
|
||||
err := ref.insertChunk(referrer, count) // TODO: Bulk insert for speed?
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = refLoop(&ref.oddLock,ref.odd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ref.evenLock.Unlock()
|
||||
|
||||
return nil
|
||||
return refLoop(&ref.evenLock,ref.even)
|
||||
}
|
||||
|
||||
func (ref *DefaultReferrerTracker) insertChunk(referrer string, count int64) error {
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
common.DebugDetailf("Inserting a viewchunk with a count of %d for referrer %s", count, referrer)
|
||||
c.DebugDetailf("Inserting a vchunk with a count of %d for referrer %s", count, referrer)
|
||||
_, err := ref.insert.Exec(count, referrer)
|
||||
return err
|
||||
}
|
||||
|
20
common/disk.go
Normal file
20
common/disk.go
Normal file
@ -0,0 +1,20 @@
|
||||
package common
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"os"
|
||||
)
|
||||
|
||||
func DirSize(path string) (int, error) {
|
||||
var size int64
|
||||
err := filepath.Walk(path, func(_ string, file os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !file.IsDir() {
|
||||
size += file.Size()
|
||||
}
|
||||
return err
|
||||
})
|
||||
return int(size), err
|
||||
}
|
@ -10,7 +10,6 @@ import (
|
||||
"crypto/rand"
|
||||
"encoding/base32"
|
||||
"encoding/base64"
|
||||
"path/filepath"
|
||||
"errors"
|
||||
"fmt"
|
||||
"html"
|
||||
@ -536,18 +535,4 @@ func BuildSlug(slug string, id int) string {
|
||||
return strconv.Itoa(id)
|
||||
}
|
||||
return slug + "." + strconv.Itoa(id)
|
||||
}
|
||||
|
||||
func DirSize(path string) (int, error) {
|
||||
var size int64
|
||||
err := filepath.Walk(path, func(_ string, file os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !file.IsDir() {
|
||||
size += file.Size()
|
||||
}
|
||||
return err
|
||||
})
|
||||
return int(size), err
|
||||
}
|
Loading…
Reference in New Issue
Block a user