datamanager: implement data files splitting

split data files in multiple files of a max size (default 10Mib)
In this way every data snapshot will change only the datafiles that have some
changes instead of the whole single file.
This commit is contained in:
Simone Gotti 2019-06-03 16:17:27 +02:00
parent 8e4555373d
commit 22bd181fc8
6 changed files with 828 additions and 187 deletions

View File

@ -20,21 +20,35 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"sort"
"strings" "strings"
ostypes "github.com/sorintlab/agola/internal/objectstorage/types" ostypes "github.com/sorintlab/agola/internal/objectstorage/types"
"github.com/sorintlab/agola/internal/sequence" "github.com/sorintlab/agola/internal/sequence"
uuid "github.com/satori/go.uuid"
errors "golang.org/x/xerrors" errors "golang.org/x/xerrors"
) )
const (
DefaultMaxDataFileSize = 10 * 1024 * 1024
)
type DataStatus struct { type DataStatus struct {
DataSequence string `json:"data_sequence,omitempty"` DataSequence string `json:"data_sequence,omitempty"`
WalSequence string `json:"wal_sequence,omitempty"` WalSequence string `json:"wal_sequence,omitempty"`
Files map[string][]string `json:"files,omitempty"` // an entry id ordered list of files for a specific data type (map key)
Files map[string][]*DataStatusFile `json:"files,omitempty"`
}
type DataStatusFile struct {
ID string `json:"id,omitempty"`
// the last entry id in this file
LastEntryID string `json:"last_entry_id,omitempty"`
} }
type DataFileIndex struct { type DataFileIndex struct {
Index map[string]int `json:"index,omitempty"` Index map[string]int64 `json:"index,omitempty"`
} }
type DataEntry struct { type DataEntry struct {
@ -47,105 +61,50 @@ func dataStatusPath(sequence string) string {
return fmt.Sprintf("%s/%s.status", storageDataDir, sequence) return fmt.Sprintf("%s/%s.status", storageDataDir, sequence)
} }
func dataFileIndexPath(datatype, sequence string) string { func DataFileIndexPath(dataType, id string) string {
return fmt.Sprintf("%s/%s/%s.index", storageDataDir, datatype, sequence) return fmt.Sprintf("%s/%s/%s.index", storageDataDir, dataType, id)
} }
func dataFilePath(datatype, sequence string) string { func DataFilePath(dataType, id string) string {
return fmt.Sprintf("%s/%s/%s.data", storageDataDir, datatype, sequence) return fmt.Sprintf("%s/%s/%s.data", storageDataDir, dataType, id)
} }
// TODO(sgotti) this implementation could be heavily optimized to store less data in memory
// TODO(sgotti) // TODO(sgotti)
// split/merge data files at max N bytes (i.e 16MiB) so we'll rewrite only files // split/merge data files at max N bytes (i.e 16MiB) so we'll rewrite only files
// with changed data // with changed data
func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error { // walIndex is a map of dataType of id of walEntry
dataSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey) // TODO(sgotti) write this index to local disk (a temporary sqlite lite) instead of storing all in memory
if err != nil { type walIndex map[string]walActions
return err
}
for _, dataType := range d.dataTypes { // walDataEntries is an order by id list of data entries
if err := d.writeDataType(ctx, wals, dataType, dataSequence.String()); err != nil { type walActions []*Action
return err
}
}
var lastWalSequence string func (w walActions) Len() int { return len(w) }
for _, walData := range wals { func (w walActions) Less(i, j int) bool { return w[i].ID < w[j].ID }
lastWalSequence = walData.WalSequence func (w walActions) Swap(i, j int) { w[i], w[j] = w[j], w[i] }
}
dataStatus := &DataStatus{ func (d *DataManager) walIndex(ctx context.Context, wals []*WalData) (walIndex, error) {
DataSequence: dataSequence.String(), wimap := map[string]map[string]*Action{}
WalSequence: lastWalSequence,
Files: make(map[string][]string),
}
for _, dataType := range d.dataTypes {
dataStatus.Files[dataType] = []string{dataFilePath(dataType, dataSequence.String())}
}
dataStatusj, err := json.Marshal(dataStatus)
if err != nil {
return err
}
if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
return err
}
return nil
}
func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, datatype, dataSequence string) error {
curDataStatus, err := d.GetLastDataStatus()
if err != nil && err != ostypes.ErrNotExist {
return err
}
dataEntriesMap := map[string]*DataEntry{}
if err != ostypes.ErrNotExist {
curDataSequence := curDataStatus.DataSequence
oldDataf, err := d.ost.ReadObject(dataFilePath(datatype, curDataSequence))
if err != nil && err != ostypes.ErrNotExist {
return err
}
if err != ostypes.ErrNotExist {
dec := json.NewDecoder(oldDataf)
for {
var de *DataEntry
err := dec.Decode(&de)
if err == io.EOF {
// all done
break
}
if err != nil {
oldDataf.Close()
return err
}
dataEntriesMap[de.ID] = de
}
oldDataf.Close()
}
}
for _, walData := range wals { for _, walData := range wals {
walFilef, err := d.ReadWal(walData.WalSequence) walFilef, err := d.ReadWal(walData.WalSequence)
if err != nil { if err != nil {
return err return nil, err
} }
dec := json.NewDecoder(walFilef) dec := json.NewDecoder(walFilef)
var header *WalHeader var header *WalHeader
if err = dec.Decode(&header); err != nil && err != io.EOF { if err = dec.Decode(&header); err != nil && err != io.EOF {
walFilef.Close() walFilef.Close()
return err return nil, err
} }
walFilef.Close() walFilef.Close()
walFile, err := d.ReadWalData(header.WalDataFileID) walFile, err := d.ReadWalData(header.WalDataFileID)
if err != nil { if err != nil {
return errors.Errorf("cannot read wal data file %q: %w", header.WalDataFileID, err) return nil, errors.Errorf("cannot read wal data file %q: %w", header.WalDataFileID, err)
} }
defer walFile.Close() defer walFile.Close()
@ -159,51 +118,92 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty
break break
} }
if err != nil { if err != nil {
return errors.Errorf("failed to decode wal file: %w", err) return nil, errors.Errorf("failed to decode wal file: %w", err)
}
if action.DataType != datatype {
continue
} }
switch action.ActionType { if _, ok := wimap[action.DataType]; !ok {
case ActionTypePut: wimap[action.DataType] = map[string]*Action{}
de := &DataEntry{
ID: action.ID,
DataType: action.DataType,
Data: action.Data,
}
dataEntriesMap[de.ID] = de
case ActionTypeDelete:
delete(dataEntriesMap, action.ID)
} }
// only keep the last action for every entry id
wimap[action.DataType][action.ID] = action
} }
} }
dataEntries := []*DataEntry{} wi := map[string]walActions{}
for _, de := range dataEntriesMap { for dataType, dd := range wimap {
dataEntries = append(dataEntries, de) for _, de := range dd {
wi[dataType] = append(wi[dataType], de)
}
sort.Sort(wi[dataType])
} }
dataFileIndex := &DataFileIndex{ return wi, nil
Index: make(map[string]int), }
// writeDataSnapshot will create a new data snapshot merging the uncheckpointed
// wals. It will split data files at maxDataFileSize bytes so we'll rewrite only
// files with changed data.
// Only new files will be created, previous snapshot data files won't be touched
//
// TODO(sgotti) add a function to merge small data files (i.e after deletions) to avoid fragmentation
// TODO(sgotti) add a function to delete old data files keeping only N snapshots
func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) error {
dataSequence, err := sequence.IncSequence(ctx, d.e, etcdCheckpointSeqKey)
if err != nil {
return err
} }
var buf bytes.Buffer var lastWalSequence string
pos := 0 for _, walData := range wals {
for _, de := range dataEntries { lastWalSequence = walData.WalSequence
dataFileIndex.Index[de.ID] = pos }
dataEntryj, err := json.Marshal(de) dataStatus := &DataStatus{
DataSequence: dataSequence.String(),
WalSequence: lastWalSequence,
Files: make(map[string][]*DataStatusFile),
}
wi, err := d.walIndex(ctx, wals)
if err != nil {
return err
}
curDataStatus, err := d.GetLastDataStatus()
if err != nil && err != ostypes.ErrNotExist {
return err
}
for _, dataType := range d.dataTypes {
var curDataStatusFiles []*DataStatusFile
if curDataStatus != nil {
curDataStatusFiles = curDataStatus.Files[dataType]
}
dataStatusFiles, err := d.writeDataType(ctx, wi, dataType, curDataStatusFiles)
if err != nil { if err != nil {
return err return err
} }
if _, err := buf.Write(dataEntryj); err != nil { dataStatus.Files[dataType] = dataStatusFiles
return err
}
pos += len(dataEntryj)
} }
if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, int64(buf.Len()), true); err != nil {
dataStatusj, err := json.Marshal(dataStatus)
if err != nil {
return err
}
if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
return err
}
return nil
}
func (d *DataManager) writeDataFile(ctx context.Context, buf *bytes.Buffer, size int64, dataFileIndex *DataFileIndex, dataFileID, dataType string) error {
if buf.Len() == 0 {
return fmt.Errorf("empty data entries")
}
if err := d.ost.WriteObject(DataFilePath(dataType, dataFileID), buf, size, true); err != nil {
return err return err
} }
@ -211,21 +211,293 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty
if err != nil { if err != nil {
return err return err
} }
if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil { if err := d.ost.WriteObject(DataFileIndexPath(dataType, dataFileID), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil {
return err return err
} }
return nil return nil
} }
type ActionGroup struct {
DataStatusFile *DataStatusFile
StartActionIndex int
ActionsSize int
PreviousDataStatusFiles []*DataStatusFile
}
func (d *DataManager) actionGroups(ctx context.Context, wi walIndex, dataType string, curDataStatusFiles []*DataStatusFile) ([]*ActionGroup, []*DataStatusFile) {
dataStatusFiles := []*DataStatusFile{}
remainingDataStatusFiles := []*DataStatusFile{}
actionGroups := []*ActionGroup{}
var startActionIndex int
var actionsSize int
var actionIndex int
var curDataStatusFileIndex int
for {
var action *Action
if actionIndex <= len(wi[dataType])-1 {
action = wi[dataType][actionIndex]
}
var curDataStatusFile *DataStatusFile
if curDataStatusFileIndex <= len(curDataStatusFiles)-1 {
curDataStatusFile = curDataStatusFiles[curDataStatusFileIndex]
}
if action == nil {
if actionsSize > 0 {
actionGroup := &ActionGroup{
DataStatusFile: curDataStatusFile,
StartActionIndex: startActionIndex,
ActionsSize: actionsSize,
PreviousDataStatusFiles: dataStatusFiles,
}
actionGroups = append(actionGroups, actionGroup)
curDataStatusFileIndex++
if curDataStatusFileIndex <= len(curDataStatusFiles)-1 {
remainingDataStatusFiles = curDataStatusFiles[curDataStatusFileIndex:]
}
}
break
}
if curDataStatusFile != nil {
if curDataStatusFile.LastEntryID >= action.ID || curDataStatusFileIndex == len(curDataStatusFiles)-1 {
// continue using this status file
actionIndex++
actionsSize++
} else {
// find new status file
if actionsSize > 0 {
actionGroup := &ActionGroup{
DataStatusFile: curDataStatusFile,
StartActionIndex: startActionIndex,
ActionsSize: actionsSize,
PreviousDataStatusFiles: dataStatusFiles,
}
actionGroups = append(actionGroups, actionGroup)
startActionIndex = actionIndex
actionsSize = 0
dataStatusFiles = []*DataStatusFile{}
} else {
dataStatusFiles = append(dataStatusFiles, curDataStatusFile)
}
curDataStatusFileIndex++
}
} else {
actionIndex++
actionsSize++
}
}
return actionGroups, remainingDataStatusFiles
}
func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType string, curDataStatusFiles []*DataStatusFile) ([]*DataStatusFile, error) {
type SplitPoint struct {
pos int64
lastEntryID string
}
if len(wi[dataType]) == 0 {
// no actions
return curDataStatusFiles, nil
}
actionGroups, remainingDataStatusFiles := d.actionGroups(ctx, wi, dataType, curDataStatusFiles)
dataStatusFiles := []*DataStatusFile{}
for _, actionGroup := range actionGroups {
dataStatusFiles = append(dataStatusFiles, actionGroup.PreviousDataStatusFiles...)
splitPoints := []SplitPoint{}
dataFileIndexes := []*DataFileIndex{}
dataFileIndex := &DataFileIndex{
Index: make(map[string]int64),
}
dataEntries := []*DataEntry{}
var buf bytes.Buffer
var pos int64
var lastEntryID string
if actionGroup.DataStatusFile != nil {
// TODO(sgotti) instead of reading all entries in memory decode it's contents one by one when needed
oldDataf, err := d.ost.ReadObject(DataFilePath(dataType, actionGroup.DataStatusFile.ID))
if err != nil && err != ostypes.ErrNotExist {
return nil, err
}
if err != ostypes.ErrNotExist {
dec := json.NewDecoder(oldDataf)
for {
var de *DataEntry
err := dec.Decode(&de)
if err == io.EOF {
// all done
break
}
if err != nil {
oldDataf.Close()
return nil, err
}
dataEntries = append(dataEntries, de)
}
oldDataf.Close()
}
}
dataEntryIndex := 0
actionIndex := actionGroup.StartActionIndex
// iterate over data entries and action in order
for {
exists := false
useAction := false
var action *Action
if actionIndex < actionGroup.StartActionIndex+actionGroup.ActionsSize {
action = wi[dataType][actionIndex]
}
var de *DataEntry
if dataEntryIndex <= len(dataEntries)-1 {
de = dataEntries[dataEntryIndex]
}
if de == nil && action == nil {
break
}
if action != nil {
if de != nil {
if de.ID == action.ID {
exists = true
useAction = true
}
if de.ID > action.ID {
useAction = true
}
} else {
useAction = true
}
if useAction {
de = nil
switch action.ActionType {
case ActionTypePut:
de = &DataEntry{
ID: action.ID,
DataType: action.DataType,
Data: action.Data,
}
if exists {
// replace current data entry with the action data
dataEntryIndex++
}
case ActionTypeDelete:
if exists {
// skip current data entry
dataEntryIndex++
}
}
actionIndex++
} else {
dataEntryIndex++
}
} else {
dataEntryIndex++
}
if de != nil {
lastEntryID = de.ID
dataEntryj, err := json.Marshal(de)
if err != nil {
return nil, err
}
if _, err := buf.Write(dataEntryj); err != nil {
return nil, err
}
dataFileIndex.Index[de.ID] = pos
prevPos := pos
pos += int64(len(dataEntryj))
var lastSplitPos int64
if len(splitPoints) > 0 {
lastSplitPos = splitPoints[len(splitPoints)-1].pos
}
if pos-lastSplitPos > d.maxDataFileSize {
// add split point only if it's different (less) than the previous one
if lastSplitPos < prevPos {
splitPoints = append(splitPoints, SplitPoint{pos: int64(buf.Len()), lastEntryID: lastEntryID})
dataFileIndexes = append(dataFileIndexes, dataFileIndex)
dataFileIndex = &DataFileIndex{
Index: make(map[string]int64),
}
}
}
}
}
// save remaining data
if buf.Len() != 0 {
var curPos int64
var lastSplitPos int64
if len(splitPoints) > 0 {
lastSplitPos = splitPoints[len(splitPoints)-1].pos
}
// add final split point if there's something left in the buffer
if lastSplitPos != int64(buf.Len()) {
splitPoints = append(splitPoints, SplitPoint{pos: int64(buf.Len()), lastEntryID: lastEntryID})
}
dataFileIndexes = append(dataFileIndexes, dataFileIndex)
for i, sp := range splitPoints {
curDataFileID := uuid.NewV4().String()
if err := d.writeDataFile(ctx, &buf, sp.pos-curPos, dataFileIndexes[i], curDataFileID, dataType); err != nil {
return nil, err
}
// insert new dataStatusFile
dataStatusFiles = append(dataStatusFiles, &DataStatusFile{
ID: curDataFileID,
LastEntryID: sp.lastEntryID,
})
curPos = sp.pos
}
}
}
dataStatusFiles = append(dataStatusFiles, remainingDataStatusFiles...)
return dataStatusFiles, nil
}
func (d *DataManager) Read(dataType, id string) (io.Reader, error) { func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
curDataStatus, err := d.GetLastDataStatus() curDataStatus, err := d.GetLastDataStatus()
if err != nil { if err != nil {
return nil, err return nil, err
} }
dataSequence := curDataStatus.DataSequence curFiles := curDataStatus.Files
dataFileIndexf, err := d.ost.ReadObject(dataFileIndexPath(dataType, dataSequence)) var matchingDataFileID string
// get the matching data file for the action entry ID
if len(curFiles[dataType]) == 0 {
return nil, ostypes.ErrNotExist
}
matchingDataFileID = curFiles[dataType][0].ID
for _, dataStatusFile := range curFiles[dataType] {
if dataStatusFile.LastEntryID > id {
matchingDataFileID = dataStatusFile.ID
break
}
}
dataFileIndexf, err := d.ost.ReadObject(DataFileIndexPath(dataType, matchingDataFileID))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -243,7 +515,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
return nil, ostypes.ErrNotExist return nil, ostypes.ErrNotExist
} }
dataf, err := d.ost.ReadObject(dataFilePath(dataType, dataSequence)) dataf, err := d.ost.ReadObject(DataFilePath(dataType, matchingDataFileID))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -38,7 +38,7 @@ import (
// wals/{walSeq} // wals/{walSeq}
const ( const (
DefaultCheckpointInterval = 1 * time.Minute DefaultCheckpointInterval = 10 * time.Second
DefaultEtcdWalsKeepNum = 100 DefaultEtcdWalsKeepNum = 100
DefaultMinCheckpointWalsNum = 100 DefaultMinCheckpointWalsNum = 100
) )
@ -63,6 +63,8 @@ var (
etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq") etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq")
etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq") etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq")
etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq")
etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock") etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock")
etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock")
etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock") etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock")
@ -88,6 +90,7 @@ type DataManagerConfig struct {
CheckpointInterval time.Duration CheckpointInterval time.Duration
// MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint // MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint
MinCheckpointWalsNum int MinCheckpointWalsNum int
MaxDataFileSize int64
} }
type DataManager struct { type DataManager struct {
@ -100,6 +103,7 @@ type DataManager struct {
etcdWalsKeepNum int etcdWalsKeepNum int
checkpointInterval time.Duration checkpointInterval time.Duration
minCheckpointWalsNum int minCheckpointWalsNum int
maxDataFileSize int64
} }
func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error) { func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error) {
@ -118,6 +122,9 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
if conf.MinCheckpointWalsNum < 1 { if conf.MinCheckpointWalsNum < 1 {
return nil, errors.New("minCheckpointWalsNum must be greater than 0") return nil, errors.New("minCheckpointWalsNum must be greater than 0")
} }
if conf.MaxDataFileSize == 0 {
conf.MaxDataFileSize = DefaultMaxDataFileSize
}
d := &DataManager{ d := &DataManager{
basePath: conf.BasePath, basePath: conf.BasePath,
@ -129,6 +136,7 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
etcdWalsKeepNum: conf.EtcdWalsKeepNum, etcdWalsKeepNum: conf.EtcdWalsKeepNum,
checkpointInterval: conf.CheckpointInterval, checkpointInterval: conf.CheckpointInterval,
minCheckpointWalsNum: conf.MinCheckpointWalsNum, minCheckpointWalsNum: conf.MinCheckpointWalsNum,
maxDataFileSize: conf.MaxDataFileSize,
} }
// add trailing slash the basepath // add trailing slash the basepath

View File

@ -16,9 +16,13 @@ package datamanager
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"reflect"
"sort"
"testing" "testing"
"time" "time"
@ -147,7 +151,7 @@ func TestEtcdReset(t *testing.T) {
dm, err = NewDataManager(ctx, logger, dmConfig) dm, err = NewDataManager(ctx, logger, dmConfig)
dmReadyCh = make(chan struct{}) dmReadyCh = make(chan struct{})
t.Logf("starting wal") t.Logf("starting datamanager")
go dm.Run(ctx, dmReadyCh) go dm.Run(ctx, dmReadyCh)
<-dmReadyCh <-dmReadyCh
@ -293,8 +297,12 @@ func TestWalCleaner(t *testing.T) {
} }
} }
dm.checkpoint(ctx) if err := dm.checkpoint(ctx); err != nil {
dm.walCleaner(ctx) t.Fatalf("unexpected err: %v", err)
}
if err := dm.walCleaner(ctx); err != nil {
t.Fatalf("unexpected err: %v", err)
}
walsCount := 0 walsCount := 0
for range dm.ListEtcdWals(ctx, 0) { for range dm.ListEtcdWals(ctx, 0) {
@ -333,6 +341,12 @@ func TestReadObject(t *testing.T) {
} }
dm, err := NewDataManager(ctx, logger, dmConfig) dm, err := NewDataManager(ctx, logger, dmConfig)
dmReadyCh := make(chan struct{})
go dm.Run(ctx, dmReadyCh)
<-dmReadyCh
time.Sleep(5 * time.Second)
actions := []*Action{} actions := []*Action{}
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
actions = append(actions, &Action{ actions = append(actions, &Action{
@ -343,12 +357,6 @@ func TestReadObject(t *testing.T) {
}) })
} }
dmReadyCh := make(chan struct{})
go dm.Run(ctx, dmReadyCh)
<-dmReadyCh
time.Sleep(5 * time.Second)
// populate with a wal // populate with a wal
_, err = dm.WriteWal(ctx, actions, nil) _, err = dm.WriteWal(ctx, actions, nil)
if err != nil { if err != nil {
@ -398,8 +406,12 @@ func TestReadObject(t *testing.T) {
} }
// do a checkpoint and wal clean // do a checkpoint and wal clean
dm.checkpoint(ctx) if err := dm.checkpoint(ctx); err != nil {
dm.walCleaner(ctx) t.Fatalf("unexpected err: %v", err)
}
if err := dm.walCleaner(ctx); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// wait for the event to be read // wait for the event to be read
time.Sleep(500 * time.Millisecond) time.Sleep(500 * time.Millisecond)
@ -417,3 +429,348 @@ func TestReadObject(t *testing.T) {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
} }
func testCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, actionGroups [][]*Action, currentEntries map[string]*DataEntry) (map[string]*DataEntry, error) {
expectedEntries := map[string]*DataEntry{}
for _, e := range currentEntries {
expectedEntries[e.ID] = e
}
for _, actionGroup := range actionGroups {
for _, action := range actionGroup {
switch action.ActionType {
case ActionTypePut:
expectedEntries[action.ID] = &DataEntry{ID: action.ID, DataType: action.DataType, Data: action.Data}
case ActionTypeDelete:
delete(expectedEntries, action.ID)
}
}
}
for _, actionGroup := range actionGroups {
// populate with a wal
_, err := dm.WriteWal(ctx, actionGroup, nil)
if err != nil {
return nil, err
}
}
// wait for the event to be read
time.Sleep(500 * time.Millisecond)
// do a checkpoint
if err := dm.checkpoint(ctx); err != nil {
return nil, err
}
if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
return nil, err
}
return expectedEntries, nil
}
// TODO(sgotti) some fuzzy testing will be really good
func TestCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
etcdDir, err := ioutil.TempDir(dir, "etcd")
tetcd := setupEtcd(t, etcdDir)
defer shutdownEtcd(tetcd)
ctx := context.Background()
ostDir, err := ioutil.TempDir(dir, "ost")
ost, err := posix.New(ostDir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
dmConfig := &DataManagerConfig{
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum: 1,
DataTypes: []string{"datatype01"},
// checkpoint also with only one wal
MinCheckpointWalsNum: 1,
// use a small maxDataFileSize
MaxDataFileSize: 10 * 1024,
}
dm, err := NewDataManager(ctx, logger, dmConfig)
dmReadyCh := make(chan struct{})
go dm.Run(ctx, dmReadyCh)
<-dmReadyCh
time.Sleep(5 * time.Second)
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
// test insert from scratch (no current entries)
actions := []*Action{}
for i := 200; i < 400; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
currentEntries, err := testCheckpoint(t, ctx, dm, [][]*Action{actions}, nil)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// test delete of all existing entries
actions = []*Action{}
for i := 200; i < 400; i++ {
actions = append(actions, &Action{
ActionType: ActionTypeDelete,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
})
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// test insert from scratch again (no current entries)
actions = []*Action{}
for i := 200; i < 400; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// test delete some existing entries in the middle
actions = []*Action{}
for i := 250; i < 350; i++ {
action := &Action{
ActionType: ActionTypeDelete,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
}
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// test delete of unexisting entries
actions = []*Action{}
for i := 1000; i < 1010; i++ {
action := &Action{
ActionType: ActionTypeDelete,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
}
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// test update and insert at the end
actions = []*Action{}
for i := 300; i < 500; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// test update and insert at the start
actions = []*Action{}
for i := 0; i < 300; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// test multiple wals with different insert, updated, deletes
actionGroups := [][]*Action{}
for i := 0; i < 150; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
actionGroups = append(actionGroups, actions)
for i := 50; i < 100; i++ {
action := &Action{
ActionType: ActionTypeDelete,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
actionGroups = append(actionGroups, actions)
for i := 250; i < 300; i++ {
action := &Action{
ActionType: ActionTypeDelete,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
for i := 70; i < 80; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
actionGroups = append(actionGroups, actions)
currentEntries, err = testCheckpoint(t, ctx, dm, actionGroups, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
// read the data file
curDataStatus, err := dm.GetLastDataStatus()
if err != nil {
return err
}
allEntriesMap := map[string]*DataEntry{}
var prevLastEntryID string
for i, file := range curDataStatus.Files["datatype01"] {
dataFileIndexf, err := dm.ost.ReadObject(DataFileIndexPath("datatype01", file.ID))
if err != nil {
return err
}
var dataFileIndex *DataFileIndex
dec := json.NewDecoder(dataFileIndexf)
err = dec.Decode(&dataFileIndex)
if err != nil {
dataFileIndexf.Close()
return err
}
dataFileIndexf.Close()
dataEntriesMap := map[string]*DataEntry{}
dataEntries := []*DataEntry{}
dataf, err := dm.ost.ReadObject(DataFilePath("datatype01", file.ID))
if err != nil {
return err
}
dec = json.NewDecoder(dataf)
var prevEntryID string
for {
var de *DataEntry
err := dec.Decode(&de)
if err == io.EOF {
// all done
break
}
if err != nil {
dataf.Close()
return err
}
// check that there are no duplicate entries
if _, ok := allEntriesMap[de.ID]; ok {
return fmt.Errorf("duplicate entry id: %s", de.ID)
}
// check that the entries are in order
if de.ID < prevEntryID {
return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID)
}
dataEntriesMap[de.ID] = de
dataEntries = append(dataEntries, de)
allEntriesMap[de.ID] = de
}
dataf.Close()
// check that the index matches the entries
if len(dataFileIndex.Index) != len(dataEntriesMap) {
return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap))
}
indexIDs := make([]string, len(dataFileIndex.Index))
entriesIDs := make([]string, len(dataEntriesMap))
for id := range dataFileIndex.Index {
indexIDs = append(indexIDs, id)
}
for id := range dataEntriesMap {
entriesIDs = append(entriesIDs, id)
}
sort.Strings(indexIDs)
sort.Strings(entriesIDs)
if !reflect.DeepEqual(indexIDs, entriesIDs) {
return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs)
}
if file.LastEntryID != dataEntries[len(dataEntries)-1].ID {
return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID)
}
// check that all the files are in order
if file.LastEntryID == prevLastEntryID {
return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID)
}
if file.LastEntryID < prevLastEntryID {
return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID)
}
prevLastEntryID = file.LastEntryID
}
// check that the number of entries is right
if len(allEntriesMap) != len(expectedEntriesMap) {
return fmt.Errorf("expected %d total entries, got %d", len(expectedEntriesMap), len(allEntriesMap))
}
if !reflect.DeepEqual(expectedEntriesMap, allEntriesMap) {
return fmt.Errorf("expected entries don't match current entries")
}
return nil
}

View File

@ -723,7 +723,7 @@ func (d *DataManager) checkpoint(ctx context.Context) error {
return nil return nil
} }
if err := d.writeData(ctx, walsData); err != nil { if err := d.writeDataSnapshot(ctx, walsData); err != nil {
return errors.Errorf("checkpoint function error: %w", err) return errors.Errorf("checkpoint function error: %w", err)
} }

View File

@ -133,44 +133,46 @@ func (r *ReadDB) SyncFromDump() (string, error) {
return "", nil return "", nil
} }
for dataType, files := range dumpIndex.Files { for dataType, files := range dumpIndex.Files {
dumpf, err := r.ost.ReadObject(files[0]) for _, file := range files {
if err != nil { dumpf, err := r.ost.ReadObject(datamanager.DataFilePath(dataType, file.ID))
return "", err
}
dumpEntries := []*datamanager.DataEntry{}
dec := json.NewDecoder(dumpf)
for {
var de *datamanager.DataEntry
err := dec.Decode(&de)
if err == io.EOF {
// all done
break
}
if err != nil { if err != nil {
dumpf.Close()
return "", err return "", err
} }
dumpEntries = append(dumpEntries, de) dumpEntries := []*datamanager.DataEntry{}
} dec := json.NewDecoder(dumpf)
dumpf.Close() for {
var de *datamanager.DataEntry
err = r.rdb.Do(func(tx *db.Tx) error { err := dec.Decode(&de)
for _, de := range dumpEntries { if err == io.EOF {
action := &datamanager.Action{ // all done
ActionType: datamanager.ActionTypePut, break
ID: de.ID,
DataType: dataType,
Data: de.Data,
} }
if err := r.applyAction(tx, action); err != nil { if err != nil {
return err dumpf.Close()
return "", err
} }
dumpEntries = append(dumpEntries, de)
}
dumpf.Close()
err = r.rdb.Do(func(tx *db.Tx) error {
for _, de := range dumpEntries {
action := &datamanager.Action{
ActionType: datamanager.ActionTypePut,
ID: de.ID,
DataType: dataType,
Data: de.Data,
}
if err := r.applyAction(tx, action); err != nil {
return err
}
}
return nil
})
if err != nil {
return "", err
} }
return nil
})
if err != nil {
return "", err
} }
} }

View File

@ -660,44 +660,46 @@ func (r *ReadDB) SyncFromDump() (string, error) {
return "", nil return "", nil
} }
for dataType, files := range dumpIndex.Files { for dataType, files := range dumpIndex.Files {
dumpf, err := r.ost.ReadObject(files[0]) for _, file := range files {
if err != nil { dumpf, err := r.ost.ReadObject(datamanager.DataFilePath(dataType, file.ID))
return "", err
}
dumpEntries := []*datamanager.DataEntry{}
dec := json.NewDecoder(dumpf)
for {
var de *datamanager.DataEntry
err := dec.Decode(&de)
if err == io.EOF {
// all done
break
}
if err != nil { if err != nil {
dumpf.Close()
return "", err return "", err
} }
dumpEntries = append(dumpEntries, de) dumpEntries := []*datamanager.DataEntry{}
} dec := json.NewDecoder(dumpf)
dumpf.Close() for {
var de *datamanager.DataEntry
err = r.rdb.Do(func(tx *db.Tx) error { err := dec.Decode(&de)
for _, de := range dumpEntries { if err == io.EOF {
action := &datamanager.Action{ // all done
ActionType: datamanager.ActionTypePut, break
ID: de.ID,
DataType: dataType,
Data: de.Data,
} }
if err := r.applyAction(tx, action); err != nil { if err != nil {
return err dumpf.Close()
return "", err
} }
dumpEntries = append(dumpEntries, de)
}
dumpf.Close()
err = r.rdb.Do(func(tx *db.Tx) error {
for _, de := range dumpEntries {
action := &datamanager.Action{
ActionType: datamanager.ActionTypePut,
ID: de.ID,
DataType: dataType,
Data: de.Data,
}
if err := r.applyAction(tx, action); err != nil {
return err
}
}
return nil
})
if err != nil {
return "", err
} }
return nil
})
if err != nil {
return "", err
} }
} }