- Stats: fix crash

Close #1170

* commit 'f64868472aae50ed8203ef4d3ec9de7a7cf96fd9':
  - stats: fix read-write race
  * minor
This commit is contained in:
Simon Zolin 2019-11-11 18:04:01 +03:00
commit ec5c5e8109
2 changed files with 22 additions and 21 deletions

View File

@ -6,7 +6,6 @@ import (
"os" "os"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -105,7 +104,6 @@ func TestLargeNumbers(t *testing.T) {
for h := 0; h != 12; h++ { for h := 0; h != 12; h++ {
if h != 0 { if h != 0 {
atomic.AddInt32(&hour, 1) atomic.AddInt32(&hour, 1)
time.Sleep(1500 * time.Millisecond)
} }
for i := 0; i != n; i++ { for i := 0; i != n; i++ {
e.Domain = fmt.Sprintf("domain%d", i) e.Domain = fmt.Sprintf("domain%d", i)

View File

@ -207,6 +207,13 @@ func btoi(b []byte) uint64 {
} }
// Flush the current unit to DB and delete an old unit when a new hour is started // Flush the current unit to DB and delete an old unit when a new hour is started
// If a unit must be flushed:
// . lock DB
// . atomically set a new empty unit as the current one and get the old unit
// It is important to do this inside the DB lock, so the reader won't get inconsistent results.
// . write the unit to DB
// . remove the stale unit from DB
// . unlock DB
func (s *statsCtx) periodicFlush() { func (s *statsCtx) periodicFlush() {
for { for {
s.unitLock.Lock() s.unitLock.Lock()
@ -222,12 +229,13 @@ func (s *statsCtx) periodicFlush() {
continue continue
} }
tx := s.beginTxn(true)
nu := unit{} nu := unit{}
s.initUnit(&nu, id) s.initUnit(&nu, id)
u := s.swapUnit(&nu) u := s.swapUnit(&nu)
udb := serialize(u) udb := serialize(u)
tx := s.beginTxn(true)
if tx == nil { if tx == nil {
continue continue
} }
@ -455,15 +463,20 @@ func (s *statsCtx) Update(e Entry) {
s.unitLock.Unlock() s.unitLock.Unlock()
} }
func (s *statsCtx) loadUnits(lastID uint32) []*unitDB { func (s *statsCtx) loadUnits() ([]*unitDB, uint32) {
tx := s.beginTxn(false) tx := s.beginTxn(false)
if tx == nil { if tx == nil {
return nil return nil, 0
} }
s.unitLock.Lock()
curUnit := serialize(s.unit)
curID := s.unit.id
s.unitLock.Unlock()
units := []*unitDB{} //per-hour units units := []*unitDB{} //per-hour units
firstID := lastID - s.limit + 1 firstID := curID - s.limit + 1
for i := firstID; i != lastID; i++ { for i := firstID; i != curID; i++ {
u := s.loadUnitFromDB(tx, i) u := s.loadUnitFromDB(tx, i)
if u == nil { if u == nil {
u = &unitDB{} u = &unitDB{}
@ -474,20 +487,13 @@ func (s *statsCtx) loadUnits(lastID uint32) []*unitDB {
_ = tx.Rollback() _ = tx.Rollback()
s.unitLock.Lock() units = append(units, curUnit)
cu := serialize(s.unit)
cuID := s.unit.id
s.unitLock.Unlock()
if cuID != lastID {
units = units[1:]
}
units = append(units, cu)
if len(units) != int(s.limit) { if len(units) != int(s.limit) {
log.Fatalf("len(units) != s.limit: %d %d", len(units), s.limit) log.Fatalf("len(units) != s.limit: %d %d", len(units), s.limit)
} }
return units return units, firstID
} }
/* Algorithm: /* Algorithm:
@ -521,9 +527,7 @@ func (s *statsCtx) loadUnits(lastID uint32) []*unitDB {
func (s *statsCtx) getData(timeUnit TimeUnit) map[string]interface{} { func (s *statsCtx) getData(timeUnit TimeUnit) map[string]interface{} {
d := map[string]interface{}{} d := map[string]interface{}{}
lastID := s.conf.UnitID() units, firstID := s.loadUnits()
firstID := lastID - s.limit + 1
units := s.loadUnits(lastID)
if units == nil { if units == nil {
return nil return nil
} }
@ -699,8 +703,7 @@ func (s *statsCtx) getData(timeUnit TimeUnit) map[string]interface{} {
} }
func (s *statsCtx) GetTopClientsIP(limit uint) []string { func (s *statsCtx) GetTopClientsIP(limit uint) []string {
lastID := s.conf.UnitID() units, _ := s.loadUnits()
units := s.loadUnits(lastID)
if units == nil { if units == nil {
return nil return nil
} }