Merge: * querylog: skip decoding errors

Close #753

* commit 'c74ae0d0e7ca03a592663025dc644397c1e31d57':
  * querylog: skip decoding errors
This commit is contained in:
Simon Zolin 2019-10-24 16:00:07 +03:00
commit 8323c0c4b6
1 changed files with 16 additions and 31 deletions

View File

@ -1,11 +1,13 @@
package querylog package querylog
import ( import (
"bufio"
"bytes" "bytes"
"compress/gzip" "compress/gzip"
"encoding/json" "encoding/json"
"io" "io"
"os" "os"
"strings"
"time" "time"
"github.com/AdguardTeam/golibs/log" "github.com/AdguardTeam/golibs/log"
@ -146,7 +148,7 @@ type Reader struct {
ql *queryLog ql *queryLog
f *os.File f *os.File
jd *json.Decoder reader *bufio.Reader // reads file line by line
now time.Time now time.Time
validFrom int64 // UNIX time (ns) validFrom int64 // UNIX time (ns)
olderThan int64 // UNIX time (ns) olderThan int64 // UNIX time (ns)
@ -215,7 +217,6 @@ func (r *Reader) BeginRead(olderThan time.Time, count uint64) {
} }
r.filePrepared = false r.filePrepared = false
r.searching = false r.searching = false
r.jd = nil
} }
// BeginReadPrev - start reading the previous data chunk // BeginReadPrev - start reading the previous data chunk
@ -245,7 +246,6 @@ func (r *Reader) BeginReadPrev(olderThan time.Time, count uint64) {
r.filePrepared = true r.filePrepared = true
r.searching = false r.searching = false
r.jd = nil
} }
// Perform binary seek // Perform binary seek
@ -279,27 +279,17 @@ func (fs *fileSeeker) seekBinary(cur uint64) int32 {
// Seek to a new line // Seek to a new line
func (r *Reader) seekToNewLine() bool { func (r *Reader) seekToNewLine() bool {
b := make([]byte, maxEntrySize*2) r.reader = bufio.NewReader(r.f)
b, err := r.reader.ReadBytes('\n')
_, err := r.f.Read(b)
if err != nil { if err != nil {
r.reader = nil
log.Error("QueryLog: file.Read: %s: %s", r.files[r.ifile], err) log.Error("QueryLog: file.Read: %s: %s", r.files[r.ifile], err)
return false return false
} }
off := bytes.IndexByte(b, '\n') + 1 off := len(b)
if off == 0 {
log.Error("QueryLog: Can't find a new line: %s", r.files[r.ifile])
return false
}
r.fpos += uint64(off) r.fpos += uint64(off)
log.Debug("QueryLog: seek: %x (+%d)", r.fpos, off) log.Debug("QueryLog: seek: %x (+%d)", r.fpos, off)
_, err = r.f.Seek(int64(r.fpos), io.SeekStart)
if err != nil {
log.Error("QueryLog: file.Seek: %s: %s", r.files[r.ifile], err)
return false
}
return true return true
} }
@ -370,7 +360,6 @@ func (r *Reader) prepareRead() bool {
// Next - return the next entry or nil if reading is finished // Next - return the next entry or nil if reading is finished
func (r *Reader) Next() *logEntry { // nolint func (r *Reader) Next() *logEntry { // nolint
var err error
for { for {
// open file if needed // open file if needed
if r.f == nil { if r.f == nil {
@ -390,30 +379,26 @@ func (r *Reader) Next() *logEntry { // nolint
r.filePrepared = true r.filePrepared = true
} }
// open decoder if needed // open decoder
if r.jd == nil { b, err := r.reader.ReadBytes('\n')
r.jd = json.NewDecoder(r.f) if err != nil {
}
// check if there's data
if !r.jd.More() {
r.jd = nil
return nil return nil
} }
strReader := strings.NewReader(string(b))
jd := json.NewDecoder(strReader)
// read data // read data
var entry logEntry var entry logEntry
err = r.jd.Decode(&entry) err = jd.Decode(&entry)
if err != nil { if err != nil {
log.Error("QueryLog: Failed to decode: %s", err) log.Debug("QueryLog: Failed to decode: %s", err)
r.jd = nil continue
return nil
} }
t := entry.Time.UnixNano() t := entry.Time.UnixNano()
if r.searching { if r.searching {
r.jd = nil
r.reader = nil
rr := r.fseeker.seekBinary(uint64(t)) rr := r.fseeker.seekBinary(uint64(t))
r.fpos = r.fseeker.pos r.fpos = r.fseeker.pos
if rr < 0 { if rr < 0 {