gosora/common/requests.go

131 lines
3.5 KiB
Go

package common
import (
"database/sql"
"sync"
"sync/atomic"
"../query_gen/lib"
)
var (
	// ReferrerTracker is the package-level referrer tracker. It is declared
	// without a value here and nothing in this file assigns it.
	// NOTE(review): presumably set at startup from NewDefaultReferrerTracker — confirm against the caller.
	ReferrerTracker *DefaultReferrerTracker

	// Add ReferrerItems here after they've had zero views for a while.
	// Tick moves idle counters into this map and flushes and drops them on its next run.
	referrersToDelete = map[string]*ReferrerItem{}
)
// ReferrerItem is the in-memory view counter for a single referrer.
type ReferrerItem struct {
	// Count is the number of views recorded since the counter was last reset.
	// Bump increments it with atomic.AddInt64.
	Count int64
}
// DefaultReferrerTracker keeps per-referrer view counters in two lock-guarded
// maps and holds the prepared statement used to write them out.
// ? We'll track referrer domains here rather than the exact URL they arrived from for now, we'll think about expanding later
// ? Referrers are fluid and ever-changing so we have to use string keys rather than 'enum' ints
type DefaultReferrerTracker struct {
	// odd and even are the two shards. Bump chooses between them using the
	// parity of the referrer's first byte.
	odd, even map[string]*ReferrerItem
	// oddLock is taken around accesses to odd, evenLock around accesses to even.
	oddLock, evenLock sync.RWMutex
	// insert is executed by insertChunk with (count, referrer) as arguments.
	insert *sql.Stmt
}
// NewDefaultReferrerTracker builds a tracker with two empty shards and an
// insert statement prepared against the viewchunks_referrers table, then hands
// its Tick method to AddScheduledFifteenMinuteTask and AddShutdownTask.
//
// If the query accumulator recorded an error while preparing the statement,
// that error is returned together with a nil tracker and Tick is NOT
// registered, so the task loop never runs Tick against a statement that
// failed to prepare.
func NewDefaultReferrerTracker() (*DefaultReferrerTracker, error) {
	acc := qgen.Builder.Accumulator()
	refTracker := &DefaultReferrerTracker{
		odd:    make(map[string]*ReferrerItem),
		even:   make(map[string]*ReferrerItem),
		insert: acc.Insert("viewchunks_referrers").Columns("count, createdAt, domain").Fields("?,UTC_TIMESTAMP(),?").Prepare(), // TODO: Do something more efficient than doing a query for each referrer
	}
	// Check for preparation failures before registering anything: once Tick is
	// scheduled it cannot be unregistered from here.
	if err := acc.FirstError(); err != nil {
		return nil, err
	}

	AddScheduledFifteenMinuteTask(refTracker.Tick)
	//AddScheduledSecondTask(refTracker.Tick)
	AddShutdownTask(refTracker.Tick)
	return refTracker, nil
}
// TODO: Move this and the other view tickers out of the main task loop to avoid blocking other tasks?

// Tick writes the accumulated view counts to the database and retires idle
// counters. It runs in three steps:
//
//  1. Counters parked in referrersToDelete by an earlier run are flushed one
//     last time (to catch views which squeezed through the gaps at the last
//     moment) and then dropped.
//  2. The odd shard is flushed; counters that are currently at zero are moved
//     into referrersToDelete.
//  3. The even shard is handled the same way.
//
// The first insert error aborts the run and is returned. Shard locks are
// always released, including on that error path.
func (ref *DefaultReferrerTracker) Tick() (err error) {
	for referrer, counter := range referrersToDelete {
		if err = ref.flushCounter(referrer, counter); err != nil {
			// The counter stays in referrersToDelete so the next run retries it.
			return err
		}
		delete(referrersToDelete, referrer)
	}

	// Run the queries and schedule zero view refs for deletion from memory
	if err = ref.flushShard(&ref.oddLock, ref.odd); err != nil {
		return err
	}
	return ref.flushShard(&ref.evenLock, ref.even)
}

// flushShard flushes every counter in shard while holding lock for writing and
// parks counters that are at zero in referrersToDelete. The lock is released
// through defer so that an insert error cannot leave the shard locked.
func (ref *DefaultReferrerTracker) flushShard(lock *sync.RWMutex, shard map[string]*ReferrerItem) error {
	lock.Lock()
	defer lock.Unlock()

	for referrer, counter := range shard {
		// Bump updates Count atomically without holding this lock, so it has
		// to be read atomically here as well.
		if atomic.LoadInt64(&counter.Count) == 0 {
			referrersToDelete[referrer] = counter
			delete(shard, referrer)
		}
		// Still flush after parking: a view may have landed between the load
		// above and the swap inside flushCounter.
		if err := ref.flushCounter(referrer, counter); err != nil {
			return err
		}
	}
	return nil
}

// flushCounter atomically takes the current count out of counter and inserts
// it as a chunk for referrer. If the insert fails the count is added back so
// the views are not lost and a later Tick can retry them.
func (ref *DefaultReferrerTracker) flushCounter(referrer string, counter *ReferrerItem) error {
	count := atomic.SwapInt64(&counter.Count, 0)
	err := ref.insertChunk(referrer, count) // TODO: Bulk insert for speed?
	if err != nil {
		atomic.AddInt64(&counter.Count, count)
	}
	return err
}
// insertChunk executes the prepared insert statement with count and referrer
// as its arguments and returns whatever error Exec reports. A count of zero is
// skipped: nothing is logged or executed and nil is returned.
func (ref *DefaultReferrerTracker) insertChunk(referrer string, count int64) error {
	if count != 0 {
		debugDetailf("Inserting a viewchunk with a count of %d for referrer %s", count, referrer)
		if _, execErr := ref.insert.Exec(count, referrer); execErr != nil {
			return execErr
		}
	}
	return nil
}
// Bump records one view for referrer. An empty referrer is ignored.
//
// The counter lives in one of two shards, chosen by the parity of the
// referrer's first byte, so that bumps for different referrers contend on
// different locks part of the time.
func (ref *DefaultReferrerTracker) Bump(referrer string) {
	if referrer == "" {
		return
	}
	// Slightly crude and rudimentary, but it should give a basic degree of sharding
	if referrer[0]%2 == 0 {
		ref.bumpShard(&ref.evenLock, ref.even, referrer)
	} else {
		ref.bumpShard(&ref.oddLock, ref.odd, referrer)
	}
}

// bumpShard adds one view for referrer in shard, which is guarded by lock.
// An existing counter is found under the read lock and incremented atomically.
// A missing counter is created under the write lock, after looking the key up
// again: another goroutine may have created it between the two lock
// acquisitions, and overwriting its counter would throw away its views.
func (ref *DefaultReferrerTracker) bumpShard(lock *sync.RWMutex, shard map[string]*ReferrerItem, referrer string) {
	lock.RLock()
	item := shard[referrer]
	lock.RUnlock()
	if item != nil {
		atomic.AddInt64(&item.Count, 1)
		return
	}

	lock.Lock()
	if item = shard[referrer]; item != nil {
		atomic.AddInt64(&item.Count, 1)
	} else {
		shard[referrer] = &ReferrerItem{Count: 1}
	}
	lock.Unlock()
}