wip
This commit is contained in:
parent
0704b6d3a8
commit
e5127d1aa5
52
internal/storage/logs/compress_test.go
Normal file
52
internal/storage/logs/compress_test.go
Normal file
@ -0,0 +1,52 @@
|
||||
package logfile
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestBigFils(t *testing.T) {
|
||||
file, err := os.Open("./querylog.json")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
json_logs := make([]*LogEntry, 0, 1000)
|
||||
scn := bufio.NewScanner(file)
|
||||
idx := 0
|
||||
for scn.Scan() {
|
||||
idx++
|
||||
var entry LogEntry
|
||||
err = json.Unmarshal(scn.Bytes(), &entry)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
json_logs = append(json_logs, &entry)
|
||||
}
|
||||
file.Close()
|
||||
ls, err := New("./logdb/")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
_ = ls
|
||||
|
||||
func() {
|
||||
defer func() {
|
||||
ls.db.Close()
|
||||
ls.shapes.db.Close()
|
||||
}()
|
||||
for i, v := range json_logs {
|
||||
if i%10000 == 0 {
|
||||
log.Printf("%d / %d", i, len(json_logs))
|
||||
}
|
||||
err = ls.Store(v)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
}
|
@ -1 +0,0 @@
|
||||
package logfile
|
@ -62,7 +62,7 @@ type LogEntry struct {
|
||||
|
||||
func (s *LogStorage) CompressLog(l *LogEntry) *compressedLogEntry {
|
||||
o := &compressedLogEntry{}
|
||||
o.IP = l.IP.String()
|
||||
o.IP, _ = s.shapes.Add([]byte(l.IP.String()))
|
||||
o.Time = l.Time.UnixNano()
|
||||
o.Query, _ = s.shapes.Add([]byte(l.QClass + "%" + l.QHost + "%" + l.QType))
|
||||
o.ClientWithProto, _ = s.shapes.Add([]byte(l.ClientID + "%" + string(l.ClientProto)))
|
||||
@ -77,7 +77,8 @@ func (s *LogStorage) CompressLog(l *LogEntry) *compressedLogEntry {
|
||||
|
||||
func (s *LogStorage) DecompressLog(l *compressedLogEntry) *LogEntry {
|
||||
o := &LogEntry{}
|
||||
o.IP = net.ParseIP(l.IP)
|
||||
ipstr, _ := s.shapes.Get(l.IP)
|
||||
o.IP = net.ParseIP(string(ipstr))
|
||||
o.Time = time.Unix(0, l.Time)
|
||||
qstr, _ := s.shapes.Get(l.Query)
|
||||
qsplt := strings.Split(string(qstr), "%")
|
||||
@ -103,8 +104,8 @@ func (s *LogStorage) DecompressLog(l *compressedLogEntry) *LogEntry {
|
||||
}
|
||||
|
||||
type compressedLogEntry struct {
|
||||
IP string
|
||||
Time int64
|
||||
IP [4]byte
|
||||
Query [4]byte
|
||||
ClientWithProto [4]byte
|
||||
Ans [4]byte
|
||||
@ -124,14 +125,15 @@ func New(dir string) (*LogStorage, error) {
|
||||
shapes: new(ShapeDb),
|
||||
}
|
||||
var err error
|
||||
os.MkdirAll(dir, 0644)
|
||||
os.MkdirAll(dir, 0o644)
|
||||
err = out.shapes.Open(path.Join(dir, "dict.db"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
do := rosedb.DefaultOptions(path.Join(dir, "data.db"))
|
||||
do.Sync = true
|
||||
do.Sync = false
|
||||
do.IndexMode = rosedb.KeyValueMemMode
|
||||
do.LogFileSizeThreshold = 16 << 20
|
||||
out.db, err = rosedb.Open(do)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -142,12 +144,14 @@ func New(dir string) (*LogStorage, error) {
|
||||
|
||||
func (s *LogStorage) CompactEntry(l *LogEntry) []byte {
|
||||
buf := new(bytes.Buffer)
|
||||
gob.NewEncoder(buf).Encode(s.CompressLog(l))
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
func (s *LogStorage) Store(l *LogEntry) error {
|
||||
id := xid.NewWithTime(l.Time)
|
||||
s.db.Set(id.Bytes(), s.CompactEntry(l))
|
||||
gb := s.CompactEntry(l)
|
||||
s.db.Set(id.Bytes(), gb)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -168,7 +172,6 @@ func (s *LogStorage) All() ([]*LogEntry, error) {
|
||||
}
|
||||
out = append(out, s.DecompressLog(&compact))
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
@ -178,7 +181,7 @@ type ShapeDb struct {
|
||||
|
||||
func (b *ShapeDb) Open(name string) (err error) {
|
||||
do := rosedb.DefaultOptions(path.Join(name))
|
||||
do.Sync = true
|
||||
do.Sync = false
|
||||
do.IndexMode = rosedb.KeyValueMemMode
|
||||
do.LogFileSizeThreshold = 16 << 20
|
||||
b.db, err = rosedb.Open(do)
|
||||
|
@ -1,35 +0,0 @@
|
||||
package logfile
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestDatabaseWrite(t *testing.T) {
|
||||
cases := []string{"280312038j1203j8120938j120938120938d12903h1207h13926410327h46012346073",
|
||||
"google.com",
|
||||
"localhost",
|
||||
"some.weird.absodas"}
|
||||
|
||||
var sb = new(ShapeDb)
|
||||
err := sb.Open("dict.db")
|
||||
if err != nil {
|
||||
t.Fatal("failed to open db")
|
||||
}
|
||||
hshs := make([][4]byte, len(cases))
|
||||
for i, c := range cases {
|
||||
hshs[i], err = sb.Add([]byte(c))
|
||||
if err != nil {
|
||||
t.Fatalf("failed to add %s %v", c, err)
|
||||
}
|
||||
}
|
||||
|
||||
for i, v := range hshs {
|
||||
val, err := sb.Get(v)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to get %s %v", val, err)
|
||||
}
|
||||
if string(val) != cases[i] {
|
||||
t.Fatalf("val no match %s", val)
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user