2019-08-22 13:34:58 +00:00
package stats
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
2020-03-03 17:21:53 +00:00
"net"
2019-08-22 13:34:58 +00:00
"os"
"sort"
"sync"
"time"
2021-05-24 14:28:11 +00:00
"github.com/AdguardTeam/golibs/errors"
2019-08-22 13:34:58 +00:00
"github.com/AdguardTeam/golibs/log"
2020-04-05 15:21:26 +00:00
bolt "go.etcd.io/bbolt"
2019-08-22 13:34:58 +00:00
)
2021-02-11 14:55:37 +00:00
// TODO(a.garipov): Rewrite all of this. Add proper error handling and
// inspection. Improve logging. Decrease complexity.
2019-08-22 13:34:58 +00:00
// Limits on the sizes of the top lists.
const (
	// maxDomains is the maximum number of top domains to store in the file
	// or to return via Get().
	maxDomains = 100

	// maxClients is the maximum number of top clients to store in the file
	// or to return via Get().
	maxClients = 100
)
// statsCtx - global context
type statsCtx struct {
2021-12-06 14:26:43 +00:00
// mu protects unit.
mu * sync . Mutex
// current is the actual statistics collection result.
current * unit
2019-12-11 09:38:58 +00:00
db * bolt . DB
conf * Config
2019-08-22 13:34:58 +00:00
}
// unit is the in-memory statistics data for one time unit.
type unit struct {
	// id is the unit ID.  Default: absolute hour since Jan 1, 1970.
	id uint32

	// nTotal is the total number of requests.
	nTotal uint64

	// nResult is the number of requests per one result.
	nResult []uint64

	// timeSum is the sum of processing time of all requests, in
	// microseconds.
	timeSum uint64

	// Top counters.

	// domains is the number of requests per domain.
	domains map[string]uint64

	// blockedDomains is the number of blocked requests per domain.
	blockedDomains map[string]uint64

	// clients is the number of requests per client.
	clients map[string]uint64
}
// countPair is a name-count pair, e.g. a domain or a client together with the
// number of its requests.
type countPair struct {
	// Name is the name of the counted entity.
	Name string

	// Count is the value of the counter.
	Count uint64
}
// structure for storing data in file
type unitDB struct {
2019-09-10 14:59:10 +00:00
NTotal uint64
NResult [ ] uint64
2019-08-22 13:34:58 +00:00
Domains [ ] countPair
BlockedDomains [ ] countPair
Clients [ ] countPair
2019-09-10 14:59:10 +00:00
TimeAvg uint32 // usec
2019-08-22 13:34:58 +00:00
}
2022-01-20 14:19:09 +00:00
// withRecovered turns the value recovered from panic, if any, into an error
// and combines it with the one pointed by orig.  orig must be non-nil.
//
// It calls recover itself, so it only has an effect when used directly in a
// defer statement.
func withRecovered(orig *error) {
	p := recover()
	if p == nil {
		return
	}

	var err error
	if pErr, isErr := p.(error); isErr {
		// Keep the original error in the chain.
		err = fmt.Errorf("panic: %w", pErr)
	} else {
		err = fmt.Errorf("panic: recovered value of type %[1]T: %[1]v", p)
	}

	*orig = errors.WithDeferred(*orig, err)
}
// createObject creates s from conf and properly initializes it.
2021-02-11 14:55:37 +00:00
func createObject ( conf Config ) ( s * statsCtx , err error ) {
2022-01-20 14:19:09 +00:00
defer withRecovered ( & err )
2021-12-06 14:26:43 +00:00
s = & statsCtx {
mu : & sync . Mutex { } ,
}
2019-09-25 12:36:09 +00:00
if ! checkInterval ( conf . LimitDays ) {
conf . LimitDays = 1
}
2021-02-11 14:55:37 +00:00
2019-12-11 09:38:58 +00:00
s . conf = & Config { }
* s . conf = conf
s . conf . limit = conf . LimitDays * 24
2019-09-16 13:14:52 +00:00
if conf . UnitID == nil {
s . conf . UnitID = newUnitID
2019-08-22 13:34:58 +00:00
}
if ! s . dbOpen ( ) {
2019-09-06 12:42:21 +00:00
return nil , fmt . Errorf ( "open database" )
2019-08-22 13:34:58 +00:00
}
2019-09-16 13:14:52 +00:00
id := s . conf . UnitID ( )
2019-08-22 13:34:58 +00:00
tx := s . beginTxn ( true )
var udb * unitDB
if tx != nil {
log . Tracef ( "Deleting old units..." )
2019-12-11 09:38:58 +00:00
firstID := id - s . conf . limit - 1
2019-08-22 13:34:58 +00:00
unitDel := 0
2021-02-11 14:55:37 +00:00
2021-08-27 11:50:37 +00:00
err = tx . ForEach ( newBucketWalker ( tx , & unitDel , firstID ) )
2021-02-11 14:55:37 +00:00
if err != nil && ! errors . Is ( err , errStop ) {
log . Debug ( "stats: deleting units: %s" , err )
2019-08-22 13:34:58 +00:00
}
udb = s . loadUnitFromDB ( tx , id )
if unitDel != 0 {
s . commitTxn ( tx )
} else {
2021-02-11 14:55:37 +00:00
err = tx . Rollback ( )
if err != nil {
log . Debug ( "rolling back: %s" , err )
}
2019-08-22 13:34:58 +00:00
}
}
u := unit { }
s . initUnit ( & u , id )
if udb != nil {
deserialize ( & u , udb )
}
2021-12-06 14:26:43 +00:00
s . current = & u
2019-08-22 13:34:58 +00:00
2021-02-11 14:55:37 +00:00
log . Debug ( "stats: initialized" )
return s , nil
2019-08-22 13:34:58 +00:00
}
2021-08-27 11:50:37 +00:00
// errStop is the sentinel error that a bucket walker returns to stop the
// iteration over the buckets.
//
// TODO(a.garipov): See if this is actually necessary.  Looks like a rather
// bizarre solution.
const errStop errors.Error = "stop iteration"
// newBucketWalker returns a new bucket walker that deletes old units, i.e.
// the buckets with invalid names or with IDs less than firstID.  The integer
// that unitDelPtr points to is incremented for every successful deletion.  A
// failed deletion is only logged, and the walk continues.  On the first
// bucket that must be kept, f returns errStop.
func newBucketWalker(
	tx *bolt.Tx,
	unitDelPtr *int,
	firstID uint32,
) (f func(name []byte, b *bolt.Bucket) (err error)) {
	return func(name []byte, _ *bolt.Bucket) (err error) {
		id, ok := unitNameToID(name)
		if ok && id >= firstID {
			// A unit that is recent enough; stop here.
			return errStop
		}

		if err = tx.DeleteBucket(name); err != nil {
			log.Debug("stats: tx.DeleteBucket: %s", err)

			return nil
		}

		log.Debug("stats: deleted unit %d (name %x)", id, name)

		*unitDelPtr++

		return nil
	}
}
2020-01-16 11:25:40 +00:00
// Start calls initWeb and then launches periodicFlush in a separate
// goroutine.
func (s *statsCtx) Start() {
	s.initWeb()

	go s.periodicFlush()
}
2019-09-25 12:36:09 +00:00
// checkInterval returns true if days is one of the supported values of the
// statistics interval: 0, 1, 7, 30, or 90.
func checkInterval(days uint32) bool {
	switch days {
	case 0, 1, 7, 30, 90:
		return true
	default:
		return false
	}
}
2019-08-22 13:34:58 +00:00
func ( s * statsCtx ) dbOpen ( ) bool {
var err error
log . Tracef ( "db.Open..." )
2020-11-06 09:15:08 +00:00
s . db , err = bolt . Open ( s . conf . Filename , 0 o644 , nil )
2019-08-22 13:34:58 +00:00
if err != nil {
2021-02-11 14:55:37 +00:00
log . Error ( "stats: open DB: %s: %s" , s . conf . Filename , err )
2019-12-09 11:13:39 +00:00
if err . Error ( ) == "invalid argument" {
2021-04-08 13:44:01 +00:00
log . Error ( "AdGuard Home cannot be initialized due to an incompatible file system.\nPlease read the explanation here: https://github.com/AdguardTeam/AdGuardHome/wiki/Getting-Started#limitations" )
2019-12-09 11:13:39 +00:00
}
2019-08-22 13:34:58 +00:00
return false
}
log . Tracef ( "db.Open" )
return true
}
// Atomically swap the currently active unit with a new value
// Return old value
2021-12-06 14:26:43 +00:00
func ( s * statsCtx ) swapUnit ( new * unit ) ( u * unit ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
u = s . current
s . current = new
2019-08-22 13:34:58 +00:00
return u
}
// newUnitID returns the unit ID for the current hour, which is the number of
// whole hours since the Unix epoch.
func newUnitID() uint32 {
	const secsPerHour = int64(time.Hour / time.Second)

	return uint32(time.Now().Unix() / secsPerHour)
}
// Initialize a unit
2019-09-10 14:59:10 +00:00
func ( s * statsCtx ) initUnit ( u * unit , id uint32 ) {
2019-08-22 13:34:58 +00:00
u . id = id
2019-09-10 14:59:10 +00:00
u . nResult = make ( [ ] uint64 , rLast )
u . domains = make ( map [ string ] uint64 )
u . blockedDomains = make ( map [ string ] uint64 )
u . clients = make ( map [ string ] uint64 )
2019-08-22 13:34:58 +00:00
}
// beginTxn opens a database transaction, a writable one if wr is true.  It
// returns nil if there is no database or if the transaction cannot be
// started.
func (s *statsCtx) beginTxn(wr bool) *bolt.Tx {
	// Use a local copy, since s.db may be reset by clear.
	db := s.db
	if db == nil {
		return nil
	}

	log.Tracef("db.Begin...")

	tx, err := db.Begin(wr)
	if err == nil {
		log.Tracef("db.Begin")

		return tx
	}

	log.Error("db.Begin: %s", err)

	return nil
}
// commitTxn commits tx and logs the result.
func (s *statsCtx) commitTxn(tx *bolt.Tx) {
	if err := tx.Commit(); err != nil {
		log.Debug("tx.Commit: %s", err)

		return
	}

	log.Tracef("tx.Commit")
}
2021-08-27 11:50:37 +00:00
// bucketNameLen is the length of a bucket name in bytes.  A name is a 64-bit
// unsigned integer.
//
// TODO(a.garipov): Find out why a 64-bit integer is used when IDs seem to
// always be 32 bits.
const bucketNameLen = 8
// idToUnitName converts a numerical ID into a database unit name.
func idToUnitName ( id uint32 ) ( name [ ] byte ) {
name = make ( [ ] byte , bucketNameLen )
binary . BigEndian . PutUint64 ( name , uint64 ( id ) )
2019-08-22 13:34:58 +00:00
2021-08-27 11:50:37 +00:00
return name
2019-08-22 13:34:58 +00:00
}
2021-08-27 11:50:37 +00:00
// unitNameToID converts a database unit name into a numerical ID. ok is false
// if name is not a valid database unit name.
func unitNameToID ( name [ ] byte ) ( id uint32 , ok bool ) {
if len ( name ) < bucketNameLen {
return 0 , false
}
return uint32 ( binary . BigEndian . Uint64 ( name ) ) , true
2019-08-22 13:34:58 +00:00
}
2021-12-06 14:26:43 +00:00
// ongoing returns the currently active unit.  The pointer is read under the
// lock.
func (s *statsCtx) ongoing() (u *unit) {
	s.mu.Lock()
	u = s.current
	s.mu.Unlock()

	return u
}
2019-08-22 13:34:58 +00:00
// Flush the current unit to DB and delete an old unit when a new hour is started
2019-11-11 13:18:20 +00:00
// If a unit must be flushed:
// . lock DB
// . atomically set a new empty unit as the current one and get the old unit
// This is important to do it inside DB lock, so the reader won't get inconsistent results.
// . write the unit to DB
// . remove the stale unit from DB
// . unlock DB
2019-08-22 13:34:58 +00:00
func ( s * statsCtx ) periodicFlush ( ) {
for {
2021-12-06 14:26:43 +00:00
ptr := s . ongoing ( )
2019-08-22 13:34:58 +00:00
if ptr == nil {
break
}
2019-09-16 13:14:52 +00:00
id := s . conf . UnitID ( )
2021-06-17 16:44:46 +00:00
if ptr . id == id || s . conf . limit == 0 {
2019-08-22 13:34:58 +00:00
time . Sleep ( time . Second )
2021-01-27 15:32:13 +00:00
2019-08-22 13:34:58 +00:00
continue
}
2019-11-11 13:18:20 +00:00
tx := s . beginTxn ( true )
2019-08-22 13:34:58 +00:00
nu := unit { }
s . initUnit ( & nu , id )
u := s . swapUnit ( & nu )
udb := serialize ( u )
if tx == nil {
continue
}
2021-01-27 15:32:13 +00:00
2019-08-22 13:34:58 +00:00
ok1 := s . flushUnitToDB ( tx , u . id , udb )
2019-12-11 09:38:58 +00:00
ok2 := s . deleteUnit ( tx , id - s . conf . limit )
2019-08-22 13:34:58 +00:00
if ok1 || ok2 {
s . commitTxn ( tx )
} else {
_ = tx . Rollback ( )
}
}
2021-01-27 15:32:13 +00:00
2019-08-22 13:34:58 +00:00
log . Tracef ( "periodicFlush() exited" )
}
// Delete unit's data from file
2019-09-10 14:59:10 +00:00
func ( s * statsCtx ) deleteUnit ( tx * bolt . Tx , id uint32 ) bool {
2021-08-27 11:50:37 +00:00
err := tx . DeleteBucket ( idToUnitName ( id ) )
2019-08-22 13:34:58 +00:00
if err != nil {
2021-02-11 14:55:37 +00:00
log . Tracef ( "stats: bolt DeleteBucket: %s" , err )
2019-08-22 13:34:58 +00:00
return false
}
2021-02-11 14:55:37 +00:00
log . Debug ( "stats: deleted unit %d" , id )
2019-08-22 13:34:58 +00:00
return true
}
2021-01-27 15:32:13 +00:00
func convertMapToSlice ( m map [ string ] uint64 , max int ) [ ] countPair {
2019-08-22 13:34:58 +00:00
a := [ ] countPair { }
for k , v := range m {
pair := countPair { }
pair . Name = k
2019-09-10 14:59:10 +00:00
pair . Count = v
2019-08-22 13:34:58 +00:00
a = append ( a , pair )
}
less := func ( i , j int ) bool {
2020-11-06 09:15:08 +00:00
return a [ j ] . Count < a [ i ] . Count
2019-08-22 13:34:58 +00:00
}
sort . Slice ( a , less )
if max > len ( a ) {
max = len ( a )
}
return a [ : max ]
}
2021-01-27 15:32:13 +00:00
func convertSliceToMap ( a [ ] countPair ) map [ string ] uint64 {
2019-09-10 14:59:10 +00:00
m := map [ string ] uint64 { }
2019-08-22 13:34:58 +00:00
for _ , it := range a {
2019-09-10 14:59:10 +00:00
m [ it . Name ] = it . Count
2019-08-22 13:34:58 +00:00
}
return m
}
func serialize ( u * unit ) * unitDB {
udb := unitDB { }
2019-09-10 14:59:10 +00:00
udb . NTotal = u . nTotal
2020-11-06 09:15:08 +00:00
udb . NResult = append ( udb . NResult , u . nResult ... )
2019-08-22 13:34:58 +00:00
if u . nTotal != 0 {
2019-09-10 14:59:10 +00:00
udb . TimeAvg = uint32 ( u . timeSum / u . nTotal )
2019-08-22 13:34:58 +00:00
}
2020-11-06 09:15:08 +00:00
2021-01-27 15:32:13 +00:00
udb . Domains = convertMapToSlice ( u . domains , maxDomains )
udb . BlockedDomains = convertMapToSlice ( u . blockedDomains , maxDomains )
udb . Clients = convertMapToSlice ( u . clients , maxClients )
2020-11-06 09:15:08 +00:00
2019-08-22 13:34:58 +00:00
return & udb
}
func deserialize ( u * unit , udb * unitDB ) {
2019-09-10 14:59:10 +00:00
u . nTotal = udb . NTotal
2019-09-10 15:04:43 +00:00
n := len ( udb . NResult )
if n < len ( u . nResult ) {
n = len ( u . nResult ) // n = min(len(udb.NResult), len(u.nResult))
}
2019-09-13 09:55:01 +00:00
for i := 1 ; i < n ; i ++ {
2019-09-10 15:04:43 +00:00
u . nResult [ i ] = udb . NResult [ i ]
2019-08-22 13:34:58 +00:00
}
2019-09-10 15:04:43 +00:00
2021-01-27 15:32:13 +00:00
u . domains = convertSliceToMap ( udb . Domains )
u . blockedDomains = convertSliceToMap ( udb . BlockedDomains )
u . clients = convertSliceToMap ( udb . Clients )
2019-09-10 14:59:10 +00:00
u . timeSum = uint64 ( udb . TimeAvg ) * u . nTotal
2019-08-22 13:34:58 +00:00
}
2019-09-10 14:59:10 +00:00
func ( s * statsCtx ) flushUnitToDB ( tx * bolt . Tx , id uint32 , udb * unitDB ) bool {
2019-08-22 13:34:58 +00:00
log . Tracef ( "Flushing unit %d" , id )
2021-08-27 11:50:37 +00:00
bkt , err := tx . CreateBucketIfNotExists ( idToUnitName ( id ) )
2019-08-22 13:34:58 +00:00
if err != nil {
log . Error ( "tx.CreateBucketIfNotExists: %s" , err )
return false
}
var buf bytes . Buffer
enc := gob . NewEncoder ( & buf )
err = enc . Encode ( udb )
if err != nil {
log . Error ( "gob.Encode: %s" , err )
return false
}
err = bkt . Put ( [ ] byte { 0 } , buf . Bytes ( ) )
if err != nil {
log . Error ( "bkt.Put: %s" , err )
return false
}
return true
}
2019-09-10 14:59:10 +00:00
func ( s * statsCtx ) loadUnitFromDB ( tx * bolt . Tx , id uint32 ) * unitDB {
2021-08-27 11:50:37 +00:00
bkt := tx . Bucket ( idToUnitName ( id ) )
2019-08-22 13:34:58 +00:00
if bkt == nil {
return nil
}
2019-10-23 13:48:00 +00:00
// log.Tracef("Loading unit %d", id)
2019-08-22 13:34:58 +00:00
var buf bytes . Buffer
buf . Write ( bkt . Get ( [ ] byte { 0 } ) )
dec := gob . NewDecoder ( & buf )
udb := unitDB { }
err := dec . Decode ( & udb )
if err != nil {
log . Error ( "gob Decode: %s" , err )
return nil
}
return & udb
}
2021-01-27 15:32:13 +00:00
func convertTopSlice ( a [ ] countPair ) [ ] map [ string ] uint64 {
2019-09-10 14:59:10 +00:00
m := [ ] map [ string ] uint64 { }
2019-08-22 13:34:58 +00:00
for _ , it := range a {
2019-09-10 14:59:10 +00:00
ent := map [ string ] uint64 { }
2019-08-22 13:34:58 +00:00
ent [ it . Name ] = it . Count
m = append ( m , ent )
}
return m
}
2019-09-25 12:36:09 +00:00
func ( s * statsCtx ) setLimit ( limitDays int ) {
2021-06-17 16:44:46 +00:00
s . conf . limit = uint32 ( limitDays ) * 24
if limitDays == 0 {
s . clear ( )
}
2021-02-11 14:55:37 +00:00
log . Debug ( "stats: set limit: %d" , limitDays )
2019-09-25 12:36:09 +00:00
}
func ( s * statsCtx ) WriteDiskConfig ( dc * DiskConfig ) {
2019-12-11 09:38:58 +00:00
dc . Interval = s . conf . limit / 24
2019-08-22 13:34:58 +00:00
}
func ( s * statsCtx ) Close ( ) {
u := s . swapUnit ( nil )
udb := serialize ( u )
tx := s . beginTxn ( true )
if tx != nil {
if s . flushUnitToDB ( tx , u . id , udb ) {
s . commitTxn ( tx )
} else {
_ = tx . Rollback ( )
}
}
if s . db != nil {
log . Tracef ( "db.Close..." )
_ = s . db . Close ( )
log . Tracef ( "db.Close" )
}
2021-02-11 14:55:37 +00:00
log . Debug ( "stats: closed" )
2019-08-22 13:34:58 +00:00
}
2019-09-25 12:36:09 +00:00
// Reset counters and clear database
func ( s * statsCtx ) clear ( ) {
2019-08-22 13:34:58 +00:00
tx := s . beginTxn ( true )
if tx != nil {
db := s . db
s . db = nil
_ = tx . Rollback ( )
// the active transactions can continue using database,
// but no new transactions will be opened
_ = db . Close ( )
log . Tracef ( "db.Close" )
// all active transactions are now closed
}
u := unit { }
2019-09-16 13:14:52 +00:00
s . initUnit ( & u , s . conf . UnitID ( ) )
2019-08-22 13:34:58 +00:00
_ = s . swapUnit ( & u )
2019-09-16 13:14:52 +00:00
err := os . Remove ( s . conf . Filename )
2019-08-22 13:34:58 +00:00
if err != nil {
log . Error ( "os.Remove: %s" , err )
}
_ = s . dbOpen ( )
2021-02-11 14:55:37 +00:00
log . Debug ( "stats: cleared" )
2019-08-22 13:34:58 +00:00
}
func ( s * statsCtx ) Update ( e Entry ) {
2021-06-17 16:44:46 +00:00
if s . conf . limit == 0 {
return
}
2019-08-22 13:34:58 +00:00
if e . Result == 0 ||
2019-09-16 12:54:41 +00:00
e . Result >= rLast ||
2021-01-27 15:32:13 +00:00
e . Domain == "" ||
e . Client == "" {
2019-08-22 13:34:58 +00:00
return
}
2021-01-27 15:32:13 +00:00
clientID := e . Client
if ip := net . ParseIP ( clientID ) ; ip != nil {
clientID = ip . String ( )
}
2019-08-22 13:34:58 +00:00
2021-12-06 14:26:43 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
2021-01-27 15:32:13 +00:00
2021-12-06 14:26:43 +00:00
u := s . current
2019-08-22 13:34:58 +00:00
u . nResult [ e . Result ] ++
if e . Result == RNotFiltered {
u . domains [ e . Domain ] ++
} else {
u . blockedDomains [ e . Domain ] ++
}
2021-01-27 15:32:13 +00:00
u . clients [ clientID ] ++
2019-09-10 14:59:10 +00:00
u . timeSum += uint64 ( e . Time )
2019-08-22 13:34:58 +00:00
u . nTotal ++
}
2019-12-11 09:38:58 +00:00
func ( s * statsCtx ) loadUnits ( limit uint32 ) ( [ ] * unitDB , uint32 ) {
2019-10-07 12:55:09 +00:00
tx := s . beginTxn ( false )
if tx == nil {
2019-11-11 13:18:20 +00:00
return nil , 0
2019-10-07 12:55:09 +00:00
}
2021-12-06 14:26:43 +00:00
cur := s . ongoing ( )
curID := cur . id
2019-11-11 13:18:20 +00:00
2020-11-06 09:15:08 +00:00
// Per-hour units.
units := [ ] * unitDB { }
2019-12-11 09:38:58 +00:00
firstID := curID - limit + 1
2019-11-11 13:18:20 +00:00
for i := firstID ; i != curID ; i ++ {
2019-10-07 12:55:09 +00:00
u := s . loadUnitFromDB ( tx , i )
if u == nil {
u = & unitDB { }
u . NResult = make ( [ ] uint64 , rLast )
}
units = append ( units , u )
}
_ = tx . Rollback ( )
2021-12-06 14:26:43 +00:00
units = append ( units , serialize ( cur ) )
2019-10-07 12:55:09 +00:00
2019-12-11 09:38:58 +00:00
if len ( units ) != int ( limit ) {
log . Fatalf ( "len(units) != limit: %d %d" , len ( units ) , limit )
2019-10-07 12:55:09 +00:00
}
2019-11-11 13:18:20 +00:00
return units , firstID
2019-10-07 12:55:09 +00:00
}
2021-02-09 16:38:31 +00:00
// numsGetter is the signature of the statsCollector argument that extracts a
// single number from a unit.
type numsGetter func(u *unitDB) (num uint64)
// statsCollector collects statistics for the given *unitDB slice by the
// specified timeUnit using ng to retrieve data.  firstID is the ID of
// units[0].
//
// If timeUnit is Hours, nums contains one number per unit.  Otherwise, the
// per-hour numbers are aggregated into days.  A day starts at a unit ID that
// is a multiple of 24.  The units before the first such ID are skipped, and
// the number for the last, possibly incomplete, day is always appended.
func statsCollector(units []*unitDB, firstID uint32, timeUnit TimeUnit, ng numsGetter) (nums []uint64) {
	if timeUnit == Hours {
		for _, u := range units {
			nums = append(nums, ng(u))
		}

		return nums
	}

	// Per time unit counters: 720 hours may span 31 days, so we skip data
	// for the first day in this case.
	//
	// align_ceil(24)
	firstDayID := (firstID + 24 - 1) / 24 * 24

	var sum uint64
	id := firstDayID
	nextDayID := firstDayID + 24
	for i := int(firstDayID - firstID); i < len(units); i++ {
		// Close the day before adding the first hour of the next one, so
		// that every day contains exactly its own hours.
		if id == nextDayID {
			nums = append(nums, sum)
			sum = 0
			nextDayID += 24
		}

		sum += ng(units[i])
		id++
	}

	if id <= nextDayID {
		nums = append(nums, sum)
	}

	return nums
}
// pairsGetter is the signature of the topsCollector argument that extracts a
// list of name-count pairs from a unit.
type pairsGetter func(u *unitDB) (pairs []countPair)
// topsCollector collects statistics about the highest values from the given
// *unitDB slice using pg to retrieve data.  The counts of equal names are
// summed up across all units, and at most max entries with the highest
// totals are returned.
func topsCollector(units []*unitDB, max int, pg pairsGetter) []map[string]uint64 {
	totals := map[string]uint64{}
	for _, u := range units {
		for _, p := range pg(u) {
			totals[p.Name] += p.Count
		}
	}

	return convertTopSlice(convertMapToSlice(totals, max))
}
2019-08-22 13:34:58 +00:00
/* Algorithm:

. Prepare an array of N units, where N is the value of the "limit" configuration setting
 . Load data for the most recent units from file
   If a unit with the required ID doesn't exist, just add an empty unit
 . Get data for the current unit
. Process data from the units and prepare an output map object:
 * per time unit counters:
  * DNS-queries/time-unit
  * blocked/time-unit
  * safebrowsing-blocked/time-unit
  * parental-blocked/time-unit
  If time-unit is an hour, just add values from each unit to an array.
  If time-unit is a day, aggregate per-hour data into days.
 * top counters:
  * queries/domain
  * queries/blocked-domain
  * queries/client
  To get these values we first sum up data for all units into a single map.
  Then we get the pairs with the highest numbers (the values are sorted in descending order)
 * total counters:
  * DNS-queries
  * blocked
  * safebrowsing-blocked
  * safesearch-blocked
  * parental-blocked
  These values are just the sum of data for all units.
*/
2021-01-21 16:55:41 +00:00
func ( s * statsCtx ) getData ( ) ( statsResponse , bool ) {
2019-12-11 09:38:58 +00:00
limit := s . conf . limit
timeUnit := Hours
if limit / 24 > 7 {
timeUnit = Days
}
2019-08-22 13:34:58 +00:00
2019-12-11 09:38:58 +00:00
units , firstID := s . loadUnits ( limit )
2019-10-07 12:55:09 +00:00
if units == nil {
2021-01-21 16:55:41 +00:00
return statsResponse { } , false
2019-08-22 13:34:58 +00:00
}
2021-02-09 16:38:31 +00:00
dnsQueries := statsCollector ( units , firstID , timeUnit , func ( u * unitDB ) ( num uint64 ) { return u . NTotal } )
2020-11-20 14:32:41 +00:00
if timeUnit != Hours && len ( dnsQueries ) != int ( limit / 24 ) {
log . Fatalf ( "len(dnsQueries) != limit: %d %d" , len ( dnsQueries ) , limit )
2019-08-22 13:34:58 +00:00
}
2021-01-21 16:55:41 +00:00
data := statsResponse {
DNSQueries : dnsQueries ,
2021-02-09 16:38:31 +00:00
BlockedFiltering : statsCollector ( units , firstID , timeUnit , func ( u * unitDB ) ( num uint64 ) { return u . NResult [ RFiltered ] } ) ,
ReplacedSafebrowsing : statsCollector ( units , firstID , timeUnit , func ( u * unitDB ) ( num uint64 ) { return u . NResult [ RSafeBrowsing ] } ) ,
ReplacedParental : statsCollector ( units , firstID , timeUnit , func ( u * unitDB ) ( num uint64 ) { return u . NResult [ RParental ] } ) ,
TopQueried : topsCollector ( units , maxDomains , func ( u * unitDB ) ( pairs [ ] countPair ) { return u . Domains } ) ,
TopBlocked : topsCollector ( units , maxDomains , func ( u * unitDB ) ( pairs [ ] countPair ) { return u . BlockedDomains } ) ,
TopClients : topsCollector ( units , maxClients , func ( u * unitDB ) ( pairs [ ] countPair ) { return u . Clients } ) ,
2019-08-22 13:34:58 +00:00
}
2021-02-09 16:38:31 +00:00
// Total counters:
sum := unitDB {
NResult : make ( [ ] uint64 , rLast ) ,
}
2019-08-22 13:34:58 +00:00
timeN := 0
for _ , u := range units {
sum . NTotal += u . NTotal
sum . TimeAvg += u . TimeAvg
if u . TimeAvg != 0 {
timeN ++
}
sum . NResult [ RFiltered ] += u . NResult [ RFiltered ]
sum . NResult [ RSafeBrowsing ] += u . NResult [ RSafeBrowsing ]
sum . NResult [ RSafeSearch ] += u . NResult [ RSafeSearch ]
sum . NResult [ RParental ] += u . NResult [ RParental ]
}
2021-01-21 16:55:41 +00:00
data . NumDNSQueries = sum . NTotal
data . NumBlockedFiltering = sum . NResult [ RFiltered ]
data . NumReplacedSafebrowsing = sum . NResult [ RSafeBrowsing ]
data . NumReplacedSafesearch = sum . NResult [ RSafeSearch ]
data . NumReplacedParental = sum . NResult [ RParental ]
2019-08-22 13:34:58 +00:00
if timeN != 0 {
2021-01-21 16:55:41 +00:00
data . AvgProcessingTime = float64 ( sum . TimeAvg / uint32 ( timeN ) ) / 1000000
2019-08-22 13:34:58 +00:00
}
2021-01-21 16:55:41 +00:00
data . TimeUnits = "hours"
2019-08-22 13:34:58 +00:00
if timeUnit == Days {
2021-01-21 16:55:41 +00:00
data . TimeUnits = "days"
2019-08-22 13:34:58 +00:00
}
2021-01-21 16:55:41 +00:00
return data , true
2019-08-22 13:34:58 +00:00
}
2019-10-07 12:56:33 +00:00
2021-01-20 14:27:53 +00:00
func ( s * statsCtx ) GetTopClientsIP ( maxCount uint ) [ ] net . IP {
2021-06-17 16:44:46 +00:00
if s . conf . limit == 0 {
return nil
}
2019-12-11 09:38:58 +00:00
units , _ := s . loadUnits ( s . conf . limit )
2019-10-07 12:56:33 +00:00
if units == nil {
return nil
}
// top clients
m := map [ string ] uint64 { }
for _ , u := range units {
for _ , it := range u . Clients {
m [ it . Name ] += it . Count
}
}
2021-01-27 15:32:13 +00:00
a := convertMapToSlice ( m , int ( maxCount ) )
2021-01-20 14:27:53 +00:00
d := [ ] net . IP { }
2019-10-07 12:56:33 +00:00
for _ , it := range a {
2021-06-29 12:53:28 +00:00
ip := net . ParseIP ( it . Name )
if ip != nil {
d = append ( d , ip )
}
2019-10-07 12:56:33 +00:00
}
return d
}