- stats: fix read-write race

* periodicFlush() operation doesn't result in an inconsistent state at any time
* stats readers use the last unit ID properly, excluding the possibility
 when unit ID could be changed, but this unit isn't yet created
This commit is contained in:
Simon Zolin 2019-11-11 16:18:20 +03:00
parent 2a6e9f3c11
commit f64868472a

View File

@ -207,6 +207,13 @@ func btoi(b []byte) uint64 {
} }
// Flush the current unit to DB and delete an old unit when a new hour is started // Flush the current unit to DB and delete an old unit when a new hour is started
// If a unit must be flushed:
// . lock DB
// . atomically set a new empty unit as the current one and get the old unit
// This is important to do it inside DB lock, so the reader won't get inconsistent results.
// . write the unit to DB
// . remove the stale unit from DB
// . unlock DB
func (s *statsCtx) periodicFlush() { func (s *statsCtx) periodicFlush() {
for { for {
s.unitLock.Lock() s.unitLock.Lock()
@ -222,12 +229,13 @@ func (s *statsCtx) periodicFlush() {
continue continue
} }
tx := s.beginTxn(true)
nu := unit{} nu := unit{}
s.initUnit(&nu, id) s.initUnit(&nu, id)
u := s.swapUnit(&nu) u := s.swapUnit(&nu)
udb := serialize(u) udb := serialize(u)
tx := s.beginTxn(true)
if tx == nil { if tx == nil {
continue continue
} }
@ -455,15 +463,20 @@ func (s *statsCtx) Update(e Entry) {
s.unitLock.Unlock() s.unitLock.Unlock()
} }
func (s *statsCtx) loadUnits(lastID uint32) []*unitDB { func (s *statsCtx) loadUnits() ([]*unitDB, uint32) {
tx := s.beginTxn(false) tx := s.beginTxn(false)
if tx == nil { if tx == nil {
return nil return nil, 0
} }
s.unitLock.Lock()
curUnit := serialize(s.unit)
curID := s.unit.id
s.unitLock.Unlock()
units := []*unitDB{} //per-hour units units := []*unitDB{} //per-hour units
firstID := lastID - s.limit + 1 firstID := curID - s.limit + 1
for i := firstID; i != lastID; i++ { for i := firstID; i != curID; i++ {
u := s.loadUnitFromDB(tx, i) u := s.loadUnitFromDB(tx, i)
if u == nil { if u == nil {
u = &unitDB{} u = &unitDB{}
@ -474,20 +487,13 @@ func (s *statsCtx) loadUnits(lastID uint32) []*unitDB {
_ = tx.Rollback() _ = tx.Rollback()
s.unitLock.Lock() units = append(units, curUnit)
cu := serialize(s.unit)
cuID := s.unit.id
s.unitLock.Unlock()
if cuID != lastID {
units = units[1:]
}
units = append(units, cu)
if len(units) != int(s.limit) { if len(units) != int(s.limit) {
log.Fatalf("len(units) != s.limit: %d %d", len(units), s.limit) log.Fatalf("len(units) != s.limit: %d %d", len(units), s.limit)
} }
return units return units, firstID
} }
/* Algorithm: /* Algorithm:
@ -521,9 +527,7 @@ func (s *statsCtx) loadUnits(lastID uint32) []*unitDB {
func (s *statsCtx) getData(timeUnit TimeUnit) map[string]interface{} { func (s *statsCtx) getData(timeUnit TimeUnit) map[string]interface{} {
d := map[string]interface{}{} d := map[string]interface{}{}
lastID := s.conf.UnitID() units, firstID := s.loadUnits()
firstID := lastID - s.limit + 1
units := s.loadUnits(lastID)
if units == nil { if units == nil {
return nil return nil
} }
@ -699,8 +703,7 @@ func (s *statsCtx) getData(timeUnit TimeUnit) map[string]interface{} {
} }
func (s *statsCtx) GetTopClientsIP(limit uint) []string { func (s *statsCtx) GetTopClientsIP(limit uint) []string {
lastID := s.conf.UnitID() units, _ := s.loadUnits()
units := s.loadUnits(lastID)
if units == nil { if units == nil {
return nil return nil
} }