diff --git a/querylog/qlog_file.go b/querylog/qlog_file.go index 63557098..14ae2b32 100644 --- a/querylog/qlog_file.go +++ b/querylog/qlog_file.go @@ -11,10 +11,18 @@ import ( "github.com/pkg/errors" ) +// ErrSeekNotFound is returned from the Seek method +// if we failed to find the desired record var ErrSeekNotFound = errors.New("Seek not found the record") const bufferSize = 256 * 1024 // 256 KB is the buffer size +// QLogFile represents a single query log file +// It allows reading from the file in the reverse order +// +// Please note that this is a stateful object. +// Internally, it contains a pointer to a specific position in the file, +// and it reads lines in reverse order starting from that position. type QLogFile struct { file *os.File // the query log file position int64 // current position in the file @@ -40,7 +48,8 @@ func NewQLogFile(path string) (*QLogFile, error) { } // Seek performs binary search in the query log file looking for a record -// with the specified timestamp. +// with the specified timestamp. Once the record is found, it sets +// "position" so that the next ReadNext call returned that record. // // The algorithm is rather simple: // 1. It starts with the position in the middle of a file @@ -86,7 +95,7 @@ func (q *QLogFile) Seek(timestamp uint64) (int64, error) { } // Get the timestamp from the query log record - ts := q.readTimestamp(line) + ts := readQLogTimestamp(line) if ts == 0 { return 0, ErrSeekNotFound @@ -130,6 +139,9 @@ func (q *QLogFile) Seek(timestamp uint64) (int64, error) { // SeekStart changes the current position to the end of the file // Please note that we're reading query log in the reverse order // and that's why log start is actually the end of file +// +// Returns nil if we were able to change the current position. +// Returns error in any other case. func (q *QLogFile) SeekStart() (int64, error) { q.lock.Lock() defer q.lock.Unlock() @@ -292,20 +304,20 @@ func (q *QLogFile) readProbeLine(position int64) (string, int64, error) { return string(buffer[startLine:endLine]), lineIdx, nil } -// readTimestamp reads the timestamp field from the query log line -func (q *QLogFile) readTimestamp(str string) uint64 { +// readQLogTimestamp reads the timestamp field from the query log line +func readQLogTimestamp(str string) uint64 { val := readJSONValue(str, "T") if len(val) == 0 { val = readJSONValue(str, "Time") } if len(val) == 0 { - log.Error("Couldn't find timestamp in %s: %s", q.file.Name(), str) + log.Error("Couldn't find timestamp: %s", str) return 0 } - tm, err := time.Parse(time.RFC3339, val) + tm, err := time.Parse(time.RFC3339Nano, val) if err != nil { - log.Error("Couldn't parse timestamp in %s: %s", q.file.Name(), val) + log.Error("Couldn't parse timestamp: %s", val) return 0 } return uint64(tm.UnixNano()) diff --git a/querylog/qlog_file_test.go b/querylog/qlog_file_test.go index e9bc70e8..2ba6f38d 100644 --- a/querylog/qlog_file_test.go +++ b/querylog/qlog_file_test.go @@ -22,6 +22,7 @@ func TestQLogFileEmpty(t *testing.T) { q, err := NewQLogFile(testFile) assert.Nil(t, err) assert.NotNil(t, q) + defer q.Close() // seek to the start pos, err := q.SeekStart() @@ -46,6 +47,7 @@ func TestQLogFileLarge(t *testing.T) { q, err := NewQLogFile(testFile) assert.Nil(t, err) assert.NotNil(t, q) + defer q.Close() // seek to the start pos, err := q.SeekStart() @@ -78,18 +80,19 @@ func TestQLogFileSeekLargeFile(t *testing.T) { q, err := NewQLogFile(testFile) assert.Nil(t, err) assert.NotNil(t, q) + defer q.Close() // CASE 1: NOT TOO OLD LINE - testSeekLine(t, q, 300) + testSeekLineQLogFile(t, q, 300) // CASE 2: OLD LINE - testSeekLine(t, q, count-300) + testSeekLineQLogFile(t, q, count-300) // CASE 3: FIRST LINE - testSeekLine(t, q, 0) + testSeekLineQLogFile(t, q, 0) // CASE 4: LAST LINE - testSeekLine(t, q, count) + testSeekLineQLogFile(t, q, count) // CASE 5: Seek non-existent (too low) _, err = q.Seek(123) @@ -113,18 +116,19 @@ func TestQLogFileSeekSmallFile(t *testing.T) { q, err := NewQLogFile(testFile) assert.Nil(t, err) assert.NotNil(t, q) + defer q.Close() // CASE 1: NOT TOO OLD LINE - testSeekLine(t, q, 2) + testSeekLineQLogFile(t, q, 2) // CASE 2: OLD LINE - testSeekLine(t, q, count-2) + testSeekLineQLogFile(t, q, count-2) // CASE 3: FIRST LINE - testSeekLine(t, q, 0) + testSeekLineQLogFile(t, q, 0) // CASE 4: LAST LINE - testSeekLine(t, q, count) + testSeekLineQLogFile(t, q, count) // CASE 5: Seek non-existent (too low) _, err = q.Seek(123) @@ -136,10 +140,10 @@ func TestQLogFileSeekSmallFile(t *testing.T) { assert.NotNil(t, err) } -func testSeekLine(t *testing.T, q *QLogFile, lineNumber int) { - line, err := getQLogLine(q, lineNumber) +func testSeekLineQLogFile(t *testing.T, q *QLogFile, lineNumber int) { + line, err := getQLogFileLine(q, lineNumber) assert.Nil(t, err) - ts := q.readTimestamp(line) + ts := readQLogTimestamp(line) assert.NotEqual(t, uint64(0), ts) // try seeking to that line now @@ -152,7 +156,7 @@ func testSeekLine(t *testing.T, q *QLogFile, lineNumber int) { assert.Equal(t, line, testLine) } -func getQLogLine(q *QLogFile, lineNumber int) (string, error) { +func getQLogFileLine(q *QLogFile, lineNumber int) (string, error) { _, err := q.SeekStart() if err != nil { return "", err @@ -177,6 +181,7 @@ func TestQLogFile(t *testing.T) { q, err := NewQLogFile(testFile) assert.Nil(t, err) assert.NotNil(t, q) + defer q.Close() // seek to the start pos, err := q.SeekStart() @@ -206,27 +211,37 @@ func TestQLogFile(t *testing.T) { // prepareTestFile - prepares a test query log file with the specified number of lines func prepareTestFile(dir string, linesCount int) string { + return prepareTestFiles(dir, 1, linesCount)[0] +} + +// prepareTestFiles - prepares several test query log files +// each of them -- with the specified linesCount +func prepareTestFiles(dir string, filesCount, linesCount int) []string { format := `{"IP":"${IP}","T":"${TIMESTAMP}","QH":"example.org","QT":"A","QC":"IN","Answer":"AAAAAAABAAEAAAAAB2V4YW1wbGUDb3JnAAABAAEHZXhhbXBsZQNvcmcAAAEAAQAAAAAABAECAwQ=","Result":{},"Elapsed":0,"Upstream":"upstream"}` lineTime, _ := time.Parse(time.RFC3339Nano, "2020-02-18T22:36:35.920973+03:00") lineIP := uint32(0) - f, _ := ioutil.TempFile(dir, "*.txt") + files := make([]string, 0) + for j := 0; j < filesCount; j++ { + f, _ := ioutil.TempFile(dir, "*.txt") + files = append(files, f.Name()) - for i := 0; i < linesCount; i++ { - lineIP += 1 - lineTime = lineTime.Add(time.Second) + for i := 0; i < linesCount; i++ { + lineIP += 1 + lineTime = lineTime.Add(time.Second) - ip := make(net.IP, 4) - binary.BigEndian.PutUint32(ip, lineIP) + ip := make(net.IP, 4) + binary.BigEndian.PutUint32(ip, lineIP) - line := format - line = strings.ReplaceAll(line, "${IP}", ip.String()) - line = strings.ReplaceAll(line, "${TIMESTAMP}", lineTime.Format(time.RFC3339Nano)) + line := format + line = strings.ReplaceAll(line, "${IP}", ip.String()) + line = strings.ReplaceAll(line, "${TIMESTAMP}", lineTime.Format(time.RFC3339Nano)) - _, _ = f.WriteString(line) - _, _ = f.WriteString("\n") + _, _ = f.WriteString(line) + _, _ = f.WriteString("\n") + } } - return f.Name() + return files } diff --git a/querylog/qlog_reader.go b/querylog/qlog_reader.go new file mode 100644 index 00000000..6b081cc2 --- /dev/null +++ b/querylog/qlog_reader.go @@ -0,0 +1,139 @@ +package querylog + +import ( + "io" + + "github.com/joomcode/errorx" +) + +// QLogReader allows reading from multiple query log files in the reverse order. +// +// Please note that this is a stateful object. +// Internally, it contains a pointer to a particular query log file, and +// to a specific position in this file, and it reads lines in reverse order +// starting from that position. +type QLogReader struct { + // qFiles - array with the query log files + // The order is - from oldest to newest + qFiles []*QLogFile + + currentFile int // Index of the current file +} + +// NewQLogReader initializes a QLogReader instance +// with the specified files +func NewQLogReader(files []string) (*QLogReader, error) { + qFiles := make([]*QLogFile, 0) + + for _, f := range files { + q, err := NewQLogFile(f) + if err != nil { + // Close what we've already opened + _ = closeQFiles(qFiles) + return nil, err + } + + qFiles = append(qFiles, q) + } + + return &QLogReader{ + qFiles: qFiles, + currentFile: (len(qFiles) - 1), + }, nil +} + +// Seek performs binary search of a query log record with the specified timestamp. +// If the record is found, it sets QLogReader's position to point to that line, +// so that the next ReadNext call returned this line. +// +// Returns nil if the record is successfully found. +// Returns an error if for some reason we could not find a record with the specified timestamp. +func (r *QLogReader) Seek(timestamp uint64) error { + for i := len(r.qFiles) - 1; i >= 0; i-- { + q := r.qFiles[i] + _, err := q.Seek(timestamp) + if err == nil { + // Our search is finished, we found the element we were looking for + // Update currentFile only, position is already set properly in the QLogFile + r.currentFile = i + return nil + } + } + + return ErrSeekNotFound +} + +// SeekStart changes the current position to the end of the newest file +// Please note that we're reading query log in the reverse order +// and that's why log start is actually the end of file +// +// Returns nil if we were able to change the current position. +// Returns error in any other case. +func (r *QLogReader) SeekStart() error { + if len(r.qFiles) == 0 { + return nil + } + + r.currentFile = len(r.qFiles) - 1 + _, err := r.qFiles[r.currentFile].SeekStart() + return err +} + +// ReadNext reads the next line (in the reverse order) from the query log files. +// and shifts the current position left to the next (actually prev) line (or the next file). +// returns io.EOF if there's nothing to read more. +func (r *QLogReader) ReadNext() (string, error) { + if len(r.qFiles) == 0 { + return "", io.EOF + } + + for r.currentFile >= 0 { + q := r.qFiles[r.currentFile] + line, err := q.ReadNext() + if err != nil { + // Shift to the older file + r.currentFile-- + if r.currentFile < 0 { + break + } + + q = r.qFiles[r.currentFile] + + // Set it's position to the start right away + _, err = q.SeekStart() + + // This is unexpected, return an error right away + if err != nil { + return "", err + } + } else { + return line, nil + } + } + + // Nothing to read anymore + return "", io.EOF +} + +// Close closes the QLogReader +func (r *QLogReader) Close() error { + return closeQFiles(r.qFiles) +} + +// closeQFiles - helper method to close multiple QLogFile instances +func closeQFiles(qFiles []*QLogFile) error { + var errs []error + + for _, q := range qFiles { + err := q.Close() + if err != nil { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return errorx.DecorateMany("Error while closing QLogReader", errs...) + } + + return nil +} diff --git a/querylog/qlog_reader_test.go b/querylog/qlog_reader_test.go new file mode 100644 index 00000000..592ab4b9 --- /dev/null +++ b/querylog/qlog_reader_test.go @@ -0,0 +1,157 @@ +package querylog + +import ( + "io" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestQLogReaderEmpty(t *testing.T) { + r, err := NewQLogReader([]string{}) + assert.Nil(t, err) + assert.NotNil(t, r) + defer r.Close() + + // seek to the start + err = r.SeekStart() + assert.Nil(t, err) + + line, err := r.ReadNext() + assert.Equal(t, "", line) + assert.Equal(t, io.EOF, err) +} + +func TestQLogReaderOneFile(t *testing.T) { + // let's do one small file + count := 10 + filesCount := 1 + + testDir := prepareTestDir() + defer func() { _ = os.RemoveAll(testDir) }() + testFiles := prepareTestFiles(testDir, filesCount, count) + + r, err := NewQLogReader(testFiles) + assert.Nil(t, err) + assert.NotNil(t, r) + defer r.Close() + + // seek to the start + err = r.SeekStart() + assert.Nil(t, err) + + // read everything + read := 0 + var line string + for err == nil { + line, err = r.ReadNext() + if err == nil { + assert.True(t, len(line) > 0) + read += 1 + } + } + + assert.Equal(t, count*filesCount, read) + assert.Equal(t, io.EOF, err) +} + +func TestQLogReaderMultipleFiles(t *testing.T) { + // should be large enough + count := 10000 + filesCount := 5 + + testDir := prepareTestDir() + defer func() { _ = os.RemoveAll(testDir) }() + testFiles := prepareTestFiles(testDir, filesCount, count) + + r, err := NewQLogReader(testFiles) + assert.Nil(t, err) + assert.NotNil(t, r) + defer r.Close() + + // seek to the start + err = r.SeekStart() + assert.Nil(t, err) + + // read everything + read := 0 + var line string + for err == nil { + line, err = r.ReadNext() + if err == nil { + assert.True(t, len(line) > 0) + read += 1 + } + } + + assert.Equal(t, count*filesCount, read) + assert.Equal(t, io.EOF, err) +} + +func TestQLogReaderSeek(t *testing.T) { + // more or less big file + count := 10000 + filesCount := 2 + + testDir := prepareTestDir() + defer func() { _ = os.RemoveAll(testDir) }() + testFiles := prepareTestFiles(testDir, filesCount, count) + + r, err := NewQLogReader(testFiles) + assert.Nil(t, err) + assert.NotNil(t, r) + defer r.Close() + + // CASE 1: NOT TOO OLD LINE + testSeekLineQLogReader(t, r, 300) + + // CASE 2: OLD LINE + testSeekLineQLogReader(t, r, count-300) + + // CASE 3: FIRST LINE + testSeekLineQLogReader(t, r, 0) + + // CASE 4: LAST LINE + testSeekLineQLogReader(t, r, count) + + // CASE 5: Seek non-existent (too low) + err = r.Seek(123) + assert.NotNil(t, err) + + // CASE 6: Seek non-existent (too high) + ts, _ := time.Parse(time.RFC3339, "2100-01-02T15:04:05Z07:00") + err = r.Seek(uint64(ts.UnixNano())) + assert.NotNil(t, err) +} + +func testSeekLineQLogReader(t *testing.T, r *QLogReader, lineNumber int) { + line, err := getQLogReaderLine(r, lineNumber) + assert.Nil(t, err) + ts := readQLogTimestamp(line) + assert.NotEqual(t, uint64(0), ts) + + // try seeking to that line now + err = r.Seek(ts) + assert.Nil(t, err) + + testLine, err := r.ReadNext() + assert.Nil(t, err) + assert.Equal(t, line, testLine) +} + +func getQLogReaderLine(r *QLogReader, lineNumber int) (string, error) { + err := r.SeekStart() + if err != nil { + return "", err + } + + for i := 1; i < lineNumber; i++ { + _, err := r.ReadNext() + if err != nil { + return "", err + } + } + return r.ReadNext() +} diff --git a/querylog/querylog_file.go b/querylog/querylog_file.go index 3cf2ae9e..e5544fc4 100644 --- a/querylog/querylog_file.go +++ b/querylog/querylog_file.go @@ -3,7 +3,6 @@ package querylog import ( "bufio" "bytes" - "compress/gzip" "encoding/base64" "encoding/json" "io" @@ -17,8 +16,6 @@ import ( "github.com/miekg/dns" ) -const enableGzip = false - // TODO: Check this when we append a new line -- we don't want to have a line longer than this const maxEntrySize = 1000 @@ -70,29 +67,7 @@ func (l *queryLog) flushToFile(buffer []*logEntry) error { var err error var zb bytes.Buffer filename := l.logFile - - // gzip enabled? - if enableGzip { - filename += ".gz" - - zw := gzip.NewWriter(&zb) - zw.Name = l.logFile - zw.ModTime = time.Now() - - _, err = zw.Write(b.Bytes()) - if err != nil { - log.Error("Couldn't compress to gzip: %s", err) - zw.Close() - return err - } - - if err = zw.Close(); err != nil { - log.Error("Couldn't close gzip writer: %s", err) - return err - } - } else { - zb = b - } + zb = b l.fileWriteLock.Lock() defer l.fileWriteLock.Unlock() @@ -118,11 +93,6 @@ func (l *queryLog) rotate() error { from := l.logFile to := l.logFile + ".1" - if enableGzip { - from = l.logFile + ".gz" - to = l.logFile + ".gz.1" - } - if _, err := os.Stat(from); os.IsNotExist(err) { // do nothing, file doesn't exist return nil @@ -135,7 +105,6 @@ func (l *queryLog) rotate() error { } log.Debug("Rotated from %s to %s successfully", from, to) - return nil }