coredns querylog -- since we read entire querylog json once at startup, fill querylog cache from it and then rotate it on each incoming DNS query
This commit is contained in:
parent
a5d1053520
commit
5ae2a32d6e
@ -25,16 +25,15 @@ const (
|
|||||||
queryLogRotationPeriod = time.Hour * 24 // rotate the log every 24 hours
|
queryLogRotationPeriod = time.Hour * 24 // rotate the log every 24 hours
|
||||||
queryLogFileName = "querylog.json" // .gz added during compression
|
queryLogFileName = "querylog.json" // .gz added during compression
|
||||||
queryLogSize = 5000 // maximum API response for /querylog
|
queryLogSize = 5000 // maximum API response for /querylog
|
||||||
queryLogCacheTime = time.Minute // if requested more often than this, give out cached response
|
|
||||||
queryLogTopSize = 500 // Keep in memory only top N values
|
queryLogTopSize = 500 // Keep in memory only top N values
|
||||||
queryLogAPIPort = "8618" // 8618 is sha512sum of "querylog" then each byte summed
|
queryLogAPIPort = "8618" // 8618 is sha512sum of "querylog" then each byte summed
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
logBufferLock sync.RWMutex
|
logBufferLock sync.RWMutex
|
||||||
logBuffer []logEntry
|
logBuffer []*logEntry
|
||||||
|
|
||||||
queryLogCache []logEntry
|
queryLogCache []*logEntry
|
||||||
queryLogLock sync.RWMutex
|
queryLogLock sync.RWMutex
|
||||||
queryLogTime time.Time
|
queryLogTime time.Time
|
||||||
)
|
)
|
||||||
@ -77,15 +76,22 @@ func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, ela
|
|||||||
Elapsed: elapsed,
|
Elapsed: elapsed,
|
||||||
IP: ip,
|
IP: ip,
|
||||||
}
|
}
|
||||||
var flushBuffer []logEntry
|
var flushBuffer []*logEntry
|
||||||
|
|
||||||
logBufferLock.Lock()
|
logBufferLock.Lock()
|
||||||
logBuffer = append(logBuffer, entry)
|
logBuffer = append(logBuffer, &entry)
|
||||||
if len(logBuffer) >= logBufferCap {
|
if len(logBuffer) >= logBufferCap {
|
||||||
flushBuffer = logBuffer
|
flushBuffer = logBuffer
|
||||||
logBuffer = nil
|
logBuffer = nil
|
||||||
}
|
}
|
||||||
logBufferLock.Unlock()
|
logBufferLock.Unlock()
|
||||||
|
queryLogLock.Lock()
|
||||||
|
queryLogCache = append(queryLogCache, &entry)
|
||||||
|
if len(queryLogCache) > queryLogSize {
|
||||||
|
toremove := len(queryLogCache) - queryLogSize
|
||||||
|
queryLogCache = queryLogCache[toremove:]
|
||||||
|
}
|
||||||
|
queryLogLock.Unlock()
|
||||||
|
|
||||||
// add it to running top
|
// add it to running top
|
||||||
err = runningTop.addEntry(&entry, question, now)
|
err = runningTop.addEntry(&entry, question, now)
|
||||||
@ -103,26 +109,14 @@ func logRequest(question *dns.Msg, answer *dns.Msg, result dnsfilter.Result, ela
|
|||||||
}
|
}
|
||||||
|
|
||||||
func handleQueryLog(w http.ResponseWriter, r *http.Request) {
|
func handleQueryLog(w http.ResponseWriter, r *http.Request) {
|
||||||
now := time.Now()
|
|
||||||
|
|
||||||
queryLogLock.RLock()
|
queryLogLock.RLock()
|
||||||
values := queryLogCache
|
values := make([]*logEntry, len(queryLogCache))
|
||||||
needRefresh := now.Sub(queryLogTime) >= queryLogCacheTime
|
copy(values, queryLogCache)
|
||||||
queryLogLock.RUnlock()
|
queryLogLock.RUnlock()
|
||||||
|
|
||||||
if needRefresh {
|
// reverse it so that newest is first
|
||||||
// need to get fresh data
|
for left, right := 0, len(values)-1; left < right; left, right = left+1, right-1 {
|
||||||
logBufferLock.RLock()
|
values[left], values[right] = values[right], values[left]
|
||||||
values = logBuffer
|
|
||||||
logBufferLock.RUnlock()
|
|
||||||
|
|
||||||
if len(values) < queryLogSize {
|
|
||||||
values = appendFromLogFile(values, queryLogSize, queryLogTimeLimit)
|
|
||||||
}
|
|
||||||
queryLogLock.Lock()
|
|
||||||
queryLogCache = values
|
|
||||||
queryLogTime = now
|
|
||||||
queryLogLock.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var data = []map[string]interface{}{}
|
var data = []map[string]interface{}{}
|
||||||
|
@ -19,7 +19,7 @@ var (
|
|||||||
|
|
||||||
const enableGzip = false
|
const enableGzip = false
|
||||||
|
|
||||||
func flushToFile(buffer []logEntry) error {
|
func flushToFile(buffer []*logEntry) error {
|
||||||
if len(buffer) == 0 {
|
if len(buffer) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -90,7 +90,7 @@ func flushToFile(buffer []logEntry) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkBuffer(buffer []logEntry, b bytes.Buffer) error {
|
func checkBuffer(buffer []*logEntry, b bytes.Buffer) error {
|
||||||
l := len(buffer)
|
l := len(buffer)
|
||||||
d := json.NewDecoder(&b)
|
d := json.NewDecoder(&b)
|
||||||
|
|
||||||
@ -237,11 +237,11 @@ func genericLoader(onEntry func(entry *logEntry) error, needMore func() bool, ti
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func appendFromLogFile(values []logEntry, maxLen int, timeWindow time.Duration) []logEntry {
|
func appendFromLogFile(values []*logEntry, maxLen int, timeWindow time.Duration) []*logEntry {
|
||||||
a := []logEntry{}
|
a := []*logEntry{}
|
||||||
|
|
||||||
onEntry := func(entry *logEntry) error {
|
onEntry := func(entry *logEntry) error {
|
||||||
a = append(a, *entry)
|
a = append(a, entry)
|
||||||
if len(a) > maxLen {
|
if len(a) > maxLen {
|
||||||
toskip := len(a) - maxLen
|
toskip := len(a) - maxLen
|
||||||
a = a[toskip:]
|
a = a[toskip:]
|
||||||
|
@ -223,6 +223,14 @@ func fillStatsFromQueryLog() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
queryLogLock.Lock()
|
||||||
|
queryLogCache = append(queryLogCache, entry)
|
||||||
|
if len(queryLogCache) > queryLogSize {
|
||||||
|
toremove := len(queryLogCache) - queryLogSize
|
||||||
|
queryLogCache = queryLogCache[toremove:]
|
||||||
|
}
|
||||||
|
queryLogLock.Unlock()
|
||||||
|
|
||||||
requests.IncWithTime(entry.Time)
|
requests.IncWithTime(entry.Time)
|
||||||
if entry.Result.IsFiltered {
|
if entry.Result.IsFiltered {
|
||||||
filtered.IncWithTime(entry.Time)
|
filtered.IncWithTime(entry.Time)
|
||||||
|
Loading…
Reference in New Issue
Block a user