From e5127d1aa5fb45e32abc3b72753f7f0ee19927bf Mon Sep 17 00:00:00 2001 From: a Date: Fri, 3 Jun 2022 15:02:14 -0500 Subject: [PATCH] wip --- internal/storage/logs/compress_test.go | 52 ++++++++++++++++++++++++++ internal/storage/logs/index.go | 1 - internal/storage/logs/logfile.go | 19 ++++++---- internal/storage/logs/shape_test.go | 35 ----------------- 4 files changed, 63 insertions(+), 44 deletions(-) create mode 100644 internal/storage/logs/compress_test.go delete mode 100644 internal/storage/logs/index.go delete mode 100644 internal/storage/logs/shape_test.go diff --git a/internal/storage/logs/compress_test.go b/internal/storage/logs/compress_test.go new file mode 100644 index 00000000..1c612a6c --- /dev/null +++ b/internal/storage/logs/compress_test.go @@ -0,0 +1,52 @@ +package logfile + +import ( + "bufio" + "encoding/json" + "log" + "os" + "testing" +) + +func TestBigFils(t *testing.T) { + file, err := os.Open("./querylog.json") + if err != nil { + t.Fatal(err) + } + json_logs := make([]*LogEntry, 0, 1000) + scn := bufio.NewScanner(file) + idx := 0 + for scn.Scan() { + idx++ + var entry LogEntry + err = json.Unmarshal(scn.Bytes(), &entry) + if err != nil { + continue + } + json_logs = append(json_logs, &entry) + } + file.Close() + ls, err := New("./logdb/") + if err != nil { + t.Fatal(err) + } + _ = ls + + func() { + defer func() { + ls.db.Close() + ls.shapes.db.Close() + }() + for i, v := range json_logs { + if i%10000 == 0 { + log.Printf("%d / %d", i, len(json_logs)) + } + err = ls.Store(v) + if err != nil { + log.Println(err) + continue + } + } + }() + +} diff --git a/internal/storage/logs/index.go b/internal/storage/logs/index.go deleted file mode 100644 index b04fcc5c..00000000 --- a/internal/storage/logs/index.go +++ /dev/null @@ -1 +0,0 @@ -package logfile diff --git a/internal/storage/logs/logfile.go b/internal/storage/logs/logfile.go index 95e21ad6..9b264e88 100644 --- a/internal/storage/logs/logfile.go +++ b/internal/storage/logs/logfile.go @@ -62,7 +62,7 @@ type LogEntry struct { func (s *LogStorage) CompressLog(l *LogEntry) *compressedLogEntry { o := &compressedLogEntry{} - o.IP = l.IP.String() + o.IP, _ = s.shapes.Add([]byte(l.IP.String())) o.Time = l.Time.UnixNano() o.Query, _ = s.shapes.Add([]byte(l.QClass + "%" + l.QHost + "%" + l.QType)) o.ClientWithProto, _ = s.shapes.Add([]byte(l.ClientID + "%" + string(l.ClientProto))) @@ -77,7 +77,8 @@ func (s *LogStorage) CompressLog(l *LogEntry) *compressedLogEntry { func (s *LogStorage) DecompressLog(l *compressedLogEntry) *LogEntry { o := &LogEntry{} - o.IP = net.ParseIP(l.IP) + ipstr, _ := s.shapes.Get(l.IP) + o.IP = net.ParseIP(string(ipstr)) o.Time = time.Unix(0, l.Time) qstr, _ := s.shapes.Get(l.Query) qsplt := strings.Split(string(qstr), "%") @@ -103,8 +104,8 @@ func (s *LogStorage) DecompressLog(l *compressedLogEntry) *LogEntry { } type compressedLogEntry struct { - IP string Time int64 + IP [4]byte Query [4]byte ClientWithProto [4]byte Ans [4]byte @@ -124,14 +125,15 @@ func New(dir string) (*LogStorage, error) { shapes: new(ShapeDb), } var err error - os.MkdirAll(dir, 0644) + os.MkdirAll(dir, 0o644) err = out.shapes.Open(path.Join(dir, "dict.db")) if err != nil { return nil, err } do := rosedb.DefaultOptions(path.Join(dir, "data.db")) - do.Sync = true + do.Sync = false do.IndexMode = rosedb.KeyValueMemMode + do.LogFileSizeThreshold = 16 << 20 out.db, err = rosedb.Open(do) if err != nil { return nil, err @@ -142,12 +144,14 @@ func New(dir string) (*LogStorage, error) { func (s *LogStorage) CompactEntry(l *LogEntry) []byte { buf := new(bytes.Buffer) + gob.NewEncoder(buf).Encode(s.CompressLog(l)) return buf.Bytes() } func (s *LogStorage) Store(l *LogEntry) error { id := xid.NewWithTime(l.Time) - s.db.Set(id.Bytes(), s.CompactEntry(l)) + gb := s.CompactEntry(l) + s.db.Set(id.Bytes(), gb) return nil } @@ -168,7 +172,6 @@ func (s *LogStorage) All() ([]*LogEntry, error) { } out = append(out, s.DecompressLog(&compact)) } - return out, nil } @@ -178,7 +181,7 @@ type ShapeDb struct { func (b *ShapeDb) Open(name string) (err error) { do := rosedb.DefaultOptions(path.Join(name)) - do.Sync = true + do.Sync = false do.IndexMode = rosedb.KeyValueMemMode do.LogFileSizeThreshold = 16 << 20 b.db, err = rosedb.Open(do) diff --git a/internal/storage/logs/shape_test.go b/internal/storage/logs/shape_test.go deleted file mode 100644 index fe5c28d4..00000000 --- a/internal/storage/logs/shape_test.go +++ /dev/null @@ -1,35 +0,0 @@ -package logfile - -import ( - "testing" -) - -func TestDatabaseWrite(t *testing.T) { - cases := []string{"280312038j1203j8120938j120938120938d12903h1207h13926410327h46012346073", - "google.com", - "localhost", - "some.weird.absodas"} - - var sb = new(ShapeDb) - err := sb.Open("dict.db") - if err != nil { - t.Fatal("failed to open db") - } - hshs := make([][4]byte, len(cases)) - for i, c := range cases { - hshs[i], err = sb.Add([]byte(c)) - if err != nil { - t.Fatalf("failed to add %s %v", c, err) - } - } - - for i, v := range hshs { - val, err := sb.Get(v) - if err != nil { - t.Fatalf("failed to get %s %v", val, err) - } - if string(val) != cases[i] { - t.Fatalf("val no match %s", val) - } - } -}