package counters

import (
	"database/sql"
	"sync"
	"sync/atomic"

	".."
	"../../query_gen/lib"
)

// ReferrerTracker is the package-level referrer tracker instance.
// It is declared here but not assigned anywhere in this file.
var ReferrerTracker *DefaultReferrerTracker

// Add ReferrerItems here after they've had zero views for a while
// Only Tick (and its helper flushShard) touch this map and it is not guarded by a lock.
// NOTE(review): presumably Tick is never run concurrently with itself - confirm against the task scheduler.
var referrersToDelete = make(map[string]*ReferrerItem)

// ReferrerItem holds the number of views seen for one referrer since the last flush.
// Count is always accessed through sync/atomic in this file.
type ReferrerItem struct {
	Count int64
}

// DefaultReferrerTracker counts views per referrer in memory and writes them to the
// viewchunks_referrers table whenever Tick runs.
//
// Referrers are split across two maps (odd / even, chosen by the first byte of the
// referrer string), each with its own lock, to give a basic degree of sharding.
//
// ? We'll track referrer domains here rather than the exact URL they arrived from for now, we'll think about expanding later
// ? Referrers are fluid and ever-changing so we have to use string keys rather than 'enum' ints
type DefaultReferrerTracker struct {
	odd      map[string]*ReferrerItem
	even     map[string]*ReferrerItem
	oddLock  sync.RWMutex
	evenLock sync.RWMutex

	insert *sql.Stmt
}

// NewDefaultReferrerTracker builds a tracker with empty shards and a prepared insert
// statement, then registers its Tick method as a fifteen-minute scheduled task and as a
// shutdown task.
//
// The tracker is always returned; the error is the first error recorded by the query
// accumulator while preparing the statement, if any.
func NewDefaultReferrerTracker() (*DefaultReferrerTracker, error) {
	acc := qgen.Builder.Accumulator()
	refTracker := &DefaultReferrerTracker{
		odd:    make(map[string]*ReferrerItem),
		even:   make(map[string]*ReferrerItem),
		insert: acc.Insert("viewchunks_referrers").Columns("count, createdAt, domain").Fields("?,UTC_TIMESTAMP(),?").Prepare(), // TODO: Do something more efficient than doing a query for each referrer
	}
	common.AddScheduledFifteenMinuteTask(refTracker.Tick)
	//common.AddScheduledSecondTask(refTracker.Tick)
	common.AddShutdownTask(refTracker.Tick)
	return refTracker, acc.FirstError()
}

// Tick flushes the in-memory counters to the database.
//
// It first writes out and forgets the items that were scheduled for deletion on a
// previous run, then flushes the odd shard followed by the even shard. It stops at, and
// returns, the first insert error.
//
// TODO: Move this and the other view tickers out of the main task loop to avoid blocking other tasks?
func (ref *DefaultReferrerTracker) Tick() error {
	for referrer, counter := range referrersToDelete {
		// Handle views which squeezed through the gaps at the last moment.
		// Bump may still hold a pointer to this item, so the read has to be atomic.
		// insertChunk is a no-op for a zero count.
		count := atomic.LoadInt64(&counter.Count)
		if err := ref.insertChunk(referrer, count); err != nil { // TODO: Bulk insert for speed?
			// The item stays in referrersToDelete so it is retried on the next Tick.
			return err
		}
		delete(referrersToDelete, referrer)
	}

	// Run the queries and schedule zero view refs for deletion from memory
	if err := ref.flushShard(&ref.oddLock, ref.odd); err != nil {
		return err
	}
	return ref.flushShard(&ref.evenLock, ref.even)
}

// flushShard writes every non-zero counter in shard to the database and moves counters
// which are at zero into referrersToDelete, all while holding lock for writing.
//
// The lock is released via defer so that an insert error can no longer leave the shard
// locked forever (which would block every later Bump and Tick on that shard).
func (ref *DefaultReferrerTracker) flushShard(lock *sync.RWMutex, shard map[string]*ReferrerItem) error {
	lock.Lock()
	defer lock.Unlock()

	for referrer, counter := range shard {
		// Take the count and reset it in a single atomic step so a concurrent Bump
		// cannot land between the zero check and the reset.
		count := atomic.SwapInt64(&counter.Count, 0)
		if count == 0 {
			referrersToDelete[referrer] = counter
			delete(shard, referrer) // deleting the current key during range is safe in Go
			continue
		}
		if err := ref.insertChunk(referrer, count); err != nil { // TODO: Bulk insert for speed?
			// Put the views back so they are retried on the next Tick instead of lost.
			atomic.AddInt64(&counter.Count, count)
			return err
		}
	}
	return nil
}

// insertChunk stores a single view chunk row for referrer with the given count.
// A zero count is skipped without touching the database.
func (ref *DefaultReferrerTracker) insertChunk(referrer string, count int64) error {
	if count == 0 {
		return nil
	}
	common.DebugDetailf("Inserting a viewchunk with a count of %d for referrer %s", count, referrer)
	_, err := ref.insert.Exec(count, referrer)
	return err
}

// Bump records one view for referrer. An empty referrer is ignored.
func (ref *DefaultReferrerTracker) Bump(referrer string) {
	if referrer == "" {
		return
	}
	// Slightly crude and rudimentary, but it should give a basic degree of sharding
	if referrer[0]%2 == 0 {
		ref.bumpShard(&ref.evenLock, ref.even, referrer)
		return
	}
	ref.bumpShard(&ref.oddLock, ref.odd, referrer)
}

// bumpShard adds one view to referrer's counter in shard, creating the counter if needed.
//
// The common case (counter already exists) only takes the read lock. When the counter is
// missing, the lookup is repeated under the write lock: another goroutine may have
// created it in the gap between RUnlock and Lock, and blindly storing a fresh item there
// would throw away the views it had already collected.
func (ref *DefaultReferrerTracker) bumpShard(lock *sync.RWMutex, shard map[string]*ReferrerItem, referrer string) {
	lock.RLock()
	item := shard[referrer]
	lock.RUnlock()
	if item != nil {
		atomic.AddInt64(&item.Count, 1)
		return
	}

	lock.Lock()
	defer lock.Unlock()
	if item = shard[referrer]; item != nil {
		atomic.AddInt64(&item.Count, 1)
		return
	}
	shard[referrer] = &ReferrerItem{Count: 1}
}