*(dnsforward): qlogreader implementation

This commit is contained in:
Andrey Meshkov 2020-02-20 21:12:51 +03:00
parent 712023112d
commit 90f2c18353
5 changed files with 355 additions and 63 deletions

View File

@ -11,10 +11,18 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// ErrSeekNotFound is returned from the Seek method
// if we failed to find the desired record
var ErrSeekNotFound = errors.New("Seek not found the record") var ErrSeekNotFound = errors.New("Seek not found the record")
const bufferSize = 256 * 1024 // 256 KB is the buffer size const bufferSize = 256 * 1024 // 256 KB is the buffer size
// QLogFile represents a single query log file
// It allows reading from the file in the reverse order
//
// Please note that this is a stateful object.
// Internally, it contains a pointer to a specific position in the file,
// and it reads lines in reverse order starting from that position.
type QLogFile struct { type QLogFile struct {
file *os.File // the query log file file *os.File // the query log file
position int64 // current position in the file position int64 // current position in the file
@ -40,7 +48,8 @@ func NewQLogFile(path string) (*QLogFile, error) {
} }
// Seek performs binary search in the query log file looking for a record // Seek performs binary search in the query log file looking for a record
// with the specified timestamp. // with the specified timestamp. Once the record is found, it sets
// "position" so that the next ReadNext call returned that record.
// //
// The algorithm is rather simple: // The algorithm is rather simple:
// 1. It starts with the position in the middle of a file // 1. It starts with the position in the middle of a file
@ -86,7 +95,7 @@ func (q *QLogFile) Seek(timestamp uint64) (int64, error) {
} }
// Get the timestamp from the query log record // Get the timestamp from the query log record
ts := q.readTimestamp(line) ts := readQLogTimestamp(line)
if ts == 0 { if ts == 0 {
return 0, ErrSeekNotFound return 0, ErrSeekNotFound
@ -130,6 +139,9 @@ func (q *QLogFile) Seek(timestamp uint64) (int64, error) {
// SeekStart changes the current position to the end of the file // SeekStart changes the current position to the end of the file
// Please note that we're reading query log in the reverse order // Please note that we're reading query log in the reverse order
// and that's why log start is actually the end of file // and that's why log start is actually the end of file
//
// Returns nil if we were able to change the current position.
// Returns error in any other case.
func (q *QLogFile) SeekStart() (int64, error) { func (q *QLogFile) SeekStart() (int64, error) {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
@ -292,20 +304,20 @@ func (q *QLogFile) readProbeLine(position int64) (string, int64, error) {
return string(buffer[startLine:endLine]), lineIdx, nil return string(buffer[startLine:endLine]), lineIdx, nil
} }
// readTimestamp reads the timestamp field from the query log line // readQLogTimestamp reads the timestamp field from the query log line
func (q *QLogFile) readTimestamp(str string) uint64 { func readQLogTimestamp(str string) uint64 {
val := readJSONValue(str, "T") val := readJSONValue(str, "T")
if len(val) == 0 { if len(val) == 0 {
val = readJSONValue(str, "Time") val = readJSONValue(str, "Time")
} }
if len(val) == 0 { if len(val) == 0 {
log.Error("Couldn't find timestamp in %s: %s", q.file.Name(), str) log.Error("Couldn't find timestamp: %s", str)
return 0 return 0
} }
tm, err := time.Parse(time.RFC3339, val) tm, err := time.Parse(time.RFC3339Nano, val)
if err != nil { if err != nil {
log.Error("Couldn't parse timestamp in %s: %s", q.file.Name(), val) log.Error("Couldn't parse timestamp: %s", val)
return 0 return 0
} }
return uint64(tm.UnixNano()) return uint64(tm.UnixNano())

View File

@ -22,6 +22,7 @@ func TestQLogFileEmpty(t *testing.T) {
q, err := NewQLogFile(testFile) q, err := NewQLogFile(testFile)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, q) assert.NotNil(t, q)
defer q.Close()
// seek to the start // seek to the start
pos, err := q.SeekStart() pos, err := q.SeekStart()
@ -46,6 +47,7 @@ func TestQLogFileLarge(t *testing.T) {
q, err := NewQLogFile(testFile) q, err := NewQLogFile(testFile)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, q) assert.NotNil(t, q)
defer q.Close()
// seek to the start // seek to the start
pos, err := q.SeekStart() pos, err := q.SeekStart()
@ -78,18 +80,19 @@ func TestQLogFileSeekLargeFile(t *testing.T) {
q, err := NewQLogFile(testFile) q, err := NewQLogFile(testFile)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, q) assert.NotNil(t, q)
defer q.Close()
// CASE 1: NOT TOO OLD LINE // CASE 1: NOT TOO OLD LINE
testSeekLine(t, q, 300) testSeekLineQLogFile(t, q, 300)
// CASE 2: OLD LINE // CASE 2: OLD LINE
testSeekLine(t, q, count-300) testSeekLineQLogFile(t, q, count-300)
// CASE 3: FIRST LINE // CASE 3: FIRST LINE
testSeekLine(t, q, 0) testSeekLineQLogFile(t, q, 0)
// CASE 4: LAST LINE // CASE 4: LAST LINE
testSeekLine(t, q, count) testSeekLineQLogFile(t, q, count)
// CASE 5: Seek non-existent (too low) // CASE 5: Seek non-existent (too low)
_, err = q.Seek(123) _, err = q.Seek(123)
@ -113,18 +116,19 @@ func TestQLogFileSeekSmallFile(t *testing.T) {
q, err := NewQLogFile(testFile) q, err := NewQLogFile(testFile)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, q) assert.NotNil(t, q)
defer q.Close()
// CASE 1: NOT TOO OLD LINE // CASE 1: NOT TOO OLD LINE
testSeekLine(t, q, 2) testSeekLineQLogFile(t, q, 2)
// CASE 2: OLD LINE // CASE 2: OLD LINE
testSeekLine(t, q, count-2) testSeekLineQLogFile(t, q, count-2)
// CASE 3: FIRST LINE // CASE 3: FIRST LINE
testSeekLine(t, q, 0) testSeekLineQLogFile(t, q, 0)
// CASE 4: LAST LINE // CASE 4: LAST LINE
testSeekLine(t, q, count) testSeekLineQLogFile(t, q, count)
// CASE 5: Seek non-existent (too low) // CASE 5: Seek non-existent (too low)
_, err = q.Seek(123) _, err = q.Seek(123)
@ -136,10 +140,10 @@ func TestQLogFileSeekSmallFile(t *testing.T) {
assert.NotNil(t, err) assert.NotNil(t, err)
} }
func testSeekLine(t *testing.T, q *QLogFile, lineNumber int) { func testSeekLineQLogFile(t *testing.T, q *QLogFile, lineNumber int) {
line, err := getQLogLine(q, lineNumber) line, err := getQLogFileLine(q, lineNumber)
assert.Nil(t, err) assert.Nil(t, err)
ts := q.readTimestamp(line) ts := readQLogTimestamp(line)
assert.NotEqual(t, uint64(0), ts) assert.NotEqual(t, uint64(0), ts)
// try seeking to that line now // try seeking to that line now
@ -152,7 +156,7 @@ func testSeekLine(t *testing.T, q *QLogFile, lineNumber int) {
assert.Equal(t, line, testLine) assert.Equal(t, line, testLine)
} }
func getQLogLine(q *QLogFile, lineNumber int) (string, error) { func getQLogFileLine(q *QLogFile, lineNumber int) (string, error) {
_, err := q.SeekStart() _, err := q.SeekStart()
if err != nil { if err != nil {
return "", err return "", err
@ -177,6 +181,7 @@ func TestQLogFile(t *testing.T) {
q, err := NewQLogFile(testFile) q, err := NewQLogFile(testFile)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, q) assert.NotNil(t, q)
defer q.Close()
// seek to the start // seek to the start
pos, err := q.SeekStart() pos, err := q.SeekStart()
@ -206,12 +211,21 @@ func TestQLogFile(t *testing.T) {
// prepareTestFile - prepares a test query log file with the specified number of lines // prepareTestFile - prepares a test query log file with the specified number of lines
func prepareTestFile(dir string, linesCount int) string { func prepareTestFile(dir string, linesCount int) string {
return prepareTestFiles(dir, 1, linesCount)[0]
}
// prepareTestFiles - prepares several test query log files
// each of them -- with the specified linesCount
func prepareTestFiles(dir string, filesCount, linesCount int) []string {
format := `{"IP":"${IP}","T":"${TIMESTAMP}","QH":"example.org","QT":"A","QC":"IN","Answer":"AAAAAAABAAEAAAAAB2V4YW1wbGUDb3JnAAABAAEHZXhhbXBsZQNvcmcAAAEAAQAAAAAABAECAwQ=","Result":{},"Elapsed":0,"Upstream":"upstream"}` format := `{"IP":"${IP}","T":"${TIMESTAMP}","QH":"example.org","QT":"A","QC":"IN","Answer":"AAAAAAABAAEAAAAAB2V4YW1wbGUDb3JnAAABAAEHZXhhbXBsZQNvcmcAAAEAAQAAAAAABAECAwQ=","Result":{},"Elapsed":0,"Upstream":"upstream"}`
lineTime, _ := time.Parse(time.RFC3339Nano, "2020-02-18T22:36:35.920973+03:00") lineTime, _ := time.Parse(time.RFC3339Nano, "2020-02-18T22:36:35.920973+03:00")
lineIP := uint32(0) lineIP := uint32(0)
files := make([]string, 0)
for j := 0; j < filesCount; j++ {
f, _ := ioutil.TempFile(dir, "*.txt") f, _ := ioutil.TempFile(dir, "*.txt")
files = append(files, f.Name())
for i := 0; i < linesCount; i++ { for i := 0; i < linesCount; i++ {
lineIP += 1 lineIP += 1
@ -227,6 +241,7 @@ func prepareTestFile(dir string, linesCount int) string {
_, _ = f.WriteString(line) _, _ = f.WriteString(line)
_, _ = f.WriteString("\n") _, _ = f.WriteString("\n")
} }
}
return f.Name()
return files
} }

139
querylog/qlog_reader.go Normal file
View File

@ -0,0 +1,139 @@
package querylog
import (
"io"
"github.com/joomcode/errorx"
)
// QLogReader allows reading from multiple query log files in the reverse order.
//
// Please note that this is a stateful object.
// Internally, it contains a pointer to a particular query log file, and
// to a specific position in this file, and it reads lines in reverse order
// starting from that position.
type QLogReader struct {
// qFiles - array with the query log files
// The order is - from oldest to newest
qFiles []*QLogFile
currentFile int // Index of the current file
}
// NewQLogReader initializes a QLogReader instance
// with the specified files
func NewQLogReader(files []string) (*QLogReader, error) {
qFiles := make([]*QLogFile, 0)
for _, f := range files {
q, err := NewQLogFile(f)
if err != nil {
// Close what we've already opened
_ = closeQFiles(qFiles)
return nil, err
}
qFiles = append(qFiles, q)
}
return &QLogReader{
qFiles: qFiles,
currentFile: (len(qFiles) - 1),
}, nil
}
// Seek performs binary search of a query log record with the specified timestamp.
// If the record is found, it sets QLogReader's position to point to that line,
// so that the next ReadNext call returned this line.
//
// Returns nil if the record is successfully found.
// Returns an error if for some reason we could not find a record with the specified timestamp.
func (r *QLogReader) Seek(timestamp uint64) error {
for i := len(r.qFiles) - 1; i >= 0; i-- {
q := r.qFiles[i]
_, err := q.Seek(timestamp)
if err == nil {
// Our search is finished, we found the element we were looking for
// Update currentFile only, position is already set properly in the QLogFile
r.currentFile = i
return nil
}
}
return ErrSeekNotFound
}
// SeekStart changes the current position to the end of the newest file
// Please note that we're reading query log in the reverse order
// and that's why log start is actually the end of file
//
// Returns nil if we were able to change the current position.
// Returns error in any other case.
func (r *QLogReader) SeekStart() error {
if len(r.qFiles) == 0 {
return nil
}
r.currentFile = len(r.qFiles) - 1
_, err := r.qFiles[r.currentFile].SeekStart()
return err
}
// ReadNext reads the next line (in the reverse order) from the query log files.
// and shifts the current position left to the next (actually prev) line (or the next file).
// returns io.EOF if there's nothing to read more.
func (r *QLogReader) ReadNext() (string, error) {
if len(r.qFiles) == 0 {
return "", io.EOF
}
for r.currentFile >= 0 {
q := r.qFiles[r.currentFile]
line, err := q.ReadNext()
if err != nil {
// Shift to the older file
r.currentFile--
if r.currentFile < 0 {
break
}
q = r.qFiles[r.currentFile]
// Set it's position to the start right away
_, err = q.SeekStart()
// This is unexpected, return an error right away
if err != nil {
return "", err
}
} else {
return line, nil
}
}
// Nothing to read anymore
return "", io.EOF
}
// Close closes the QLogReader
func (r *QLogReader) Close() error {
return closeQFiles(r.qFiles)
}
// closeQFiles - helper method to close multiple QLogFile instances
func closeQFiles(qFiles []*QLogFile) error {
var errs []error
for _, q := range qFiles {
err := q.Close()
if err != nil {
errs = append(errs, err)
}
}
if len(errs) > 0 {
return errorx.DecorateMany("Error while closing QLogReader", errs...)
}
return nil
}

View File

@ -0,0 +1,157 @@
package querylog
import (
"io"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestQLogReaderEmpty(t *testing.T) {
r, err := NewQLogReader([]string{})
assert.Nil(t, err)
assert.NotNil(t, r)
defer r.Close()
// seek to the start
err = r.SeekStart()
assert.Nil(t, err)
line, err := r.ReadNext()
assert.Equal(t, "", line)
assert.Equal(t, io.EOF, err)
}
func TestQLogReaderOneFile(t *testing.T) {
// let's do one small file
count := 10
filesCount := 1
testDir := prepareTestDir()
defer func() { _ = os.RemoveAll(testDir) }()
testFiles := prepareTestFiles(testDir, filesCount, count)
r, err := NewQLogReader(testFiles)
assert.Nil(t, err)
assert.NotNil(t, r)
defer r.Close()
// seek to the start
err = r.SeekStart()
assert.Nil(t, err)
// read everything
read := 0
var line string
for err == nil {
line, err = r.ReadNext()
if err == nil {
assert.True(t, len(line) > 0)
read += 1
}
}
assert.Equal(t, count*filesCount, read)
assert.Equal(t, io.EOF, err)
}
func TestQLogReaderMultipleFiles(t *testing.T) {
// should be large enough
count := 10000
filesCount := 5
testDir := prepareTestDir()
defer func() { _ = os.RemoveAll(testDir) }()
testFiles := prepareTestFiles(testDir, filesCount, count)
r, err := NewQLogReader(testFiles)
assert.Nil(t, err)
assert.NotNil(t, r)
defer r.Close()
// seek to the start
err = r.SeekStart()
assert.Nil(t, err)
// read everything
read := 0
var line string
for err == nil {
line, err = r.ReadNext()
if err == nil {
assert.True(t, len(line) > 0)
read += 1
}
}
assert.Equal(t, count*filesCount, read)
assert.Equal(t, io.EOF, err)
}
func TestQLogReaderSeek(t *testing.T) {
// more or less big file
count := 10000
filesCount := 2
testDir := prepareTestDir()
defer func() { _ = os.RemoveAll(testDir) }()
testFiles := prepareTestFiles(testDir, filesCount, count)
r, err := NewQLogReader(testFiles)
assert.Nil(t, err)
assert.NotNil(t, r)
defer r.Close()
// CASE 1: NOT TOO OLD LINE
testSeekLineQLogReader(t, r, 300)
// CASE 2: OLD LINE
testSeekLineQLogReader(t, r, count-300)
// CASE 3: FIRST LINE
testSeekLineQLogReader(t, r, 0)
// CASE 4: LAST LINE
testSeekLineQLogReader(t, r, count)
// CASE 5: Seek non-existent (too low)
err = r.Seek(123)
assert.NotNil(t, err)
// CASE 6: Seek non-existent (too high)
ts, _ := time.Parse(time.RFC3339, "2100-01-02T15:04:05Z07:00")
err = r.Seek(uint64(ts.UnixNano()))
assert.NotNil(t, err)
}
func testSeekLineQLogReader(t *testing.T, r *QLogReader, lineNumber int) {
line, err := getQLogReaderLine(r, lineNumber)
assert.Nil(t, err)
ts := readQLogTimestamp(line)
assert.NotEqual(t, uint64(0), ts)
// try seeking to that line now
err = r.Seek(ts)
assert.Nil(t, err)
testLine, err := r.ReadNext()
assert.Nil(t, err)
assert.Equal(t, line, testLine)
}
func getQLogReaderLine(r *QLogReader, lineNumber int) (string, error) {
err := r.SeekStart()
if err != nil {
return "", err
}
for i := 1; i < lineNumber; i++ {
_, err := r.ReadNext()
if err != nil {
return "", err
}
}
return r.ReadNext()
}

View File

@ -3,7 +3,6 @@ package querylog
import ( import (
"bufio" "bufio"
"bytes" "bytes"
"compress/gzip"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"io" "io"
@ -17,8 +16,6 @@ import (
"github.com/miekg/dns" "github.com/miekg/dns"
) )
const enableGzip = false
// TODO: Check this when we append a new line -- we don't want to have a line longer than this // TODO: Check this when we append a new line -- we don't want to have a line longer than this
const maxEntrySize = 1000 const maxEntrySize = 1000
@ -70,29 +67,7 @@ func (l *queryLog) flushToFile(buffer []*logEntry) error {
var err error var err error
var zb bytes.Buffer var zb bytes.Buffer
filename := l.logFile filename := l.logFile
// gzip enabled?
if enableGzip {
filename += ".gz"
zw := gzip.NewWriter(&zb)
zw.Name = l.logFile
zw.ModTime = time.Now()
_, err = zw.Write(b.Bytes())
if err != nil {
log.Error("Couldn't compress to gzip: %s", err)
zw.Close()
return err
}
if err = zw.Close(); err != nil {
log.Error("Couldn't close gzip writer: %s", err)
return err
}
} else {
zb = b zb = b
}
l.fileWriteLock.Lock() l.fileWriteLock.Lock()
defer l.fileWriteLock.Unlock() defer l.fileWriteLock.Unlock()
@ -118,11 +93,6 @@ func (l *queryLog) rotate() error {
from := l.logFile from := l.logFile
to := l.logFile + ".1" to := l.logFile + ".1"
if enableGzip {
from = l.logFile + ".gz"
to = l.logFile + ".gz.1"
}
if _, err := os.Stat(from); os.IsNotExist(err) { if _, err := os.Stat(from); os.IsNotExist(err) {
// do nothing, file doesn't exist // do nothing, file doesn't exist
return nil return nil
@ -135,7 +105,6 @@ func (l *queryLog) rotate() error {
} }
log.Debug("Rotated from %s to %s successfully", from, to) log.Debug("Rotated from %s to %s successfully", from, to)
return nil return nil
} }