251 lines
5.3 KiB
Go
251 lines
5.3 KiB
Go
package logfile
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"encoding/gob"
|
|
"encoding/json"
|
|
"net"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/AdguardTeam/AdGuardHome/internal/filtering"
|
|
"github.com/AdguardTeam/AdGuardHome/internal/querylog"
|
|
"github.com/flower-corp/rosedb"
|
|
"github.com/rs/xid"
|
|
)
|
|
|
|
// bp is a pool of reusable byte buffers. When the pool is empty it hands out
// a freshly allocated, empty bytes.Buffer.
var bp = sync.Pool{
	New: func() any { return &bytes.Buffer{} },
}
|
|
|
|
func getBuf() *bytes.Buffer {
|
|
return bp.Get().(*bytes.Buffer)
|
|
}
|
|
|
|
func putBuf(b *bytes.Buffer) {
|
|
b.Reset()
|
|
bp.Put(b)
|
|
}
|
|
|
|
// magic is an 8-byte marker: fe ed be ef 69 followed by three zero bytes.
// NOTE(review): nothing in the visible code references it; presumably a file
// signature - confirm before relying on it.
var magic = [8]byte{0xfe, 0xed, 0xbe, 0xef, 0x69}
|
|
|
|
// Size constants.
// NOTE(review): none of these are referenced in the visible code; the units
// below are taken from the names and the arithmetic only.
const (
	// BLOCK_SIZE is 1600.
	BLOCK_SIZE = 1600

	// BLOOM_SIZE_BITS is 500 64-bit words expressed in bits (32000).
	BLOOM_SIZE_BITS = 500 * 64

	// BLOOM_SIZE_BYTES is BLOOM_SIZE_BITS expressed in bytes (4000).
	BLOOM_SIZE_BYTES = BLOOM_SIZE_BITS / 8

	// BLOOM_SIZE_TOTAL is BLOOM_SIZE_BYTES plus three 8-byte values (4024).
	BLOOM_SIZE_TOTAL = 8 + 8 + 8 + BLOOM_SIZE_BYTES
)
|
|
|
|
// represents a single log entry in storage
|
|
type LogEntry struct {
|
|
IP net.IP `json:"IP"` // Client IP
|
|
Time time.Time `json:"T"`
|
|
|
|
QHost string `json:"QH"`
|
|
QType string `json:"QT"`
|
|
QClass string `json:"QC"`
|
|
|
|
ClientID string `json:"CID,omitempty"`
|
|
ClientProto querylog.ClientProto `json:"CP"`
|
|
|
|
Answer []byte `json:",omitempty"` // sometimes empty answers happen like binerdunt.top or rev2.globalrootservers.net
|
|
OrigAnswer []byte `json:",omitempty"`
|
|
|
|
Result filtering.Result
|
|
Elapsed time.Duration
|
|
Upstream string `json:",omitempty"` // if empty, means it was cached
|
|
}
|
|
|
|
func (s *LogStorage) CompressLog(l *LogEntry) *compressedLogEntry {
|
|
o := &compressedLogEntry{}
|
|
o.IP = l.IP.String()
|
|
o.Time = l.Time.UnixNano()
|
|
o.Query, _ = s.shapes.Add([]byte(l.QClass + "%" + l.QHost + "%" + l.QType))
|
|
o.ClientWithProto, _ = s.shapes.Add([]byte(l.ClientID + "%" + string(l.ClientProto)))
|
|
o.Ans, _ = s.shapes.Add(l.Answer)
|
|
o.Oans, _ = s.shapes.Add(l.OrigAnswer)
|
|
o.Upstream, _ = s.shapes.Add([]byte(l.Upstream))
|
|
rb, _ := json.Marshal(l.Result)
|
|
o.Result, _ = s.shapes.Add(rb)
|
|
o.Elapsed = l.Elapsed
|
|
return o
|
|
}
|
|
|
|
func (s *LogStorage) DecompressLog(l *compressedLogEntry) *LogEntry {
|
|
o := &LogEntry{}
|
|
o.IP = net.ParseIP(l.IP)
|
|
o.Time = time.Unix(0, l.Time)
|
|
qstr, _ := s.shapes.Get(l.Query)
|
|
qsplt := strings.Split(string(qstr), "%")
|
|
if len(qsplt) > 2 {
|
|
o.QClass = qsplt[0]
|
|
o.QHost = qsplt[1]
|
|
o.QType = qsplt[2]
|
|
}
|
|
cstr, _ := s.shapes.Get(l.ClientWithProto)
|
|
csplt := strings.Split(string(cstr), "%")
|
|
if len(csplt) > 1 {
|
|
o.ClientID = csplt[0]
|
|
o.ClientProto = querylog.ClientProto(csplt[1])
|
|
}
|
|
o.Answer, _ = s.shapes.Get(l.Ans)
|
|
o.OrigAnswer, _ = s.shapes.Get(l.Oans)
|
|
upb, _ := s.shapes.Get(l.Upstream)
|
|
o.Upstream = string(upb)
|
|
rb, _ := s.shapes.Get(l.Result)
|
|
json.Unmarshal(rb, &o.Result)
|
|
o.Elapsed = l.Elapsed
|
|
return o
|
|
}
|
|
|
|
// compressedLogEntry is the compact form of a LogEntry. Every [4]byte field is
// a handle issued by the shape dictionary for the corresponding payload.
type compressedLogEntry struct {
	// IP is the textual form of the client IP.
	IP string
	// Time is the entry timestamp in Unix nanoseconds.
	Time int64
	// Query is the handle of "QClass%QHost%QType".
	Query [4]byte
	// ClientWithProto is the handle of "ClientID%ClientProto".
	ClientWithProto [4]byte
	// Ans is the handle of the answer bytes.
	Ans [4]byte
	// Oans is the handle of the original answer bytes.
	Oans [4]byte
	// Upstream is the handle of the upstream string.
	Upstream [4]byte
	// Result is the handle of the JSON-encoded filtering result.
	Result [4]byte
	// Elapsed is copied unchanged from the log entry.
	Elapsed time.Duration
}
|
|
|
|
type LogStorage struct {
|
|
db *rosedb.RoseDB
|
|
shapes *ShapeDb
|
|
}
|
|
|
|
func New(dir string) (*LogStorage, error) {
|
|
out := &LogStorage{
|
|
shapes: new(ShapeDb),
|
|
}
|
|
var err error
|
|
os.MkdirAll(dir, 0644)
|
|
err = out.shapes.Open(path.Join(dir, "dict.db"))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
do := rosedb.DefaultOptions(path.Join(dir, "data.db"))
|
|
do.Sync = true
|
|
do.IndexMode = rosedb.KeyValueMemMode
|
|
out.db, err = rosedb.Open(do)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func (s *LogStorage) CompactEntry(l *LogEntry) []byte {
|
|
buf := new(bytes.Buffer)
|
|
return buf.Bytes()
|
|
}
|
|
|
|
func (s *LogStorage) Store(l *LogEntry) error {
|
|
id := xid.NewWithTime(l.Time)
|
|
s.db.Set(id.Bytes(), s.CompactEntry(l))
|
|
return nil
|
|
}
|
|
|
|
// zset_ts is the key of the sorted set that All reads log records from.
var zset_ts = []byte("z_ts")
|
|
|
|
func (s *LogStorage) All() ([]*LogEntry, error) {
|
|
out := []*LogEntry{}
|
|
total := s.db.ZCard(zset_ts)
|
|
bts, err := s.db.ZRange(zset_ts, 0, total)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, v := range bts {
|
|
var compact compressedLogEntry
|
|
err := gob.NewDecoder(bytes.NewBuffer(v)).Decode(&compact)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
out = append(out, s.DecompressLog(&compact))
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
type ShapeDb struct {
|
|
db *rosedb.RoseDB
|
|
}
|
|
|
|
func (b *ShapeDb) Open(name string) (err error) {
|
|
do := rosedb.DefaultOptions(path.Join(name))
|
|
do.Sync = true
|
|
do.IndexMode = rosedb.KeyValueMemMode
|
|
do.LogFileSizeThreshold = 16 << 20
|
|
b.db, err = rosedb.Open(do)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *ShapeDb) Add(k []byte) ([4]byte, error) {
|
|
o := [4]byte{}
|
|
if len(k) <= 4 {
|
|
for i, v := range k {
|
|
if i > 4 {
|
|
break
|
|
}
|
|
o[i] = v
|
|
}
|
|
return o, nil
|
|
}
|
|
h := hash(k)
|
|
_, err := b.db.Get(h[:])
|
|
if err == rosedb.ErrKeyNotFound {
|
|
err = b.db.Set(h[:], k)
|
|
if err != nil {
|
|
return o, err
|
|
}
|
|
return h, nil
|
|
} else if err != nil {
|
|
return o, err
|
|
}
|
|
return h, nil
|
|
}
|
|
|
|
func (b *ShapeDb) Get(h [4]byte) ([]byte, error) {
|
|
ans, err := b.db.Get(h[:])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return ans, nil
|
|
}
|
|
|
|
// hash computes a 32-bit multiplicative hash of b and returns it as four
// little-endian bytes.
//
// The input is consumed as little-endian 32-bit words; up to three trailing
// bytes are mixed in separately. The result for an empty input is the seed.
func hash(b []byte) [4]byte {
	const (
		seed uint32 = 0xbc9f1d34
		mul  uint32 = 0xc6a4a793
	)

	h := seed ^ (uint32(len(b)) * mul)

	// Whole words.
	rest := b
	for len(rest) >= 4 {
		h += binary.LittleEndian.Uint32(rest)
		h *= mul
		h ^= h >> 16
		rest = rest[4:]
	}

	// Trailing bytes, highest index first. Each byte goes through int8, so
	// values of 0x80 and above are sign-extended before being shifted.
	if n := len(rest); n > 0 {
		if n > 2 {
			h += uint32(int8(rest[2])) << 16
		}
		if n > 1 {
			h += uint32(int8(rest[1])) << 8
		}
		h += uint32(int8(rest[0]))
		h *= mul
		h ^= h >> 24
	}

	var out [4]byte
	binary.LittleEndian.PutUint32(out[:], h)
	return out
}
|