From 0704b6d3a82d8d23c117f7382cd2253c67429773 Mon Sep 17 00:00:00 2001 From: a Date: Fri, 3 Jun 2022 14:24:56 -0500 Subject: [PATCH] wip --- go.mod | 4 +- go.sum | 7 + internal/storage/logs/index.go | 1 + internal/storage/logs/logfile.go | 253 ++++++++++++++++++++-------- internal/storage/logs/shape_test.go | 18 +- 5 files changed, 211 insertions(+), 72 deletions(-) create mode 100644 internal/storage/logs/index.go diff --git a/go.mod b/go.mod index b78357f5..fac9edfa 100644 --- a/go.mod +++ b/go.mod @@ -44,7 +44,6 @@ require ( github.com/aead/poly1305 v0.0.0-20180717145839-3fee0db0b635 // indirect github.com/ameshkov/dnsstamps v1.0.3 // indirect github.com/beefsack/go-rate v0.0.0-20220214233405-116f4ca011a0 // indirect - github.com/bits-and-blooms/bloom v2.0.3+incompatible // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/cheekybits/genny v1.0.0 // indirect github.com/cockroachdb/errors v1.8.1 // indirect @@ -53,6 +52,7 @@ require ( github.com/cockroachdb/redact v1.0.8 // indirect github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/flower-corp/rosedb v1.0.0 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/gogo/protobuf v1.3.1 // indirect github.com/golang/snappy v0.0.3 // indirect @@ -69,7 +69,9 @@ require ( github.com/onsi/ginkgo v1.16.5 // indirect github.com/patrickmn/go-cache v2.1.0+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/plar/go-adaptive-radix-tree v1.0.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rs/xid v1.4.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/objx v0.1.1 // indirect github.com/u-root/uio v0.0.0-20220204230159-dac05f7d2cb4 // indirect diff --git a/go.sum b/go.sum index c83a81cf..f0aff128 100644 --- a/go.sum +++ b/go.sum @@ -106,6 +106,8 @@ github.com/fanliao/go-promise v0.0.0-20141029170127-1890db352a72/go.mod h1:Pjfxu github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= +github.com/flower-corp/rosedb v1.0.0 h1:xwaYFxhJkKpKQrNdBKQ3RLB2f41vNxX6Z7u8wWtuvgs= +github.com/flower-corp/rosedb v1.0.0/go.mod h1:L/AgDvJxcsDZK37/ux+5hlpkgDjJQiLh9r2NFyJyPsM= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= @@ -322,6 +324,8 @@ github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTw github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/plar/go-adaptive-radix-tree v1.0.4 h1:Ucd8R6RH2E7RW8ZtDKrsWyOD3paG2qqJO0I20WQ8oWQ= +github.com/plar/go-adaptive-radix-tree v1.0.4/go.mod h1:Ot8d28EII3i7Lv4PSvBlF8ejiD/CtRYDuPsySJbSaK8= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -329,6 +333,8 @@ github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1: github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= +github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= @@ -559,6 +565,7 @@ golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210909193231-528a39cd75f3/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/storage/logs/index.go b/internal/storage/logs/index.go new file mode 100644 index 00000000..b04fcc5c --- /dev/null +++ b/internal/storage/logs/index.go @@ -0,0 +1 @@ +package logfile diff --git a/internal/storage/logs/logfile.go b/internal/storage/logs/logfile.go index becf5226..95e21ad6 100644 --- a/internal/storage/logs/logfile.go +++ b/internal/storage/logs/logfile.go @@ -1,16 +1,38 @@ package logfile import ( + "bytes" "encoding/binary" + "encoding/gob" + "encoding/json" "net" + "os" + "path" + "strings" "sync" "time" "github.com/AdguardTeam/AdGuardHome/internal/filtering" "github.com/AdguardTeam/AdGuardHome/internal/querylog" - "github.com/cockroachdb/pebble" + "github.com/flower-corp/rosedb" + "github.com/rs/xid" ) +var bp = sync.Pool{ + New: func() any { + return new(bytes.Buffer) + }, +} + +func getBuf() *bytes.Buffer { + return bp.Get().(*bytes.Buffer) +} + +func putBuf(b *bytes.Buffer) { + b.Reset() + bp.Put(b) +} + var magic = [8]byte{0xfe, 0xed, 0xbe, 0xef, 0x69, 0x00, 0x00, 0x00} const BLOCK_SIZE = 1600 @@ -18,70 +40,6 @@ const BLOOM_SIZE_BITS = 500 * 64 const BLOOM_SIZE_BYTES = BLOOM_SIZE_BITS / 8 const BLOOM_SIZE_TOTAL = 8 + 8 + 8 + BLOOM_SIZE_BYTES -type LogStorage struct { -} - -type LogBlock struct { -} - -type ShapeStorage struct { - blocks []ShapeBlock - - shapeCache map[string][8]byte -} - -func (s *ShapeStorage) FindOrAdd(k []byte) [8]byte { - if len(k) <= 8 { - o := [8]byte{} - for i, v := range k { - o[i] = v - } - return o - } - return [8]byte{} -} - -type ShapeBlock struct { - db *pebble.DB - - sync.RWMutex -} - -func (b *ShapeBlock) Open(name string) (err error) { - b.db, err = pebble.Open(name, &pebble.Options{}) - if err != nil { - return nil - } - return nil -} - -func (b *ShapeBlock) Add(k []byte) ([]byte, error) { - h := hash(k) - _, c, err := b.db.Get(h) - if err == pebble.ErrNotFound { - err = b.db.Set(h, k, pebble.Sync) - if err != nil { - return nil, err - } - return h, nil - } else if err != nil { - return nil, err - } - c.Close() - return h, nil -} - -func (b *ShapeBlock) Get(h [4]byte) ([]byte, error) { - ans, c, err := b.db.Get(h[:]) - if err != nil { - return nil, err - } - defer c.Close() - ansCopy := make([]byte, len(ans)) - copy(ansCopy, ans) - return ansCopy, nil -} - // represents a single log entry in storage type LogEntry struct { IP net.IP `json:"IP"` // Client IP @@ -102,7 +60,168 @@ type LogEntry struct { Upstream string `json:",omitempty"` // if empty, means it was cached } -func hash(b []byte) []byte { +func (s *LogStorage) CompressLog(l *LogEntry) *compressedLogEntry { + o := &compressedLogEntry{} + o.IP = l.IP.String() + o.Time = l.Time.UnixNano() + o.Query, _ = s.shapes.Add([]byte(l.QClass + "%" + l.QHost + "%" + l.QType)) + o.ClientWithProto, _ = s.shapes.Add([]byte(l.ClientID + "%" + string(l.ClientProto))) + o.Ans, _ = s.shapes.Add(l.Answer) + o.Oans, _ = s.shapes.Add(l.OrigAnswer) + o.Upstream, _ = s.shapes.Add([]byte(l.Upstream)) + rb, _ := json.Marshal(l.Result) + o.Result, _ = s.shapes.Add(rb) + o.Elapsed = l.Elapsed + return o +} + +func (s *LogStorage) DecompressLog(l *compressedLogEntry) *LogEntry { + o := &LogEntry{} + o.IP = net.ParseIP(l.IP) + o.Time = time.Unix(0, l.Time) + qstr, _ := s.shapes.Get(l.Query) + qsplt := strings.Split(string(qstr), "%") + if len(qsplt) > 2 { + o.QClass = qsplt[0] + o.QHost = qsplt[1] + o.QType = qsplt[2] + } + cstr, _ := s.shapes.Get(l.ClientWithProto) + csplt := strings.Split(string(cstr), "%") + if len(csplt) > 1 { + o.ClientID = csplt[0] + o.ClientProto = querylog.ClientProto(csplt[1]) + } + o.Answer, _ = s.shapes.Get(l.Ans) + o.OrigAnswer, _ = s.shapes.Get(l.Oans) + upb, _ := s.shapes.Get(l.Upstream) + o.Upstream = string(upb) + rb, _ := s.shapes.Get(l.Result) + json.Unmarshal(rb, &o.Result) + o.Elapsed = l.Elapsed + return o +} + +type compressedLogEntry struct { + IP string + Time int64 + Query [4]byte + ClientWithProto [4]byte + Ans [4]byte + Oans [4]byte + Upstream [4]byte + Result [4]byte + Elapsed time.Duration +} + +type LogStorage struct { + db *rosedb.RoseDB + shapes *ShapeDb +} + +func New(dir string) (*LogStorage, error) { + out := &LogStorage{ + shapes: new(ShapeDb), + } + var err error + os.MkdirAll(dir, 0644) + err = out.shapes.Open(path.Join(dir, "dict.db")) + if err != nil { + return nil, err + } + do := rosedb.DefaultOptions(path.Join(dir, "data.db")) + do.Sync = true + do.IndexMode = rosedb.KeyValueMemMode + out.db, err = rosedb.Open(do) + if err != nil { + return nil, err + } + + return out, nil +} + +func (s *LogStorage) CompactEntry(l *LogEntry) []byte { + buf := new(bytes.Buffer) + return buf.Bytes() +} + +func (s *LogStorage) Store(l *LogEntry) error { + id := xid.NewWithTime(l.Time) + s.db.Set(id.Bytes(), s.CompactEntry(l)) + return nil +} + +var zset_ts = []byte("z_ts") + +func (s *LogStorage) All() ([]*LogEntry, error) { + out := []*LogEntry{} + total := s.db.ZCard(zset_ts) + bts, err := s.db.ZRange(zset_ts, 0, total) + if err != nil { + return nil, err + } + for _, v := range bts { + var compact compressedLogEntry + err := gob.NewDecoder(bytes.NewBuffer(v)).Decode(&compact) + if err != nil { + continue + } + out = append(out, s.DecompressLog(&compact)) + } + + return out, nil +} + +type ShapeDb struct { + db *rosedb.RoseDB +} + +func (b *ShapeDb) Open(name string) (err error) { + do := rosedb.DefaultOptions(path.Join(name)) + do.Sync = true + do.IndexMode = rosedb.KeyValueMemMode + do.LogFileSizeThreshold = 16 << 20 + b.db, err = rosedb.Open(do) + if err != nil { + return nil + } + return nil +} + +func (b *ShapeDb) Add(k []byte) ([4]byte, error) { + o := [4]byte{} + if len(k) <= 4 { + for i, v := range k { + if i > 4 { + break + } + o[i] = v + } + return o, nil + } + h := hash(k) + _, err := b.db.Get(h[:]) + if err == rosedb.ErrKeyNotFound { + err = b.db.Set(h[:], k) + if err != nil { + return o, err + } + return h, nil + } else if err != nil { + return o, err + } + return h, nil +} + +func (b *ShapeDb) Get(h [4]byte) ([]byte, error) { + ans, err := b.db.Get(h[:]) + if err != nil { + return nil, err + } + return ans, nil +} + +func hash(b []byte) [4]byte { const ( seed = 0xbc9f1d34 m = 0xc6a4a793 @@ -127,5 +246,5 @@ func hash(b []byte) []byte { } o := [4]byte{} binary.LittleEndian.PutUint32(o[:], h) - return o[:] + return o } diff --git a/internal/storage/logs/shape_test.go b/internal/storage/logs/shape_test.go index af9a381a..fe5c28d4 100644 --- a/internal/storage/logs/shape_test.go +++ b/internal/storage/logs/shape_test.go @@ -10,16 +10,26 @@ func TestDatabaseWrite(t *testing.T) { "localhost", "some.weird.absodas"} - var sb = new(ShapeBlock) - err := sb.Open("shapes.db") + var sb = new(ShapeDb) + err := sb.Open("dict.db") if err != nil { t.Fatal("failed to open db") } - hshs := make([][]byte, len(cases)) + hshs := make([][4]byte, len(cases)) for i, c := range cases { hshs[i], err = sb.Add([]byte(c)) if err != nil { - t.Fatalf("failed to add %s", c) + t.Fatalf("failed to add %s %v", c, err) + } + } + + for i, v := range hshs { + val, err := sb.Get(v) + if err != nil { + t.Fatalf("failed to get %s %v", val, err) + } + if string(val) != cases[i] { + t.Fatalf("val no match %s", val) } } }