* querylog: major refactor: change on-disk format and API

speed up decoding
speed up search
compatible with previous format (when not searching)
This commit is contained in:
Simon Zolin 2019-10-24 20:00:58 +03:00
parent a65f983aac
commit 2f5d6593f2
5 changed files with 387 additions and 125 deletions

View File

@ -1012,17 +1012,20 @@ Response:
When a new DNS request is received and processed, we store information about this event in "query log". It is a file on disk in JSON format: When a new DNS request is received and processed, we store information about this event in "query log". It is a file on disk in JSON format:
{ {
"Question":"..."," "IP":"127.0.0.1", // client IP
Answer":"...", "T":"...", // response time
"QH":"...", // target host name without the last dot
"QT":"...", // question type
"QC":"...", // question class
"Answer":"...",
"Result":{ "Result":{
"IsFiltered":true, "IsFiltered":true,
"Reason":3, "Reason":3,
"Rule":"...", "Rule":"...",
"FilterID":1 "FilterID":1
}, },
"Time":"...",
"Elapsed":12345, "Elapsed":12345,
"IP":"127.0.0.1" "Upstream":"...",
} }
@ -1052,7 +1055,7 @@ Request:
&filter_question_type=A | AAAA &filter_question_type=A | AAAA
&filter_response_status= | filtered &filter_response_status= | filtered
If `older_than` value is set, server returns the next chunk of entries that are older than this time stamp. This setting is used for paging. UI sets the empty value on the first request and gets the latest log entries. To get the older entries, UI sets this value to the timestamp of the last (the oldest) entry from the previous response from Server. `older_than` setting is used for paging. UI uses an empty value for `older_than` on the first request and gets the latest log entries. To get the older entries, UI sets `older_than` to the `oldest` value from the server's response.
If "filter" settings are set, server returns only entries that match the specified request. If "filter" settings are set, server returns only entries that match the specified request.
@ -1060,7 +1063,9 @@ For `filter.domain` and `filter.client` the server matches substrings by default
Response: Response:
[ {
"oldest":"2006-01-02T15:04:05.999999999Z07:00"
"data":[
{ {
"answer":[ "answer":[
{ {
@ -1085,6 +1090,7 @@ Response:
} }
... ...
] ]
}
The most recent entries are at the top of list. The most recent entries are at the top of list.

View File

@ -20,8 +20,8 @@ const (
queryLogFileName = "querylog.json" // .gz added during compression queryLogFileName = "querylog.json" // .gz added during compression
getDataLimit = 500 // GetData(): maximum log entries to return getDataLimit = 500 // GetData(): maximum log entries to return
// maximum data chunks to parse when filtering entries // maximum entries to parse when searching
maxFilteringChunks = 10 maxSearchEntries = 50000
) )
// queryLog is a structure that writes and reads the DNS query log // queryLog is a structure that writes and reads the DNS query log
@ -94,12 +94,16 @@ func (l *queryLog) clear() {
} }
type logEntry struct { type logEntry struct {
Question []byte IP string `json:"IP"`
Time time.Time `json:"T"`
QHost string `json:"QH"`
QType string `json:"QT"`
QClass string `json:"QC"`
Answer []byte `json:",omitempty"` // sometimes empty answers happen like binerdunt.top or rev2.globalrootservers.net Answer []byte `json:",omitempty"` // sometimes empty answers happen like binerdunt.top or rev2.globalrootservers.net
Result dnsfilter.Result Result dnsfilter.Result
Time time.Time
Elapsed time.Duration Elapsed time.Duration
IP string
Upstream string `json:",omitempty"` // if empty, means it was cached Upstream string `json:",omitempty"` // if empty, means it was cached
} }
@ -119,21 +123,15 @@ func (l *queryLog) Add(question *dns.Msg, answer *dns.Msg, result *dnsfilter.Res
return return
} }
var q []byte if question == nil || len(question.Question) != 1 || len(question.Question[0].Name) == 0 ||
ip == nil {
return
}
var a []byte var a []byte
var err error var err error
ip := getIPString(addr) ip := getIPString(addr)
if question == nil {
return
}
q, err = question.Pack()
if err != nil {
log.Printf("failed to pack question for querylog: %s", err)
return
}
if answer != nil { if answer != nil {
a, err = answer.Pack() a, err = answer.Pack()
if err != nil { if err != nil {
@ -148,14 +146,18 @@ func (l *queryLog) Add(question *dns.Msg, answer *dns.Msg, result *dnsfilter.Res
now := time.Now() now := time.Now()
entry := logEntry{ entry := logEntry{
Question: q, IP: ip,
Time: now,
Answer: a, Answer: a,
Result: *result, Result: *result,
Time: now,
Elapsed: elapsed, Elapsed: elapsed,
IP: ip,
Upstream: upstream, Upstream: upstream,
} }
q := question.Question[0]
entry.QHost = strings.ToLower(q.Name[:len(q.Name)-1]) // remove the last dot
entry.QType = dns.Type(q.Qtype).String()
entry.QClass = dns.Class(q.Qclass).String()
l.bufferLock.Lock() l.bufferLock.Lock()
l.buffer = append(l.buffer, &entry) l.buffer = append(l.buffer, &entry)
@ -182,33 +184,22 @@ func isNeeded(entry *logEntry, params getDataParams) bool {
return false return false
} }
if len(params.Domain) != 0 || params.QuestionType != 0 { if len(params.QuestionType) != 0 {
m := dns.Msg{} if entry.QType != params.QuestionType {
_ = m.Unpack(entry.Question) return false
if params.QuestionType != 0 {
if m.Question[0].Qtype != params.QuestionType {
return false
}
}
if len(params.Domain) != 0 && params.StrictMatchDomain {
if m.Question[0].Name != params.Domain {
return false
}
} else if len(params.Domain) != 0 {
if strings.Index(m.Question[0].Name, params.Domain) == -1 {
return false
}
} }
} }
if len(params.Client) != 0 && params.StrictMatchClient { if len(params.Domain) != 0 {
if entry.IP != params.Client { if (params.StrictMatchDomain && entry.QHost != params.Domain) ||
(!params.StrictMatchDomain && strings.Index(entry.QHost, params.Domain) == -1) {
return false return false
} }
} else if len(params.Client) != 0 { }
if strings.Index(entry.IP, params.Client) == -1 {
if len(params.Client) != 0 {
if (params.StrictMatchClient && entry.IP != params.Client) ||
(!params.StrictMatchClient && strings.Index(entry.IP, params.Client) == -1) {
return false return false
} }
} }
@ -216,31 +207,23 @@ func isNeeded(entry *logEntry, params getDataParams) bool {
return true return true
} }
func (l *queryLog) readFromFile(params getDataParams) ([]*logEntry, int) { func (l *queryLog) readFromFile(params getDataParams) ([]*logEntry, time.Time, int) {
entries := []*logEntry{} entries := []*logEntry{}
olderThan := params.OlderThan oldest := time.Time{}
totalChunks := 0
total := 0
r := l.OpenReader() r := l.OpenReader()
if r == nil { if r == nil {
return entries, 0 return entries, time.Time{}, 0
} }
r.BeginRead(olderThan, getDataLimit) r.BeginRead(params.OlderThan, getDataLimit, &params)
for totalChunks < maxFilteringChunks { total := uint64(0)
first := true for total <= maxSearchEntries {
newEntries := []*logEntry{} newEntries := []*logEntry{}
for { for {
entry := r.Next() entry := r.Next()
if entry == nil { if entry == nil {
break break
} }
total++
if first {
first = false
olderThan = entry.Time
}
if !isNeeded(entry, params) { if !isNeeded(entry, params) {
continue continue
@ -251,7 +234,7 @@ func (l *queryLog) readFromFile(params getDataParams) ([]*logEntry, int) {
newEntries = append(newEntries, entry) newEntries = append(newEntries, entry)
} }
log.Debug("entries: +%d (%d) older-than:%s", len(newEntries), len(entries), olderThan) log.Debug("entries: +%d (%d) [%d]", len(newEntries), len(entries), r.Total())
entries = append(newEntries, entries...) entries = append(newEntries, entries...)
if len(entries) > getDataLimit { if len(entries) > getDataLimit {
@ -259,15 +242,16 @@ func (l *queryLog) readFromFile(params getDataParams) ([]*logEntry, int) {
entries = entries[toremove:] entries = entries[toremove:]
break break
} }
if first || len(entries) == getDataLimit { if r.Total() == 0 || len(entries) == getDataLimit {
break break
} }
totalChunks++ total += r.Total()
r.BeginReadPrev(olderThan, getDataLimit) oldest = r.Oldest()
r.BeginReadPrev(getDataLimit)
} }
r.Close() r.Close()
return entries, total return entries, oldest, int(total)
} }
// Parameters for getData() // Parameters for getData()
@ -275,7 +259,7 @@ type getDataParams struct {
OlderThan time.Time // return entries that are older than this value OlderThan time.Time // return entries that are older than this value
Domain string // filter by domain name in question Domain string // filter by domain name in question
Client string // filter by client IP Client string // filter by client IP
QuestionType uint16 // filter by question type QuestionType string // filter by question type
ResponseStatus responseStatusType // filter by response status ResponseStatus responseStatusType // filter by response status
StrictMatchDomain bool // if Domain value must be matched strictly StrictMatchDomain bool // if Domain value must be matched strictly
StrictMatchClient bool // if Client value must be matched strictly StrictMatchClient bool // if Client value must be matched strictly
@ -291,19 +275,16 @@ const (
) )
// Get log entries // Get log entries
func (l *queryLog) getData(params getDataParams) []map[string]interface{} { func (l *queryLog) getData(params getDataParams) map[string]interface{} {
var data = []map[string]interface{}{} var data = []map[string]interface{}{}
if len(params.Domain) != 0 && params.StrictMatchDomain { var oldest time.Time
params.Domain = params.Domain + "."
}
now := time.Now() now := time.Now()
entries := []*logEntry{} entries := []*logEntry{}
total := 0 total := 0
// add from file // add from file
entries, total = l.readFromFile(params) entries, oldest, total = l.readFromFile(params)
if params.OlderThan.IsZero() { if params.OlderThan.IsZero() {
params.OlderThan = now params.OlderThan = now
@ -332,26 +313,12 @@ func (l *queryLog) getData(params getDataParams) []map[string]interface{} {
// process the elements from latest to oldest // process the elements from latest to oldest
for i := len(entries) - 1; i >= 0; i-- { for i := len(entries) - 1; i >= 0; i-- {
entry := entries[i] entry := entries[i]
var q *dns.Msg
var a *dns.Msg var a *dns.Msg
if len(entry.Question) == 0 {
continue
}
q = new(dns.Msg)
if err := q.Unpack(entry.Question); err != nil {
log.Tracef("q.Unpack(): %s", err)
continue
}
if len(q.Question) != 1 {
log.Tracef("len(q.Question) != 1")
continue
}
if len(entry.Answer) > 0 { if len(entry.Answer) > 0 {
a = new(dns.Msg) a = new(dns.Msg)
if err := a.Unpack(entry.Answer); err != nil { if err := a.Unpack(entry.Answer); err != nil {
log.Debug("Failed to unpack dns message answer: %s", err) log.Debug("Failed to unpack dns message answer: %s: %s", err, string(entry.Answer))
a = nil a = nil
} }
} }
@ -363,9 +330,9 @@ func (l *queryLog) getData(params getDataParams) []map[string]interface{} {
"client": entry.IP, "client": entry.IP,
} }
jsonEntry["question"] = map[string]interface{}{ jsonEntry["question"] = map[string]interface{}{
"host": strings.ToLower(strings.TrimSuffix(q.Question[0].Name, ".")), "host": entry.QHost,
"type": dns.Type(q.Question[0].Qtype).String(), "type": entry.QType,
"class": dns.Class(q.Question[0].Qclass).String(), "class": entry.QClass,
} }
if a != nil { if a != nil {
@ -390,7 +357,17 @@ func (l *queryLog) getData(params getDataParams) []map[string]interface{} {
log.Debug("QueryLog: prepared data (%d/%d) older than %s in %s", log.Debug("QueryLog: prepared data (%d/%d) older than %s in %s",
len(entries), total, params.OlderThan, time.Since(now)) len(entries), total, params.OlderThan, time.Since(now))
return data
var result = map[string]interface{}{}
if len(entries) == getDataLimit {
oldest = entries[0].Time
}
result["oldest"] = ""
if !oldest.IsZero() {
result["oldest"] = oldest.Format(time.RFC3339Nano)
}
result["data"] = data
return result
} }
func answerToMap(a *dns.Msg) []map[string]interface{} { func answerToMap(a *dns.Msg) []map[string]interface{} {

View File

@ -67,12 +67,12 @@ func (l *queryLog) handleQueryLog(w http.ResponseWriter, r *http.Request) {
} }
if len(req.filterQuestionType) != 0 { if len(req.filterQuestionType) != 0 {
qtype, ok := dns.StringToType[req.filterQuestionType] _, ok := dns.StringToType[req.filterQuestionType]
if !ok { if !ok {
httpError(r, w, http.StatusBadRequest, "invalid question_type") httpError(r, w, http.StatusBadRequest, "invalid question_type")
return return
} }
params.QuestionType = qtype params.QuestionType = req.filterQuestionType
} }
if len(req.filterResponseStatus) != 0 { if len(req.filterResponseStatus) != 0 {

View File

@ -4,13 +4,17 @@ import (
"bufio" "bufio"
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"encoding/base64"
"encoding/json" "encoding/json"
"io" "io"
"os" "os"
"strconv"
"strings" "strings"
"time" "time"
"github.com/AdguardTeam/AdGuardHome/dnsfilter"
"github.com/AdguardTeam/golibs/log" "github.com/AdguardTeam/golibs/log"
"github.com/miekg/dns"
) )
const enableGzip = false const enableGzip = false
@ -145,13 +149,15 @@ func (l *queryLog) periodicRotate() {
// Reader is the DB reader context // Reader is the DB reader context
type Reader struct { type Reader struct {
ql *queryLog ql *queryLog
search *getDataParams
f *os.File f *os.File
reader *bufio.Reader // reads file line by line reader *bufio.Reader // reads file line by line
now time.Time now time.Time
validFrom int64 // UNIX time (ns) validFrom int64 // UNIX time (ns)
olderThan int64 // UNIX time (ns) olderThan int64 // UNIX time (ns)
oldest time.Time
files []string files []string
ifile int ifile int
@ -161,10 +167,12 @@ type Reader struct {
latest bool // return the latest entries latest bool // return the latest entries
filePrepared bool filePrepared bool
searching bool // we're seaching for an entry with exact time stamp seeking bool // we're seaching for an entry with exact time stamp
fseeker fileSeeker // file seeker object fseeker fileSeeker // file seeker object
fpos uint64 // current file offset fpos uint64 // current file offset
nSeekRequests uint32 // number of Seek() requests made (finding a new line doesn't count) nSeekRequests uint32 // number of Seek() requests made (finding a new line doesn't count)
timecnt uint64
} }
type fileSeeker struct { type fileSeeker struct {
@ -197,8 +205,8 @@ func (r *Reader) Close() {
if r.count > 0 { if r.count > 0 {
perunit = elapsed / time.Duration(r.count) perunit = elapsed / time.Duration(r.count)
} }
log.Debug("querylog: read %d entries in %v, %v/entry, seek-reqs:%d", log.Debug("querylog: read %d entries in %v, %v/entry, seek-reqs:%d time:%dus (%d%%)",
r.count, elapsed, perunit, r.nSeekRequests) r.count, elapsed, perunit, r.nSeekRequests, r.timecnt/1000, r.timecnt*100/uint64(elapsed.Nanoseconds()))
if r.f != nil { if r.f != nil {
r.f.Close() r.f.Close()
@ -208,25 +216,26 @@ func (r *Reader) Close() {
// BeginRead - start reading // BeginRead - start reading
// olderThan: stop returning entries when an entry with this time is reached // olderThan: stop returning entries when an entry with this time is reached
// count: minimum number of entries to return // count: minimum number of entries to return
func (r *Reader) BeginRead(olderThan time.Time, count uint64) { func (r *Reader) BeginRead(olderThan time.Time, count uint64, search *getDataParams) {
r.olderThan = olderThan.UnixNano() r.olderThan = olderThan.UnixNano()
r.latest = olderThan.IsZero() r.latest = olderThan.IsZero()
r.oldest = time.Time{}
r.search = search
r.limit = count r.limit = count
if r.latest { if r.latest {
r.olderThan = r.now.UnixNano() r.olderThan = r.now.UnixNano()
} }
r.filePrepared = false r.filePrepared = false
r.searching = false r.seeking = false
} }
// BeginReadPrev - start reading the previous data chunk // BeginReadPrev - start reading the previous data chunk
func (r *Reader) BeginReadPrev(olderThan time.Time, count uint64) { func (r *Reader) BeginReadPrev(count uint64) {
r.olderThan = olderThan.UnixNano() r.olderThan = r.oldest.UnixNano()
r.latest = olderThan.IsZero() r.oldest = time.Time{}
r.latest = false
r.limit = count r.limit = count
if r.latest { r.count = 0
r.olderThan = r.now.UnixNano()
}
off := r.fpos - maxEntrySize*(r.limit+1) off := r.fpos - maxEntrySize*(r.limit+1)
if int64(off) < maxEntrySize { if int64(off) < maxEntrySize {
@ -245,7 +254,7 @@ func (r *Reader) BeginReadPrev(olderThan time.Time, count uint64) {
r.fseeker.pos = r.fpos r.fseeker.pos = r.fpos
r.filePrepared = true r.filePrepared = true
r.searching = false r.seeking = false
} }
// Perform binary seek // Perform binary seek
@ -335,7 +344,7 @@ func (r *Reader) prepareRead() bool {
} }
} else { } else {
// start searching in file: we'll read the first chunk of data from the middle of file // start searching in file: we'll read the first chunk of data from the middle of file
r.searching = true r.seeking = true
r.fseeker = fileSeeker{} r.fseeker = fileSeeker{}
r.fseeker.target = uint64(r.olderThan) r.fseeker.target = uint64(r.olderThan)
r.fseeker.hi = fsize r.fseeker.hi = fsize
@ -358,6 +367,226 @@ func (r *Reader) prepareRead() bool {
return true return true
} }
// Get bool value from "key":bool
func readJSONBool(s, name string) (bool, bool) {
i := strings.Index(s, "\""+name+"\":")
if i == -1 {
return false, false
}
start := i + 1 + len(name) + 2
b := false
if strings.HasPrefix(s[start:], "true") {
b = true
} else if !strings.HasPrefix(s[start:], "false") {
return false, false
}
return b, true
}
// Get value from "key":"value"
func readJSONValue(s, name string) string {
i := strings.Index(s, "\""+name+"\":\"")
if i == -1 {
return ""
}
start := i + 1 + len(name) + 3
i = strings.IndexByte(s[start:], '"')
if i == -1 {
return ""
}
end := start + i
return s[start:end]
}
func (r *Reader) applySearch(str string) bool {
if r.search.ResponseStatus == responseStatusFiltered {
boolVal, ok := readJSONBool(str, "IsFiltered")
if !ok || !boolVal {
return false
}
}
if len(r.search.Domain) != 0 {
val := readJSONValue(str, "QH")
if len(val) == 0 {
return false
}
if (r.search.StrictMatchDomain && val != r.search.Domain) ||
(!r.search.StrictMatchDomain && strings.Index(val, r.search.Domain) == -1) {
return false
}
}
if len(r.search.QuestionType) != 0 {
val := readJSONValue(str, "QT")
if len(val) == 0 {
return false
}
if val != r.search.QuestionType {
return false
}
}
if len(r.search.Client) != 0 {
val := readJSONValue(str, "IP")
if len(val) == 0 {
log.Debug("QueryLog: failed to decode")
return false
}
if (r.search.StrictMatchClient && val != r.search.Client) ||
(!r.search.StrictMatchClient && strings.Index(val, r.search.Client) == -1) {
return false
}
}
return true
}
const (
jsonTErr = iota
jsonTObj
jsonTStr
jsonTNum
jsonTBool
)
// Parse JSON key-value pair
// e.g.: "key":VALUE where VALUE is "string", true|false (boolean), or 123.456 (number)
// Note the limitations:
// . doesn't support whitespace
// . doesn't support "null"
// . doesn't validate boolean or number
// . no proper handling of {} braces
// . no handling of [] brackets
// Return (key, value, type)
func readJSON(ps *string) (string, string, int32) {
s := *ps
k := ""
v := ""
t := int32(jsonTErr)
q1 := strings.IndexByte(s, '"')
if q1 == -1 {
return k, v, t
}
q2 := strings.IndexByte(s[q1+1:], '"')
if q2 == -1 {
return k, v, t
}
k = s[q1+1 : q1+1+q2]
s = s[q1+1+q2+1:]
if len(s) < 2 || s[0] != ':' {
return k, v, t
}
if s[1] == '"' {
q2 = strings.IndexByte(s[2:], '"')
if q2 == -1 {
return k, v, t
}
v = s[2 : 2+q2]
t = jsonTStr
s = s[2+q2+1:]
} else if s[1] == '{' {
t = jsonTObj
s = s[1+1:]
} else {
sep := strings.IndexAny(s[1:], ",}")
if sep == -1 {
return k, v, t
}
v = s[1 : 1+sep]
if s[1] == 't' || s[1] == 'f' {
t = jsonTBool
} else if s[1] == '.' || (s[1] >= '0' && s[1] <= '9') {
t = jsonTNum
}
s = s[1+sep+1:]
}
*ps = s
return k, v, t
}
// nolint (gocyclo)
func decode(ent *logEntry, str string) {
var b bool
var i int
var err error
for {
k, v, t := readJSON(&str)
if t == jsonTErr {
break
}
switch k {
case "IP":
ent.IP = v
case "T":
ent.Time, err = time.Parse(time.RFC3339, v)
case "QH":
ent.QHost = v
case "QT":
ent.QType = v
case "QC":
ent.QClass = v
case "Answer":
ent.Answer, err = base64.StdEncoding.DecodeString(v)
case "IsFiltered":
b, err = strconv.ParseBool(v)
ent.Result.IsFiltered = b
case "Rule":
ent.Result.Rule = v
case "FilterID":
i, err = strconv.Atoi(v)
ent.Result.FilterID = int64(i)
case "Reason":
i, err = strconv.Atoi(v)
ent.Result.Reason = dnsfilter.Reason(i)
case "Upstream":
ent.Upstream = v
case "Elapsed":
i, err = strconv.Atoi(v)
ent.Elapsed = time.Duration(i)
// pre-v0.99.3 compatibility:
case "Question":
var qstr []byte
qstr, err = base64.StdEncoding.DecodeString(v)
if err != nil {
break
}
q := new(dns.Msg)
err = q.Unpack(qstr)
if err != nil {
break
}
ent.QHost = q.Question[0].Name
if len(ent.QHost) == 0 {
break
}
ent.QHost = ent.QHost[:len(ent.QHost)-1]
ent.QType = dns.TypeToString[q.Question[0].Qtype]
ent.QClass = dns.ClassToString[q.Question[0].Qclass]
case "Time":
ent.Time, err = time.Parse(time.RFC3339, v)
}
if err != nil {
log.Debug("decode err: %s", err)
break
}
}
}
// Next - return the next entry or nil if reading is finished // Next - return the next entry or nil if reading is finished
func (r *Reader) Next() *logEntry { // nolint func (r *Reader) Next() *logEntry { // nolint
for { for {
@ -379,24 +608,28 @@ func (r *Reader) Next() *logEntry { // nolint
r.filePrepared = true r.filePrepared = true
} }
// open decoder
b, err := r.reader.ReadBytes('\n') b, err := r.reader.ReadBytes('\n')
if err != nil { if err != nil {
return nil return nil
} }
strReader := strings.NewReader(string(b)) str := string(b)
jd := json.NewDecoder(strReader)
// read data val := readJSONValue(str, "T")
var entry logEntry if len(val) == 0 {
err = jd.Decode(&entry) val = readJSONValue(str, "Time")
if err != nil { }
log.Debug("QueryLog: Failed to decode: %s", err) if len(val) == 0 {
log.Debug("QueryLog: failed to decode")
continue continue
} }
tm, err := time.Parse(time.RFC3339, val)
if err != nil {
log.Debug("QueryLog: failed to decode")
continue
}
t := tm.UnixNano()
t := entry.Time.UnixNano() if r.seeking {
if r.searching {
r.reader = nil r.reader = nil
rr := r.fseeker.seekBinary(uint64(t)) rr := r.fseeker.seekBinary(uint64(t))
@ -407,7 +640,7 @@ func (r *Reader) Next() *logEntry { // nolint
} else if rr == 0 { } else if rr == 0 {
// We found the target entry. // We found the target entry.
// We'll start reading the previous chunk of data. // We'll start reading the previous chunk of data.
r.searching = false r.seeking = false
off := r.fpos - (maxEntrySize * (r.limit + 1)) off := r.fpos - (maxEntrySize * (r.limit + 1))
if int64(off) < maxEntrySize { if int64(off) < maxEntrySize {
@ -430,19 +663,37 @@ func (r *Reader) Next() *logEntry { // nolint
continue continue
} }
if r.oldest.IsZero() {
r.oldest = tm
}
if t < r.validFrom { if t < r.validFrom {
continue continue
} }
if t >= r.olderThan { if t >= r.olderThan {
return nil return nil
} }
r.count++ r.count++
return &entry
if !r.applySearch(str) {
continue
}
st := time.Now()
var ent logEntry
decode(&ent, str)
r.timecnt += uint64(time.Now().Sub(st).Nanoseconds())
return &ent
} }
} }
// Total returns the total number of items // Total returns the total number of processed items
func (r *Reader) Total() int { func (r *Reader) Total() uint64 {
return 0 return r.count
}
// Oldest returns the time of the oldest processed entry
func (r *Reader) Oldest() time.Time {
return r.oldest
} }

View File

@ -42,7 +42,35 @@ func TestQueryLog(t *testing.T) {
OlderThan: time.Now(), OlderThan: time.Now(),
} }
d := l.getData(params) d := l.getData(params)
m := d[0] mdata := d["data"].([]map[string]interface{})
m := mdata[0]
mq := m["question"].(map[string]interface{}) mq := m["question"].(map[string]interface{})
assert.True(t, mq["host"].(string) == "example.org") assert.True(t, mq["host"].(string) == "example.org")
} }
func TestJSON(t *testing.T) {
s := `
{"keystr":"val","obj":{"keybool":true,"keyint":123456}}
`
k, v, jtype := readJSON(&s)
assert.Equal(t, jtype, int32(jsonTStr))
assert.Equal(t, "keystr", k)
assert.Equal(t, "val", v)
k, v, jtype = readJSON(&s)
assert.Equal(t, jtype, int32(jsonTObj))
assert.Equal(t, "obj", k)
k, v, jtype = readJSON(&s)
assert.Equal(t, jtype, int32(jsonTBool))
assert.Equal(t, "keybool", k)
assert.Equal(t, "true", v)
k, v, jtype = readJSON(&s)
assert.Equal(t, jtype, int32(jsonTNum))
assert.Equal(t, "keyint", k)
assert.Equal(t, "123456", v)
k, v, jtype = readJSON(&s)
assert.True(t, jtype == jsonTErr)
}