From 9d5f201ddf20985599aed9bc486a5f01f3226cfc Mon Sep 17 00:00:00 2001
From: Andrey Meshkov
Date: Tue, 18 Feb 2020 13:02:59 +0300
Subject: [PATCH] *(dnsforward): added QLogFile struct

This struct is supposed to use an easier algorithm for scanning query
log files. The end goal is to allow us to scan multiple query log files.
---
 go.mod                     |   1 +
 querylog/qlog_file.go      | 282 +++++++++++++++++++++++++++++++++++++
 querylog/qlog_file_test.go |  59 ++++++++
 querylog/querylog_file.go  |   2 +
 4 files changed, 344 insertions(+)
 create mode 100644 querylog/qlog_file.go
 create mode 100644 querylog/qlog_file_test.go

diff --git a/go.mod b/go.mod
index 28b2d1a4..18e8de6e 100644
--- a/go.mod
+++ b/go.mod
@@ -15,6 +15,7 @@ require (
 	github.com/kardianos/service v0.0.0-20181115005516-4c239ee84e7b
 	github.com/krolaw/dhcp4 v0.0.0-20180925202202-7cead472c414
 	github.com/miekg/dns v1.1.26
+	github.com/pkg/errors v0.8.1
 	github.com/sparrc/go-ping v0.0.0-20181106165434-ef3ab45e41b0
 	github.com/stretchr/testify v1.4.0
 	go.etcd.io/bbolt v1.3.3 // indirect
diff --git a/querylog/qlog_file.go b/querylog/qlog_file.go
new file mode 100644
index 00000000..83f0d30d
--- /dev/null
+++ b/querylog/qlog_file.go
@@ -0,0 +1,282 @@
+package querylog
+
+import (
+	"io"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/pkg/errors"
+)
+
+var ErrSeekNotFound = errors.New("Seek not found the record")
+
+const bufferSize = 64 * 1024 // 64 KB is the buffer size
+
+type QLogFile struct {
+	file     *os.File // the query log file
+	position int64    // current position in the file
+
+	buffer      []byte // buffer that we've read from the file
+	bufferStart int64  // start of the buffer (in the file)
+	bufferLen   int    // buffer len
+
+	lock sync.Mutex // We use mutex to make it thread-safe
+}
+
+// NewQLogFile initializes a new instance of the QLogFile
+func NewQLogFile(path string) (*QLogFile, error) {
+	f, err := os.OpenFile(path, os.O_RDONLY, 0644)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return &QLogFile{
+		file: f,
+	}, nil
+}
+
+// Seek performs binary search in the query log file looking for a record
+// with the specified timestamp.
+//
+// The algorithm is rather simple:
+// 1. It starts with the position in the middle of a file
+// 2. Shifts back to the beginning of the line
+// 3. Checks the log record timestamp
+// 4. If it is lower than the timestamp we are looking for,
+// it shifts seek position to 3/4 of the file. Otherwise, to 1/4 of the file.
+// 5. It performs the search again, every time the search scope is narrowed twice.
+//
+// It returns the position of the line with the timestamp we were looking for.
+// If we could not find it, it returns 0 and ErrSeekNotFound
+func (q *QLogFile) Seek(timestamp uint64) (int64, error) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// First of all, check the file size
+	fileInfo, err := q.file.Stat()
+	if err != nil {
+		return 0, err
+	}
+
+	// Define the search scope
+	start := int64(0)
+	end := fileInfo.Size()
+	probe := (end - start) / 2
+
+	// Get the line
+	line, _, err := q.readProbeLine(probe)
+	if err != nil {
+		return 0, err
+	}
+
+	// Get the timestamp from the query log record
+	ts := q.readTimestamp(line)
+
+	if ts == timestamp {
+		// Hurray, returning the result
+		return probe, nil
+	}
+
+	// Narrow the scope and repeat the search.
+	// NOTE: plain "=" on purpose — ":=" here would shadow start/end
+	// and make the narrowing a no-op.
+	if ts > timestamp {
+		end = probe
+	} else {
+		start = probe
+	}
+	probe = start + (end-start)/2
+
+	// TODO: temp
+	q.position = probe
+
+	// TODO: Check start/stop/probe values and loop this
+	return 0, ErrSeekNotFound
+}
+
+// SeekStart changes the current position to the end of the file
+// Please note that we're reading query log in the reverse order
+// and that's why log start is actually the end of file
+func (q *QLogFile) SeekStart() (int64, error) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	// First of all, check the file size
+	fileInfo, err := q.file.Stat()
+	if err != nil {
+		return 0, err
+	}
+
+	// Place the position to the very end of file
+	q.position = fileInfo.Size() - 1
+	if q.position < 0 {
+		// TODO: test empty file
+		q.position = 0
+	}
+	return q.position, nil
+}
+
+// ReadNext reads the next line (in the reverse order) from the file
+// and shifts the current position left to the next (actually prev) line.
+// returns io.EOF if there's nothing to read more
+func (q *QLogFile) ReadNext() (string, error) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if q.position == 0 {
+		return "", io.EOF
+	}
+
+	line, lineIdx, err := q.readNextLine(q.position)
+	if err != nil {
+		return "", err
+	}
+
+	// Shift position
+	if lineIdx == 0 {
+		q.position = 0
+	} else {
+		// there's usually a line break before the line
+		// so we should shift one more char left from the line
+		// line\nline
+		q.position = lineIdx - 1
+	}
+	return line, err
+}
+
+// Close frees the underlying resources
+func (q *QLogFile) Close() error {
+	return q.file.Close()
+}
+
+// readNextLine reads the next line from the specified position
+// this line actually has to END at that position.
+//
+// the algorithm is:
+// 1. check if we have the buffer initialized
+// 2. if it is, scan it and look for the line there
+// 3. if we cannot find the line there, read the prev chunk into the buffer
+// 4. read the line from the buffer
+func (q *QLogFile) readNextLine(position int64) (string, int64, error) {
+	relativePos := position - q.bufferStart
+	if q.buffer == nil || relativePos < maxEntrySize {
+		// Time to re-init the buffer
+		err := q.initBuffer(position)
+		if err != nil {
+			return "", 0, err
+		}
+		// The buffer has moved, so the position relative to it changed too
+		relativePos = position - q.bufferStart
+	}
+
+	// Look for the end of the prev line
+	// This is where we'll read from
+	var startLine = int64(0)
+	for i := relativePos - 1; i >= 0; i-- {
+		if q.buffer[i] == '\n' {
+			startLine = i + 1
+			break
+		}
+	}
+
+	line := string(q.buffer[startLine:relativePos])
+	lineIdx := q.bufferStart + startLine
+	return line, lineIdx, nil
+}
+
+// initBuffer initializes the QLogFile buffer.
+// the goal is to read a chunk of file that includes the line with the specified position.
+func (q *QLogFile) initBuffer(position int64) error {
+	q.bufferStart = int64(0)
+	if (position - bufferSize) > 0 {
+		q.bufferStart = position - bufferSize
+	}
+
+	// Seek to this position
+	_, err := q.file.Seek(q.bufferStart, io.SeekStart)
+	if err != nil {
+		return err
+	}
+
+	if q.buffer == nil {
+		q.buffer = make([]byte, bufferSize)
+	}
+	q.bufferLen, err = q.file.Read(q.buffer)
+	// TODO: validate bufferLen
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// readProbeLine reads a line that includes the specified position
+// this method is supposed to be used when we use binary search in the Seek method
+// in the case of consecutive reads, use readNext (it uses a better buffer)
+func (q *QLogFile) readProbeLine(position int64) (string, int64, error) {
+	// First of all, we should read a buffer that will include the query log line
+	// In order to do this, we'll define the boundaries
+	seekPosition := int64(0)
+	relativePos := position // position relative to the buffer we're going to read
+	if (position - maxEntrySize) > 0 {
+		// TODO: cover this case in tests
+		seekPosition = position - maxEntrySize
+		relativePos = maxEntrySize
+	}
+
+	// Seek to this position
+	_, err := q.file.Seek(seekPosition, io.SeekStart)
+	if err != nil {
+		return "", 0, err
+	}
+
+	// The buffer size is 2*maxEntrySize
+	buffer := make([]byte, maxEntrySize*2)
+	bufferLen, err := q.file.Read(buffer)
+	if err != nil {
+		return "", 0, err
+	}
+
+	// Now start looking for the new line character starting
+	// from the relativePos and going left
+	var startLine = int64(0)
+	for i := relativePos - 1; i >= 0; i-- {
+		if buffer[i] == '\n' {
+			startLine = i + 1
+			break
+		}
+	}
+	// Looking for the end of line now
+	var endLine = int64(bufferLen)
+	for i := relativePos; i < int64(bufferLen); i++ {
+		if buffer[i] == '\n' {
+			endLine = i
+			break
+		}
+	}
+
+	// Finally we can return the string we were looking for
+	lineIdx := startLine + seekPosition
+	return string(buffer[startLine:endLine]), lineIdx, nil
+}
+
+// readTimestamp reads the timestamp field from the query log line
+func (q *QLogFile) readTimestamp(str string) uint64 {
+	val := readJSONValue(str, "T")
+	if len(val) == 0 {
+		val = readJSONValue(str, "Time")
+	}
+
+	if len(val) == 0 {
+		// TODO: log
+		return 0
+	}
+	tm, err := time.Parse(time.RFC3339, val)
+	if err != nil {
+		// TODO: log
+		return 0
+	}
+	return uint64(tm.UnixNano())
+}
diff --git a/querylog/qlog_file_test.go b/querylog/qlog_file_test.go
new file mode 100644
index 00000000..3d6d2600
--- /dev/null
+++ b/querylog/qlog_file_test.go
@@ -0,0 +1,59 @@
+package querylog
+
+import (
+	"io"
+	"os"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// Check adding and loading (with filtering) entries from disk and memory
+func TestQLogFile(t *testing.T) {
+	conf := Config{
+		Enabled:  true,
+		Interval: 1,
+		MemSize:  100,
+	}
+	conf.BaseDir = prepareTestDir()
+	defer func() { _ = os.RemoveAll(conf.BaseDir) }()
+	l := newQueryLog(conf)
+
+	// add disk entries
+	addEntry(l, "example.org", "1.2.3.4", "0.1.2.4")
+	addEntry(l, "example.org", "1.2.3.4", "0.1.2.5")
+
+	// write to disk
+	_ = l.flushLogBuffer(true)
+
+	// create the new QLogFile instance
+	q, err := NewQLogFile(l.logFile)
+	assert.Nil(t, err)
+	assert.NotNil(t, q)
+
+	// seek to the start
+	pos, err := q.SeekStart()
+	assert.Nil(t, err)
+	assert.True(t, pos > 0)
+
+	// read first line
+	line, err := q.ReadNext()
+	assert.Nil(t, err)
+	assert.True(t, strings.Contains(line, "0.1.2.5"), line)
+	assert.True(t, strings.HasPrefix(line, "{"), line)
+	assert.True(t, strings.HasSuffix(line, "}"), line)
+
+	// read second line
+	line, err = q.ReadNext()
+	assert.Nil(t, err)
+	assert.Equal(t, int64(0), q.position)
+	assert.True(t, strings.Contains(line, "0.1.2.4"), line)
+	assert.True(t, strings.HasPrefix(line, "{"), line)
+	assert.True(t, strings.HasSuffix(line, "}"), line)
+
+	// try reading again (there's nothing to read anymore)
+	line, err = q.ReadNext()
+	assert.Equal(t, io.EOF, err)
+	assert.Equal(t, "", line)
+}
diff --git a/querylog/querylog_file.go b/querylog/querylog_file.go
index 02296a98..3cf2ae9e 100644
--- a/querylog/querylog_file.go
+++ b/querylog/querylog_file.go
@@ -18,6 +18,8 @@ import (
 )
 
 const enableGzip = false
+
+// TODO: Check this when we append a new line -- we don't want to have a line longer than this
 const maxEntrySize = 1000
 
 // flushLogBuffer flushes the current buffer to file and resets the current buffer