package querylog

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/AdguardTeam/golibs/log"
	"github.com/go-test/deep"
	"github.com/miekg/dns"
)

var (
	fileWriteLock sync.Mutex
)

const enableGzip = false
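
// On-disk format (as implemented below): each flush appends one JSON object
// per line (newline-delimited JSON produced by json.Encoder) to l.logFile.
// When enableGzip is true, the same stream is gzip-compressed first and the
// file name gets a ".gz" suffix; rotation renames the active file to
// "<name>.1" (or "<name>.gz.1").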

// flushLogBuffer flushes the in-memory log buffer to the file and resets it.
// Unless fullFlush is true, the flush happens only when the buffer has
// reached logBufferCap entries.
func (l *queryLog) flushLogBuffer(fullFlush bool) error {
	l.fileFlushLock.Lock()
	defer l.fileFlushLock.Unlock()

	// detach the accumulated entries while holding the buffer lock
	l.logBufferLock.Lock()
	needFlush := len(l.logBuffer) >= logBufferCap
	if !needFlush && !fullFlush {
		l.logBufferLock.Unlock()
		return nil
	}
	flushBuffer := l.logBuffer
	l.logBuffer = nil
	l.flushPending = false
	l.logBufferLock.Unlock()

	err := l.flushToFile(flushBuffer)
	if err != nil {
		log.Error("Saving querylog to file failed: %s", err)
		return err
	}
	return nil
}

// flushToFile saves the specified log entries to the query log file
func (l *queryLog) flushToFile(buffer []*logEntry) error {
	if len(buffer) == 0 {
		log.Debug("querylog: there's nothing to write to a file")
		return nil
	}

	start := time.Now()
	var b bytes.Buffer
	e := json.NewEncoder(&b)
	for _, entry := range buffer {
		err := e.Encode(entry)
		if err != nil {
			log.Error("Failed to marshal entry: %s", err)
			return err
		}
	}
	elapsed := time.Since(start)
	log.Debug("%d entries serialized via json in %v: %d kB, %.1f B/entry, %v/entry", len(buffer), elapsed, b.Len()/1024, float64(b.Len())/float64(len(buffer)), elapsed/time.Duration(len(buffer)))

	err := checkBuffer(buffer, b)
	if err != nil {
		log.Error("failed to check buffer: %s", err)
		return err
	}

	var zb bytes.Buffer
	filename := l.logFile

	// gzip enabled?
	if enableGzip {
		filename += ".gz"

		zw := gzip.NewWriter(&zb)
		zw.Name = l.logFile
		zw.ModTime = time.Now()

		_, err = zw.Write(b.Bytes())
		if err != nil {
			log.Error("Couldn't compress to gzip: %s", err)
			zw.Close()
			return err
		}

		if err = zw.Close(); err != nil {
			log.Error("Couldn't close gzip writer: %s", err)
			return err
		}
	} else {
		zb = b
	}

	fileWriteLock.Lock()
	defer fileWriteLock.Unlock()
	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		log.Error("failed to create file \"%s\": %s", filename, err)
		return err
	}
	defer f.Close()

	n, err := f.Write(zb.Bytes())
	if err != nil {
		log.Error("Couldn't write to file: %s", err)
		return err
	}

	log.Debug("ok \"%s\": %v bytes written", filename, n)

	return nil
}
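
// checkBuffer is a sanity check: it decodes the just-serialized JSON back
// and deep-compares every decoded entry against the original buffer,
// returning an error on any mismatch or on an entry count mismatch.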
func checkBuffer(buffer []*logEntry, b bytes.Buffer) error {
	l := len(buffer)
	d := json.NewDecoder(&b)

	i := 0
	for d.More() {
		entry := &logEntry{}
		err := d.Decode(entry)
		if err != nil {
			log.Error("Failed to decode: %s", err)
			return err
		}
		if diff := deep.Equal(entry, buffer[i]); diff != nil {
			log.Error("decoded buffer differs: %s", diff)
			return fmt.Errorf("decoded buffer differs: %s", diff)
		}
		i++
	}
	if i != l {
		err := fmt.Errorf("check fail: %d vs %d entries", l, i)
		log.Error("%v", err)
		return err
	}
	log.Debug("check ok: %d entries", i)

	return nil
}
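
// rotateQueryLog renames the current query log file to "<name>.1"
// (or "<name>.gz" to "<name>.gz.1" when gzip is enabled).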
func (l *queryLog) rotateQueryLog() error {
	from := l.logFile
	to := l.logFile + ".1"

	if enableGzip {
		from = l.logFile + ".gz"
		to = l.logFile + ".gz.1"
	}

	if _, err := os.Stat(from); os.IsNotExist(err) {
		// do nothing, file doesn't exist
		return nil
	}

	err := os.Rename(from, to)
	if err != nil {
		log.Error("Failed to rename querylog: %s", err)
		return err
	}

	log.Debug("Rotated from %s to %s successfully", from, to)
	return nil
}
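
// periodicQueryLogRotate rotates the query log file every l.conf.Interval
// hours.  It blocks forever on time.Tick, so it is presumably started in its
// own goroutine (e.g. `go l.periodicQueryLogRotate()`) by code outside this
// file (an assumption; the caller is not shown here).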
func (l *queryLog) periodicQueryLogRotate() {
	for range time.Tick(time.Duration(l.conf.Interval) * time.Hour) {
		err := l.rotateQueryLog()
		if err != nil {
			log.Error("Failed to rotate querylog: %s", err)
			// do nothing, continue rotating
		}
	}
}

// Reader is the query log file reader context
type Reader struct {
	f     *os.File
	jd    *json.Decoder
	now   time.Time
	ql    *queryLog
	files []string
	ifile int
	count uint64 // returned elements counter
}

// OpenReader returns a reader object for the query log files, or nil on error
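//
// A minimal usage sketch, mirroring fillFromFile below (l is a *queryLog;
// per-entry filtering is omitted):
//
//	r := l.OpenReader()
//	if r == nil {
//		return
//	}
//	r.BeginRead()
//	for entry := r.Next(); entry != nil; entry = r.Next() {
//		// process entry
//	}
//	r.Close()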
func (l *queryLog) OpenReader() *Reader {
	r := Reader{}
	r.ql = l
	r.now = time.Now()
	return &r
}

// Close closes the reader
func (r *Reader) Close() {
	elapsed := time.Since(r.now)
	var perunit time.Duration
	if r.count > 0 {
		perunit = elapsed / time.Duration(r.count)
	}
	log.Debug("querylog: read %d entries in %v, %v/entry",
		r.count, elapsed, perunit)

	if r.f != nil {
		r.f.Close()
	}
}

// BeginRead starts reading
func (r *Reader) BeginRead() {
	// Note: only the uncompressed file names are listed here; the ".gz"
	// variants written when enableGzip is true are not read.
	r.files = []string{
		r.ql.logFile,
		r.ql.logFile + ".1",
	}
}

// Next returns the next entry or nil if reading is finished
func (r *Reader) Next() *logEntry { // nolint
	var err error
	for {
		// open file if needed
		if r.f == nil {
			if r.ifile == len(r.files) {
				return nil
			}
			fn := r.files[r.ifile]
			r.f, err = os.Open(fn)
			if err != nil {
				log.Error("Failed to open file \"%s\": %s", fn, err)
				r.ifile++
				continue
			}
		}

		// open decoder if needed
		if r.jd == nil {
			r.jd = json.NewDecoder(r.f)
		}

		// check if there's data
		if !r.jd.More() {
			r.jd = nil
			r.f.Close()
			r.f = nil
			r.ifile++
			continue
		}

		// read data
		var entry logEntry
		err = r.jd.Decode(&entry)
		if err != nil {
			log.Error("Failed to decode: %s", err)
			// next entry can be fine, try more
			continue
		}

		r.count++
		return &entry
	}
}

// Total returns the total number of items (not implemented; always 0)
func (r *Reader) Total() int {
	return 0
}

// fillFromFile fills the in-memory query log cache from the on-disk files.
func (l *queryLog) fillFromFile() {
	now := time.Now()
	validFrom := now.Unix() - int64(l.conf.Interval*60*60)

	r := l.OpenReader()
	if r == nil {
		return
	}
	r.BeginRead()

	for {
		entry := r.Next()
		if entry == nil {
			break
		}

		if entry.Time.Unix() < validFrom {
			// the entry is older than the retention interval
			continue
		}

		if len(entry.Question) == 0 {
			log.Printf("entry question is absent, skipping")
			continue
		}

		if entry.Time.After(now) {
			log.Printf("entry time %v is in the future (now %v), ignoring", entry.Time, now)
			continue
		}

		q := new(dns.Msg)
		if err := q.Unpack(entry.Question); err != nil {
			log.Printf("failed to unpack dns message question: %s", err)
			continue
		}

		if len(q.Question) != 1 {
			log.Printf("malformed dns message: expected 1 question, got %d, skipping", len(q.Question))
			continue
		}

		l.lock.Lock()
		l.cache = append(l.cache, entry)
		if len(l.cache) > queryLogSize {
			toremove := len(l.cache) - queryLogSize
			l.cache = l.cache[toremove:]
		}
		l.lock.Unlock()
	}

	r.Close()
}