badguardhome/querylog/qlog_file.go
Simon Zolin 4efc464e98 - querylog: file rotation didn't work properly; fix entry searching algorithm
If AGH is restarted, file rotation timer is reset
which can lead to the situation when file rotation procedure is never started.

Squashed commit of the following:

commit 427ae91a512cd146ebfffad06ed24eb723cb9e7d
Merge: 067fac65 e56c746b
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Wed Sep 2 18:18:46 2020 +0300

    Merge remote-tracking branch 'origin/master' into qlogs-rotate

commit 067fac65b1a87d499900f4860ffa96ed8208967c
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Wed Sep 2 15:30:48 2020 +0300

    minor

commit c2059a15700e5696cb1bb5cd49129c6020d986f4
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Wed Sep 2 14:53:07 2020 +0300

    improve

commit a279438eaf1cf40b820652093fb56d56784de7d8
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Tue Sep 1 18:49:14 2020 +0300

    minor

commit 26ac130f139f565de39200e484b3bd4a04afcfcc
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Tue Sep 1 13:54:27 2020 +0300

    rename

commit 0fad7b88dbeadcddd4d77536a18da72f3203ea80
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Tue Sep 1 13:05:36 2020 +0300

    + TestQLogSeek

commit fa6afc6d4dc592b1fef67c4a069ea50fae600a58
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Tue Sep 1 13:05:34 2020 +0300

    minor

commit 11e6ab9131e5c37467e8530a2db95a82bbb0603b
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Mon Aug 31 19:45:47 2020 +0300

    fix tests

commit 7cbb89948df0e69b1bae8f8cde1879b5b1c4b1d6
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Mon Aug 31 19:29:43 2020 +0300

    - querylog: fix entry searching algorithm

commit 745d44863d88b321bd7001f24a68620f7ef05819
Author: Simon Zolin <s.zolin@adguard.com>
Date:   Mon Aug 31 18:34:14 2020 +0300

    - querylog: file rotation didn't work properly

    If AGH is restarted, file rotation timer is reset
     which can lead to the situation when file rotation procedure is never started.
2020-09-02 19:42:26 +03:00

338 lines
9.3 KiB
Go

package querylog
import (
"io"
"os"
"sync"
"time"
"github.com/AdguardTeam/golibs/log"
"github.com/pkg/errors"
)
// ErrSeekNotFound is the sentinel error returned by QLogFile.Seek when it
// could not locate a record with the requested timestamp: the probed line
// fell outside the search scope or was probed twice in a row, the probed
// line had no parsable timestamp, or the search depth limit was reached.
var ErrSeekNotFound = errors.New("Seek not found the record")
const (
	// maxEntrySize is the number of bytes reserved for a single query log
	// line when reading from the file.
	//
	// TODO: Find a way to grow buffer instead of relying on this value when reading strings
	maxEntrySize = 16 * 1024

	// bufferSize is the size in bytes of the read buffer; it is sized to
	// hold at least 100 entries of maxEntrySize bytes each.
	bufferSize = 100 * maxEntrySize
)
// QLogFile represents a single query log file.
// It allows reading from the file in the reverse order.
//
// Please note that this is a stateful object.
// Internally, it contains a pointer to a specific position in the file,
// and it reads lines in reverse order starting from that position.
type QLogFile struct {
	file        *os.File   // handle of the query log file; NewQLogFile opens it read-only
	position    int64      // current position in the file; ReadNext returns the line that ends here
	buffer      []byte     // chunk of the file loaded by initBuffer; nil means it has to be (re)loaded
	bufferStart int64      // offset in the file at which buffer begins
	bufferLen   int        // number of bytes the last initBuffer call actually read into buffer
	lock        sync.Mutex // held by Seek, SeekStart and ReadNext for the whole call
}
// NewQLogFile opens the query log file located at path for reading and
// wraps it into a new QLogFile.
//
// The error returned by the open call is passed through unchanged; in that
// case the returned *QLogFile is nil.
func NewQLogFile(path string) (*QLogFile, error) {
	file, err := os.OpenFile(path, os.O_RDONLY, 0644)
	if err != nil {
		return nil, err
	}

	qf := new(QLogFile)
	qf.file = file

	return qf, nil
}
// Seek runs a binary search over the query log file, looking for the record
// whose timestamp equals the given one. When the record is found, the
// internal position is moved to the end of that record so that the next
// ReadNext call returns it.
//
// How the search works:
//  1. The probe starts in the middle of the file.
//  2. The probe is expanded to the whole line that contains it.
//  3. The timestamp of that line is parsed.
//  4. If the line is newer than the requested timestamp, the search continues
//     in the part of the scope before the line; if it is older, in the part
//     after the line.
//  5. The steps are repeated, so every iteration roughly halves the scope.
//
// Returns:
//   - the new position (the offset right after the matching line),
//   - the search depth, i.e. how many times the scope has been narrowed,
//   - ErrSeekNotFound if no matching record was found, or the error reported
//     by the underlying file operations.
func (q *QLogFile) Seek(timestamp int64) (int64, int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Whatever is buffered belongs to the previous position, so drop it.
	q.buffer = nil

	// The file size defines the initial search scope.
	stat, err := q.file.Stat()
	if err != nil {
		return 0, 0, err
	}

	// [lo, hi) is the range of file offsets that may still hold the record,
	// probe is an approximate offset inside the line to examine next.
	lo, hi := int64(0), stat.Size()
	probe := (hi - lo) / 2

	// Offset of the line examined on the previous iteration, -1 for "none yet".
	prevLineIdx := int64(-1)

	// The depth counter guards against a search that never converges.
	depth := 0
	for {
		// Expand the probe offset to a full line.
		line, lineIdx, lineEndIdx, probeErr := q.readProbeLine(probe)
		if probeErr != nil {
			return 0, depth, probeErr
		}

		// A line that sticks out of the scope, or the same line showing up
		// twice in a row, means the scope is too narrow to find anything.
		outOfScope := lineIdx < lo || lineEndIdx > hi
		if outOfScope || lineIdx == prevLineIdx {
			log.Error("querylog: didn't find timestamp:%v", timestamp)
			return 0, depth, ErrSeekNotFound
		}
		prevLineIdx = lineIdx

		ts := readQLogTimestamp(line)
		switch {
		case ts == 0:
			// The line carries no usable timestamp.
			return 0, depth, ErrSeekNotFound
		case ts == timestamp:
			// Found it: place the position right after the line.
			q.position = lineIdx + int64(len(line))
			return q.position, depth, nil
		case ts > timestamp:
			// The wanted record is OLDER than the probed one,
			// so it lies on the LEFT side of the probed line.
			hi = lineIdx
		default:
			// The wanted record is NEWER than the probed one,
			// so it lies on the RIGHT side of the probed line.
			lo = lineEndIdx
		}

		probe = lo + (hi-lo)/2
		depth++
		if depth >= 100 {
			log.Error("Seek depth is too high, aborting. File %s, ts %v", q.file.Name(), timestamp)
			return 0, depth, ErrSeekNotFound
		}
	}
}
// SeekStart moves the current position to the end of the file.
// The query log is read in the reverse order, which is why the start of the
// log is actually the end of the file.
//
// It returns the new position, or an error if the file size could not be
// obtained.
func (q *QLogFile) SeekStart() (int64, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Whatever is buffered belongs to the previous position, so drop it.
	q.buffer = nil

	stat, err := q.file.Stat()
	if err != nil {
		return 0, err
	}

	// Use the offset of the last byte of the file as the position;
	// for an empty file the position stays at zero.
	last := stat.Size() - 1
	if last < 0 {
		last = 0
	}
	q.position = last

	return last, nil
}
// ReadNext returns the line that ends at the current position and then moves
// the position left, to the end of the line that precedes it in the file
// (which is the "next" one, since the log is read in the reverse order).
//
// It returns io.EOF once the position has reached the beginning of the file.
func (q *QLogFile) ReadNext() (string, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.position == 0 {
		return "", io.EOF
	}

	line, lineIdx, err := q.readNextLine(q.position)
	if err != nil {
		return "", err
	}

	// Lines are separated by a line break ("line\nline"), so unless the line
	// starts at the very beginning of the file, step one more byte to the
	// left to skip that separator.
	next := int64(0)
	if lineIdx != 0 {
		next = lineIdx - 1
	}
	q.position = next

	return line, nil
}
// Close closes the underlying query log file and returns the error reported
// by that call, if any. Unlike Seek, SeekStart and ReadNext, it does not
// take the lock.
func (q *QLogFile) Close() error {
	return q.file.Close()
}
// readNextLine returns the line that ENDS at the specified position together
// with the offset in the file at which that line starts.
//
// Steps:
//  1. If the buffer is not loaded, or fewer than maxEntrySize bytes of it lie
//     before the position while the buffer does not start at the beginning
//     of the file, (re)load the buffer for this position.
//  2. Scan the buffer backwards from the position for a line break.
//  3. Return everything between that line break and the position.
func (q *QLogFile) readNextLine(position int64) (string, int64, error) {
	// offset is the position expressed relative to the start of the buffer.
	offset := position - q.bufferStart

	needReload := q.buffer == nil || (offset < maxEntrySize && q.bufferStart != 0)
	if needReload {
		if err := q.initBuffer(position); err != nil {
			return "", 0, err
		}

		// The buffer has moved, so the relative offset has changed as well.
		offset = position - q.bufferStart
	}

	// Walk left until the byte right before lineStart is a line break
	// or the beginning of the buffer is reached.
	lineStart := offset
	for lineStart > 0 && q.buffer[lineStart-1] != '\n' {
		lineStart--
	}

	return string(q.buffer[lineStart:offset]), q.bufferStart + lineStart, nil
}
// initBuffer loads into the QLogFile buffer the chunk of the file that
// precedes the specified position: up to bufferSize bytes, starting
// bufferSize bytes before the position or at the beginning of the file,
// whichever comes later. The buffer is allocated on first use.
//
// It returns the error of the underlying seek or read, if any.
func (q *QLogFile) initBuffer(position int64) error {
	start := position - bufferSize
	if start < 0 {
		start = 0
	}
	q.bufferStart = start

	if _, err := q.file.Seek(start, io.SeekStart); err != nil {
		return err
	}

	if q.buffer == nil {
		q.buffer = make([]byte, bufferSize)
	}

	n, err := q.file.Read(q.buffer)
	q.bufferLen = n

	return err
}
// readProbeLine reads the line that contains the specified position.
// It is meant for the binary search done by Seek; for consecutive reads
// use readNextLine, which works with the larger shared buffer.
//
// It returns the line without its trailing line break, the offset in the
// file at which the line starts, the offset right after the line (past its
// line break when one was found), and the error of the underlying seek or
// read, if any.
func (q *QLogFile) readProbeLine(position int64) (string, int64, int64, error) {
	// Read a window of up to 2*maxEntrySize bytes that starts maxEntrySize
	// bytes before the position (or at the beginning of the file).
	// probeOffset is the position expressed relative to the window start.
	windowStart := position - maxEntrySize
	probeOffset := int64(maxEntrySize)
	if windowStart <= 0 {
		windowStart = 0
		probeOffset = position
	}

	if _, err := q.file.Seek(windowStart, io.SeekStart); err != nil {
		return "", 0, 0, err
	}

	window := make([]byte, maxEntrySize*2)
	n, err := q.file.Read(window)
	if err != nil {
		return "", 0, 0, err
	}
	readLen := int64(n)

	// Walk left from the probe until the byte right before lineStart is a
	// line break or the beginning of the window is reached.
	lineStart := probeOffset
	for lineStart > 0 && window[lineStart-1] != '\n' {
		lineStart--
	}

	// Walk right from the probe to the line break that terminates the line.
	// Without one, the line runs up to the end of the data that was read.
	lineEnd := readLen
	lineEndIdx := windowStart + readLen
	for i := probeOffset; i < readLen; i++ {
		if window[i] == '\n' {
			lineEnd = i
			lineEndIdx = windowStart + i + 1
			break
		}
	}

	return string(window[lineStart:lineEnd]), windowStart + lineStart, lineEndIdx, nil
}
// readQLogTimestamp extracts the timestamp from a query log line.
// The value is looked up under the "T" key first and under the "Time" key
// as a fallback, and is parsed as an RFC 3339 time with nanoseconds.
//
// It returns the timestamp as Unix time in nanoseconds, or 0 (after logging
// an error) when the value is missing or cannot be parsed.
func readQLogTimestamp(str string) int64 {
	val := readJSONValue(str, "T")
	if len(val) == 0 {
		val = readJSONValue(str, "Time")
		if len(val) == 0 {
			log.Error("Couldn't find timestamp: %s", str)
			return 0
		}
	}

	parsed, err := time.Parse(time.RFC3339Nano, val)
	if err != nil {
		log.Error("Couldn't parse timestamp: %s", val)
		return 0
	}

	return parsed.UnixNano()
}