diff --git a/stats/stats_test.go b/stats/stats_test.go index f540e7cb..0c7c4bb5 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -6,7 +6,6 @@ import ( "os" "sync/atomic" "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -105,7 +104,6 @@ func TestLargeNumbers(t *testing.T) { for h := 0; h != 12; h++ { if h != 0 { atomic.AddInt32(&hour, 1) - time.Sleep(1500 * time.Millisecond) } for i := 0; i != n; i++ { e.Domain = fmt.Sprintf("domain%d", i) diff --git a/stats/stats_unit.go b/stats/stats_unit.go index 3db14d5b..72ccfc36 100644 --- a/stats/stats_unit.go +++ b/stats/stats_unit.go @@ -207,6 +207,13 @@ func btoi(b []byte) uint64 { } // Flush the current unit to DB and delete an old unit when a new hour is started +// If a unit must be flushed: +// . lock DB +// . atomically set a new empty unit as the current one and get the old unit +// This is important to do it inside DB lock, so the reader won't get inconsistent results. +// . write the unit to DB +// . remove the stale unit from DB +// . unlock DB func (s *statsCtx) periodicFlush() { for { s.unitLock.Lock() @@ -222,12 +229,13 @@ func (s *statsCtx) periodicFlush() { continue } + tx := s.beginTxn(true) + nu := unit{} s.initUnit(&nu, id) u := s.swapUnit(&nu) udb := serialize(u) - tx := s.beginTxn(true) if tx == nil { continue } @@ -455,15 +463,20 @@ func (s *statsCtx) Update(e Entry) { s.unitLock.Unlock() } -func (s *statsCtx) loadUnits(lastID uint32) []*unitDB { +func (s *statsCtx) loadUnits() ([]*unitDB, uint32) { tx := s.beginTxn(false) if tx == nil { - return nil + return nil, 0 } + s.unitLock.Lock() + curUnit := serialize(s.unit) + curID := s.unit.id + s.unitLock.Unlock() + units := []*unitDB{} //per-hour units - firstID := lastID - s.limit + 1 - for i := firstID; i != lastID; i++ { + firstID := curID - s.limit + 1 + for i := firstID; i != curID; i++ { u := s.loadUnitFromDB(tx, i) if u == nil { u = &unitDB{} @@ -474,20 +487,13 @@ func (s *statsCtx) loadUnits(lastID uint32) []*unitDB { _ = tx.Rollback() - s.unitLock.Lock() - cu := serialize(s.unit) - cuID := s.unit.id - s.unitLock.Unlock() - if cuID != lastID { - units = units[1:] - } - units = append(units, cu) + units = append(units, curUnit) if len(units) != int(s.limit) { log.Fatalf("len(units) != s.limit: %d %d", len(units), s.limit) } - return units + return units, firstID } /* Algorithm: @@ -521,9 +527,7 @@ func (s *statsCtx) loadUnits(lastID uint32) []*unitDB { func (s *statsCtx) getData(timeUnit TimeUnit) map[string]interface{} { d := map[string]interface{}{} - lastID := s.conf.UnitID() - firstID := lastID - s.limit + 1 - units := s.loadUnits(lastID) + units, firstID := s.loadUnits() if units == nil { return nil } @@ -699,8 +703,7 @@ func (s *statsCtx) getData(timeUnit TimeUnit) map[string]interface{} { } func (s *statsCtx) GetTopClientsIP(limit uint) []string { - lastID := s.conf.UnitID() - units := s.loadUnits(lastID) + units, _ := s.loadUnits() if units == nil { return nil }