datamanager: always handle basepath

Currently we aren't setting a basepath and it wasn't always correctly handled.
Fix missing basepath handling and improve tests to also use a non empty
basepath.
This commit is contained in:
Simone Gotti 2019-07-03 17:03:37 +02:00
parent abab40c6f5
commit d989fe9639
6 changed files with 74 additions and 52 deletions

View File

@ -57,18 +57,6 @@ type DataEntry struct {
Data []byte `json:"data,omitempty"`
}
func dataStatusPath(sequence string) string {
return fmt.Sprintf("%s/%s.status", storageDataDir, sequence)
}
func DataFileIndexPath(dataType, id string) string {
return fmt.Sprintf("%s/%s/%s.index", storageDataDir, dataType, id)
}
func DataFilePath(dataType, id string) string {
return fmt.Sprintf("%s/%s/%s.data", storageDataDir, dataType, id)
}
// TODO(sgotti) this implementation could be heavily optimized to store less data in memory
// TODO(sgotti)
@ -191,7 +179,7 @@ func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) er
if err != nil {
return err
}
if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
if err := d.ost.WriteObject(d.dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
return err
}
@ -203,7 +191,7 @@ func (d *DataManager) writeDataFile(ctx context.Context, buf *bytes.Buffer, size
return fmt.Errorf("empty data entries")
}
if err := d.ost.WriteObject(DataFilePath(dataType, dataFileID), buf, size, true); err != nil {
if err := d.ost.WriteObject(d.DataFilePath(dataType, dataFileID), buf, size, true); err != nil {
return err
}
@ -211,7 +199,7 @@ func (d *DataManager) writeDataFile(ctx context.Context, buf *bytes.Buffer, size
if err != nil {
return err
}
if err := d.ost.WriteObject(DataFileIndexPath(dataType, dataFileID), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil {
if err := d.ost.WriteObject(d.DataFileIndexPath(dataType, dataFileID), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil {
return err
}
@ -326,7 +314,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
if actionGroup.DataStatusFile != nil {
// TODO(sgotti) instead of reading all entries in memory decode it's contents one by one when needed
oldDataf, err := d.ost.ReadObject(DataFilePath(dataType, actionGroup.DataStatusFile.ID))
oldDataf, err := d.ost.ReadObject(d.DataFilePath(dataType, actionGroup.DataStatusFile.ID))
if err != nil && err != ostypes.ErrNotExist {
return nil, err
}
@ -497,7 +485,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
}
}
dataFileIndexf, err := d.ost.ReadObject(DataFileIndexPath(dataType, matchingDataFileID))
dataFileIndexf, err := d.ost.ReadObject(d.DataFileIndexPath(dataType, matchingDataFileID))
if err != nil {
return nil, err
}
@ -515,7 +503,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
return nil, ostypes.ErrNotExist
}
dataf, err := d.ost.ReadObject(DataFilePath(dataType, matchingDataFileID))
dataf, err := d.ost.ReadObject(d.DataFilePath(dataType, matchingDataFileID))
if err != nil {
return nil, err
}
@ -539,7 +527,7 @@ func (d *DataManager) GetLastDataStatusPath() (string, error) {
defer close(doneCh)
var dataStatusPath string
for object := range d.ost.List(storageDataDir+"/", "", false, doneCh) {
for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) {
if object.Err != nil {
return "", object.Err
}

View File

@ -16,6 +16,7 @@ package datamanager
import (
"context"
"fmt"
"path"
"strings"
"time"
@ -31,12 +32,6 @@ import (
// * Etcd cluster rebuild: we cannot rely on etcd header ClusterID since it could be the same as it's generated using the listen urls. We should add our own clusterid key and use it.
// * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one
// Storage paths
// wals/{walSeq}
//
// Etcd paths
// wals/{walSeq}
const (
DefaultCheckpointInterval = 10 * time.Second
DefaultEtcdWalsKeepNum = 100
@ -147,6 +142,34 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
return d, nil
}
func (d *DataManager) storageWalStatusFile(walSeq string) string {
return path.Join(d.basePath, storageWalsStatusDir, walSeq)
}
func (d *DataManager) storageWalDataFile(walFileID string) string {
return path.Join(d.basePath, storageWalsDataDir, walFileID)
}
func (d *DataManager) storageDataDir() string {
return path.Join(d.basePath, storageDataDir)
}
func (d *DataManager) dataStatusPath(sequence string) string {
return fmt.Sprintf("%s/%s.status", d.storageDataDir(), sequence)
}
func (d *DataManager) DataFileIndexPath(dataType, id string) string {
return fmt.Sprintf("%s/%s/%s.index", d.storageDataDir(), dataType, id)
}
func (d *DataManager) DataFilePath(dataType, id string) string {
return fmt.Sprintf("%s/%s/%s.data", d.storageDataDir(), dataType, id)
}
func etcdWalKey(walSeq string) string {
return path.Join(etcdWalsDir, walSeq)
}
func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
for {
err := d.InitEtcd(ctx)

View File

@ -461,7 +461,7 @@ func TestReadObject(t *testing.T) {
}
}
func testCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, actionGroups [][]*Action, currentEntries map[string]*DataEntry) (map[string]*DataEntry, error) {
func doAndCheckCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, actionGroups [][]*Action, currentEntries map[string]*DataEntry) (map[string]*DataEntry, error) {
expectedEntries := map[string]*DataEntry{}
for _, e := range currentEntries {
expectedEntries[e.ID] = e
@ -503,6 +503,28 @@ func testCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, actionGr
// TODO(sgotti) some fuzzy testing will be really good
func TestCheckpoint(t *testing.T) {
tests := []struct {
name string
basePath string
}{
{
name: "test with empty basepath",
basePath: "",
},
{
name: "test with relative basepath",
basePath: "base/path",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testCheckpoint(t, tt.basePath)
})
}
}
func testCheckpoint(t *testing.T, basePath string) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
@ -528,8 +550,9 @@ func TestCheckpoint(t *testing.T) {
}
dmConfig := &DataManagerConfig{
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
BasePath: basePath,
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum: 1,
DataTypes: []string{"datatype01"},
@ -562,7 +585,7 @@ func TestCheckpoint(t *testing.T) {
actions = append(actions, action)
}
currentEntries, err := testCheckpoint(t, ctx, dm, [][]*Action{actions}, nil)
currentEntries, err := doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, nil)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -577,7 +600,7 @@ func TestCheckpoint(t *testing.T) {
})
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -594,7 +617,7 @@ func TestCheckpoint(t *testing.T) {
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -610,7 +633,7 @@ func TestCheckpoint(t *testing.T) {
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -626,7 +649,7 @@ func TestCheckpoint(t *testing.T) {
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -643,7 +666,7 @@ func TestCheckpoint(t *testing.T) {
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -660,7 +683,7 @@ func TestCheckpoint(t *testing.T) {
actions = append(actions, action)
}
currentEntries, err = testCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -707,7 +730,7 @@ func TestCheckpoint(t *testing.T) {
}
actionGroups = append(actionGroups, actions)
_, err = testCheckpoint(t, ctx, dm, actionGroups, currentEntries)
_, err = doAndCheckCheckpoint(t, ctx, dm, actionGroups, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -724,7 +747,7 @@ func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expected
var prevLastEntryID string
for i, file := range curDataStatus.Files["datatype01"] {
dataFileIndexf, err := dm.ost.ReadObject(DataFileIndexPath("datatype01", file.ID))
dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath("datatype01", file.ID))
if err != nil {
return err
}
@ -739,7 +762,7 @@ func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expected
dataFileIndexf.Close()
dataEntriesMap := map[string]*DataEntry{}
dataEntries := []*DataEntry{}
dataf, err := dm.ost.ReadObject(DataFilePath("datatype01", file.ID))
dataf, err := dm.ost.ReadObject(dm.DataFilePath("datatype01", file.ID))
if err != nil {
return err
}

View File

@ -38,18 +38,6 @@ import (
errors "golang.org/x/xerrors"
)
func (d *DataManager) storageWalStatusFile(walSeq string) string {
return path.Join(d.basePath, storageWalsStatusDir, walSeq)
}
func (d *DataManager) storageWalDataFile(walFileID string) string {
return path.Join(d.basePath, storageWalsDataDir, walFileID)
}
func etcdWalKey(walSeq string) string {
return path.Join(etcdWalsDir, walSeq)
}
type ActionType string
const (

View File

@ -146,7 +146,7 @@ func (r *ReadDB) SyncFromDump() (string, error) {
}
for dataType, files := range dumpIndex.Files {
for _, file := range files {
dumpf, err := r.ost.ReadObject(datamanager.DataFilePath(dataType, file.ID))
dumpf, err := r.ost.ReadObject(r.dm.DataFilePath(dataType, file.ID))
if err != nil {
return "", err
}

View File

@ -659,7 +659,7 @@ func (r *ReadDB) SyncFromDump() (string, error) {
}
for dataType, files := range dumpIndex.Files {
for _, file := range files {
dumpf, err := r.ost.ReadObject(datamanager.DataFilePath(dataType, file.ID))
dumpf, err := r.ost.ReadObject(r.dm.DataFilePath(dataType, file.ID))
if err != nil {
return "", err
}