From c74ae0d0e7ca03a592663025dc644397c1e31d57 Mon Sep 17 00:00:00 2001 From: Simon Zolin Date: Tue, 22 Oct 2019 15:51:48 +0300 Subject: [PATCH] * querylog: skip decoding errors We read line from file and pass it to a JSON decoder. JSON decoder is now a local object. --- querylog/querylog_file.go | 47 +++++++++++++-------------------------- 1 file changed, 16 insertions(+), 31 deletions(-) diff --git a/querylog/querylog_file.go b/querylog/querylog_file.go index 1a6c4481..c65d03ed 100644 --- a/querylog/querylog_file.go +++ b/querylog/querylog_file.go @@ -1,12 +1,14 @@ package querylog import ( + "bufio" "bytes" "compress/gzip" "encoding/json" "fmt" "io" "os" + "strings" "time" "github.com/AdguardTeam/golibs/log" @@ -181,7 +183,7 @@ type Reader struct { ql *queryLog f *os.File - jd *json.Decoder + reader *bufio.Reader // reads file line by line now time.Time validFrom int64 // UNIX time (ns) olderThan int64 // UNIX time (ns) @@ -250,7 +252,6 @@ func (r *Reader) BeginRead(olderThan time.Time, count uint64) { } r.filePrepared = false r.searching = false - r.jd = nil } // BeginReadPrev - start reading the previous data chunk @@ -280,7 +281,6 @@ func (r *Reader) BeginReadPrev(olderThan time.Time, count uint64) { r.filePrepared = true r.searching = false - r.jd = nil } // Perform binary seek @@ -314,27 +314,17 @@ func (fs *fileSeeker) seekBinary(cur uint64) int32 { // Seek to a new line func (r *Reader) seekToNewLine() bool { - b := make([]byte, maxEntrySize*2) - - _, err := r.f.Read(b) + r.reader = bufio.NewReader(r.f) + b, err := r.reader.ReadBytes('\n') if err != nil { + r.reader = nil log.Error("QueryLog: file.Read: %s: %s", r.files[r.ifile], err) return false } - off := bytes.IndexByte(b, '\n') + 1 - if off == 0 { - log.Error("QueryLog: Can't find a new line: %s", r.files[r.ifile]) - return false - } - + off := len(b) r.fpos += uint64(off) log.Debug("QueryLog: seek: %x (+%d)", r.fpos, off) - _, err = r.f.Seek(int64(r.fpos), io.SeekStart) - if err != nil { - log.Error("QueryLog: file.Seek: %s: %s", r.files[r.ifile], err) - return false - } return true } @@ -405,7 +395,6 @@ func (r *Reader) prepareRead() bool { // Next - return the next entry or nil if reading is finished func (r *Reader) Next() *logEntry { // nolint - var err error for { // open file if needed if r.f == nil { @@ -425,30 +414,26 @@ func (r *Reader) Next() *logEntry { // nolint r.filePrepared = true } - // open decoder if needed - if r.jd == nil { - r.jd = json.NewDecoder(r.f) - } - - // check if there's data - if !r.jd.More() { - r.jd = nil + // open decoder + b, err := r.reader.ReadBytes('\n') + if err != nil { return nil } + strReader := strings.NewReader(string(b)) + jd := json.NewDecoder(strReader) // read data var entry logEntry - err = r.jd.Decode(&entry) + err = jd.Decode(&entry) if err != nil { - log.Error("QueryLog: Failed to decode: %s", err) - r.jd = nil - return nil + log.Debug("QueryLog: Failed to decode: %s", err) + continue } t := entry.Time.UnixNano() if r.searching { - r.jd = nil + r.reader = nil rr := r.fseeker.seekBinary(uint64(t)) r.fpos = r.fseeker.pos if rr < 0 {