datamanager: implement import/export
* export: exports the newest data checkpoint. It forces a checkpoint before exporting (currently no wals are exported) * import: cleans up etcd, creates a new datasnaphot from the provided import stream and then initializes etcd. Currently no old data is removed from the object storage but it's just ignored.
This commit is contained in:
parent
af4aa58903
commit
3404cb94b9
|
@ -558,3 +558,165 @@ func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
|
||||||
|
|
||||||
return dataStatus, dec.Decode(&dataStatus)
|
return dataStatus, dec.Decode(&dataStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *DataManager) Export(ctx context.Context, w io.Writer) error {
|
||||||
|
if err := d.checkpoint(ctx, true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
curDataStatus, err := d.GetLastDataStatus()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, dataType := range d.dataTypes {
|
||||||
|
var curDataStatusFiles []*DataStatusFile
|
||||||
|
if curDataStatus != nil {
|
||||||
|
curDataStatusFiles = curDataStatus.Files[dataType]
|
||||||
|
}
|
||||||
|
for _, dsf := range curDataStatusFiles {
|
||||||
|
dataf, err := d.ost.ReadObject(d.DataFilePath(dataType, dsf.ID))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.Copy(w, dataf); err != nil {
|
||||||
|
dataf.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dataf.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DataManager) Import(ctx context.Context, r io.Reader) error {
|
||||||
|
// delete contents in etcd
|
||||||
|
if err := d.deleteEtcd(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// we require all entries of the same datatypes grouped together
|
||||||
|
seenDataTypes := map[string]struct{}{}
|
||||||
|
|
||||||
|
// create a new sequence, we assume that it'll be greater than previous data sequences
|
||||||
|
dataSequence, err := sequence.IncSequence(ctx, d.e, etcdCheckpointSeqKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dataStatus := &DataStatus{
|
||||||
|
DataSequence: dataSequence.String(),
|
||||||
|
// no last wal sequence on import
|
||||||
|
WalSequence: "",
|
||||||
|
Files: make(map[string][]*DataStatusFile),
|
||||||
|
}
|
||||||
|
|
||||||
|
dataStatusFiles := []*DataStatusFile{}
|
||||||
|
|
||||||
|
var lastEntryID string
|
||||||
|
var curDataType string
|
||||||
|
var buf bytes.Buffer
|
||||||
|
var pos int64
|
||||||
|
dataFileIndex := &DataFileIndex{
|
||||||
|
Index: make(map[string]int64),
|
||||||
|
}
|
||||||
|
dec := json.NewDecoder(r)
|
||||||
|
|
||||||
|
for {
|
||||||
|
var de *DataEntry
|
||||||
|
|
||||||
|
err := dec.Decode(&de)
|
||||||
|
if err == io.EOF {
|
||||||
|
dataFileID := uuid.NewV4().String()
|
||||||
|
if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dataStatusFiles = append(dataStatusFiles, &DataStatusFile{
|
||||||
|
ID: dataFileID,
|
||||||
|
LastEntryID: lastEntryID,
|
||||||
|
})
|
||||||
|
dataStatus.Files[curDataType] = dataStatusFiles
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if curDataType == "" {
|
||||||
|
curDataType = de.DataType
|
||||||
|
seenDataTypes[de.DataType] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
mustWrite := false
|
||||||
|
mustReset := false
|
||||||
|
if pos > d.maxDataFileSize {
|
||||||
|
mustWrite = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if curDataType != de.DataType {
|
||||||
|
if _, ok := seenDataTypes[de.DataType]; ok {
|
||||||
|
return errors.Errorf("dataType %q already imported", de.DataType)
|
||||||
|
}
|
||||||
|
mustWrite = true
|
||||||
|
mustReset = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if mustWrite {
|
||||||
|
dataFileID := uuid.NewV4().String()
|
||||||
|
if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dataStatusFiles = append(dataStatusFiles, &DataStatusFile{
|
||||||
|
ID: dataFileID,
|
||||||
|
LastEntryID: lastEntryID,
|
||||||
|
})
|
||||||
|
|
||||||
|
if mustReset {
|
||||||
|
dataStatus.Files[curDataType] = dataStatusFiles
|
||||||
|
|
||||||
|
dataStatusFiles = []*DataStatusFile{}
|
||||||
|
curDataType = de.DataType
|
||||||
|
lastEntryID = ""
|
||||||
|
}
|
||||||
|
|
||||||
|
dataFileIndex = &DataFileIndex{
|
||||||
|
Index: make(map[string]int64),
|
||||||
|
}
|
||||||
|
buf = bytes.Buffer{}
|
||||||
|
pos = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
if de.ID <= lastEntryID {
|
||||||
|
// entries for the same datatype must be unique and ordered
|
||||||
|
return errors.Errorf("entry id %q is less or equal than previous entry id %q", de.ID, lastEntryID)
|
||||||
|
}
|
||||||
|
lastEntryID = de.ID
|
||||||
|
|
||||||
|
dataEntryj, err := json.Marshal(de)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := buf.Write(dataEntryj); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
dataFileIndex.Index[de.ID] = pos
|
||||||
|
pos += int64(len(dataEntryj))
|
||||||
|
}
|
||||||
|
|
||||||
|
dataStatusj, err := json.Marshal(dataStatus)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := d.ost.WriteObject(d.dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize etcd providing the specific datastatus
|
||||||
|
if err := d.InitEtcd(ctx, dataStatus); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
package datamanager
|
package datamanager
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -31,6 +32,7 @@ import (
|
||||||
"agola.io/agola/internal/objectstorage/posix"
|
"agola.io/agola/internal/objectstorage/posix"
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
ostypes "agola.io/agola/internal/objectstorage/types"
|
||||||
"agola.io/agola/internal/testutil"
|
"agola.io/agola/internal/testutil"
|
||||||
|
"agola.io/agola/internal/util"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
|
@ -738,92 +740,97 @@ func testCheckpoint(t *testing.T, basePath string) {
|
||||||
func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
|
func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
|
||||||
// read the data file
|
// read the data file
|
||||||
curDataStatus, err := dm.GetLastDataStatus()
|
curDataStatus, err := dm.GetLastDataStatus()
|
||||||
|
t.Logf("curDataStatus: %s", util.Dump(curDataStatus))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
allEntriesMap := map[string]*DataEntry{}
|
allEntriesMap := map[string]*DataEntry{}
|
||||||
var prevLastEntryID string
|
|
||||||
|
|
||||||
for i, file := range curDataStatus.Files["datatype01"] {
|
for dataType := range curDataStatus.Files {
|
||||||
dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath("datatype01", file.ID))
|
var prevLastEntryID string
|
||||||
if err != nil {
|
t.Logf("dataType: %q", dataType)
|
||||||
return err
|
for i, file := range curDataStatus.Files[dataType] {
|
||||||
}
|
t.Logf("data file: %d %q", i, file)
|
||||||
var dataFileIndex *DataFileIndex
|
dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath(dataType, file.ID))
|
||||||
dec := json.NewDecoder(dataFileIndexf)
|
|
||||||
err = dec.Decode(&dataFileIndex)
|
|
||||||
if err != nil {
|
|
||||||
dataFileIndexf.Close()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
dataFileIndexf.Close()
|
|
||||||
dataEntriesMap := map[string]*DataEntry{}
|
|
||||||
dataEntries := []*DataEntry{}
|
|
||||||
dataf, err := dm.ost.ReadObject(dm.DataFilePath("datatype01", file.ID))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
dec = json.NewDecoder(dataf)
|
|
||||||
var prevEntryID string
|
|
||||||
for {
|
|
||||||
var de *DataEntry
|
|
||||||
|
|
||||||
err := dec.Decode(&de)
|
|
||||||
if err == io.EOF {
|
|
||||||
// all done
|
|
||||||
break
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
dataf.Close()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// check that there are no duplicate entries
|
var dataFileIndex *DataFileIndex
|
||||||
if _, ok := allEntriesMap[de.ID]; ok {
|
dec := json.NewDecoder(dataFileIndexf)
|
||||||
return fmt.Errorf("duplicate entry id: %s", de.ID)
|
err = dec.Decode(&dataFileIndex)
|
||||||
}
|
if err != nil {
|
||||||
// check that the entries are in order
|
dataFileIndexf.Close()
|
||||||
if de.ID < prevEntryID {
|
return err
|
||||||
return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dataEntriesMap[de.ID] = de
|
dataFileIndexf.Close()
|
||||||
dataEntries = append(dataEntries, de)
|
dataEntriesMap := map[string]*DataEntry{}
|
||||||
allEntriesMap[de.ID] = de
|
dataEntries := []*DataEntry{}
|
||||||
}
|
dataf, err := dm.ost.ReadObject(dm.DataFilePath(dataType, file.ID))
|
||||||
dataf.Close()
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
dec = json.NewDecoder(dataf)
|
||||||
|
var prevEntryID string
|
||||||
|
for {
|
||||||
|
var de *DataEntry
|
||||||
|
|
||||||
// check that the index matches the entries
|
err := dec.Decode(&de)
|
||||||
if len(dataFileIndex.Index) != len(dataEntriesMap) {
|
if err == io.EOF {
|
||||||
return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap))
|
// all done
|
||||||
}
|
break
|
||||||
indexIDs := make([]string, len(dataFileIndex.Index))
|
}
|
||||||
entriesIDs := make([]string, len(dataEntriesMap))
|
if err != nil {
|
||||||
for id := range dataFileIndex.Index {
|
dataf.Close()
|
||||||
indexIDs = append(indexIDs, id)
|
return err
|
||||||
}
|
}
|
||||||
for id := range dataEntriesMap {
|
// check that there are no duplicate entries
|
||||||
entriesIDs = append(entriesIDs, id)
|
if _, ok := allEntriesMap[de.ID]; ok {
|
||||||
}
|
return fmt.Errorf("duplicate entry id: %s", de.ID)
|
||||||
sort.Strings(indexIDs)
|
}
|
||||||
sort.Strings(entriesIDs)
|
// check that the entries are in order
|
||||||
if !reflect.DeepEqual(indexIDs, entriesIDs) {
|
if de.ID < prevEntryID {
|
||||||
return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs)
|
return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
if file.LastEntryID != dataEntries[len(dataEntries)-1].ID {
|
dataEntriesMap[de.ID] = de
|
||||||
return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID)
|
dataEntries = append(dataEntries, de)
|
||||||
}
|
allEntriesMap[de.ID] = de
|
||||||
|
}
|
||||||
|
dataf.Close()
|
||||||
|
|
||||||
// check that all the files are in order
|
// check that the index matches the entries
|
||||||
if file.LastEntryID == prevLastEntryID {
|
if len(dataFileIndex.Index) != len(dataEntriesMap) {
|
||||||
return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID)
|
return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap))
|
||||||
|
}
|
||||||
|
indexIDs := make([]string, len(dataFileIndex.Index))
|
||||||
|
entriesIDs := make([]string, len(dataEntriesMap))
|
||||||
|
for id := range dataFileIndex.Index {
|
||||||
|
indexIDs = append(indexIDs, id)
|
||||||
|
}
|
||||||
|
for id := range dataEntriesMap {
|
||||||
|
entriesIDs = append(entriesIDs, id)
|
||||||
|
}
|
||||||
|
sort.Strings(indexIDs)
|
||||||
|
sort.Strings(entriesIDs)
|
||||||
|
if !reflect.DeepEqual(indexIDs, entriesIDs) {
|
||||||
|
return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs)
|
||||||
|
}
|
||||||
|
|
||||||
|
if file.LastEntryID != dataEntries[len(dataEntries)-1].ID {
|
||||||
|
return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that all the files are in order
|
||||||
|
if file.LastEntryID == prevLastEntryID {
|
||||||
|
return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID)
|
||||||
|
}
|
||||||
|
if file.LastEntryID < prevLastEntryID {
|
||||||
|
return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID)
|
||||||
|
}
|
||||||
|
prevLastEntryID = file.LastEntryID
|
||||||
}
|
}
|
||||||
if file.LastEntryID < prevLastEntryID {
|
|
||||||
return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID)
|
|
||||||
}
|
|
||||||
prevLastEntryID = file.LastEntryID
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// check that the number of entries is right
|
// check that the number of entries is right
|
||||||
|
@ -836,3 +843,259 @@ func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expected
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestExportImport(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "agola")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
etcdDir, err := ioutil.TempDir(dir, "etcd")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
tetcd := setupEtcd(t, etcdDir)
|
||||||
|
defer shutdownEtcd(tetcd)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
ostDir, err := ioutil.TempDir(dir, "ost")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
ost, err := posix.New(ostDir)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dmConfig := &DataManagerConfig{
|
||||||
|
BasePath: "basepath",
|
||||||
|
E: tetcd.TestEtcd.Store,
|
||||||
|
OST: objectstorage.NewObjStorage(ost, "/"),
|
||||||
|
// remove almost all wals to see that they are removed also from changes
|
||||||
|
EtcdWalsKeepNum: 1,
|
||||||
|
DataTypes: []string{"datatype01", "datatype02"},
|
||||||
|
// checkpoint also with only one wal
|
||||||
|
MinCheckpointWalsNum: 1,
|
||||||
|
// use a small maxDataFileSize
|
||||||
|
MaxDataFileSize: 10 * 1024,
|
||||||
|
}
|
||||||
|
dm, err := NewDataManager(ctx, logger, dmConfig)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
dmReadyCh := make(chan struct{})
|
||||||
|
go func() { _ = dm.Run(ctx, dmReadyCh) }()
|
||||||
|
<-dmReadyCh
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
|
||||||
|
|
||||||
|
expectedEntries := map[string]*DataEntry{}
|
||||||
|
|
||||||
|
// test insert from scratch (no current entries)
|
||||||
|
actionGroups := [][]*Action{}
|
||||||
|
actions := []*Action{}
|
||||||
|
for i := 200; i < 400; i++ {
|
||||||
|
action := &Action{
|
||||||
|
ActionType: ActionTypePut,
|
||||||
|
ID: fmt.Sprintf("object%04d", i),
|
||||||
|
DataType: "datatype01",
|
||||||
|
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
|
||||||
|
}
|
||||||
|
actions = append(actions, action)
|
||||||
|
}
|
||||||
|
actionGroups = append(actionGroups, actions)
|
||||||
|
|
||||||
|
actions = []*Action{}
|
||||||
|
for i := 600; i < 1000; i++ {
|
||||||
|
action := &Action{
|
||||||
|
ActionType: ActionTypePut,
|
||||||
|
ID: fmt.Sprintf("object%04d", i),
|
||||||
|
DataType: "datatype02",
|
||||||
|
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
|
||||||
|
}
|
||||||
|
actions = append(actions, action)
|
||||||
|
}
|
||||||
|
actionGroups = append(actionGroups, actions)
|
||||||
|
|
||||||
|
for _, actionGroup := range actionGroups {
|
||||||
|
for _, action := range actionGroup {
|
||||||
|
switch action.ActionType {
|
||||||
|
case ActionTypePut:
|
||||||
|
expectedEntries[action.ID] = &DataEntry{ID: action.ID, DataType: action.DataType, Data: action.Data}
|
||||||
|
case ActionTypeDelete:
|
||||||
|
delete(expectedEntries, action.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, actionGroup := range actionGroups {
|
||||||
|
// populate with a wal
|
||||||
|
_, err := dm.WriteWal(ctx, actionGroup, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the event to be read
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
|
var export bytes.Buffer
|
||||||
|
if err := dm.Export(ctx, &export); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("stopping datamanager")
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
t.Logf("stopping etcd")
|
||||||
|
// Reset etcd
|
||||||
|
shutdownEtcd(tetcd)
|
||||||
|
if err := tetcd.WaitDown(10 * time.Second); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
t.Logf("resetting etcd")
|
||||||
|
os.RemoveAll(etcdDir)
|
||||||
|
t.Logf("starting etcd")
|
||||||
|
tetcd = setupEtcd(t, etcdDir)
|
||||||
|
if err := tetcd.Start(); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
defer shutdownEtcd(tetcd)
|
||||||
|
|
||||||
|
ostDir, err = ioutil.TempDir(dir, "ost")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ost, err = posix.New(ostDir)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel = context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
dmConfig = &DataManagerConfig{
|
||||||
|
BasePath: "basepath",
|
||||||
|
E: tetcd.TestEtcd.Store,
|
||||||
|
OST: objectstorage.NewObjStorage(ost, "/"),
|
||||||
|
// remove almost all wals to see that they are removed also from changes
|
||||||
|
EtcdWalsKeepNum: 1,
|
||||||
|
DataTypes: []string{"datatype01", "datatype02"},
|
||||||
|
// checkpoint also with only one wal
|
||||||
|
MinCheckpointWalsNum: 1,
|
||||||
|
// use a small maxDataFileSize
|
||||||
|
MaxDataFileSize: 10 * 1024,
|
||||||
|
MaintenanceMode: true,
|
||||||
|
}
|
||||||
|
dm, err = NewDataManager(ctx, logger, dmConfig)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
dmReadyCh = make(chan struct{})
|
||||||
|
go func() { _ = dm.Run(ctx, dmReadyCh) }()
|
||||||
|
<-dmReadyCh
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
if err := dm.Import(ctx, &export); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("stopping datamanager")
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
ctx = context.Background()
|
||||||
|
|
||||||
|
// restart datamanager in normal mode
|
||||||
|
dmConfig = &DataManagerConfig{
|
||||||
|
BasePath: "basepath",
|
||||||
|
E: tetcd.TestEtcd.Store,
|
||||||
|
OST: objectstorage.NewObjStorage(ost, "/"),
|
||||||
|
// remove almost all wals to see that they are removed also from changes
|
||||||
|
EtcdWalsKeepNum: 1,
|
||||||
|
DataTypes: []string{"datatype01", "datatype02"},
|
||||||
|
// checkpoint also with only one wal
|
||||||
|
MinCheckpointWalsNum: 1,
|
||||||
|
// use a small maxDataFileSize
|
||||||
|
MaxDataFileSize: 10 * 1024,
|
||||||
|
}
|
||||||
|
dm, err = NewDataManager(ctx, logger, dmConfig)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
dmReadyCh = make(chan struct{})
|
||||||
|
go func() { _ = dm.Run(ctx, dmReadyCh) }()
|
||||||
|
<-dmReadyCh
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
actionGroups = [][]*Action{}
|
||||||
|
actions = []*Action{}
|
||||||
|
for i := 400; i < 600; i++ {
|
||||||
|
action := &Action{
|
||||||
|
ActionType: ActionTypePut,
|
||||||
|
ID: fmt.Sprintf("object%04d", i),
|
||||||
|
DataType: "datatype01",
|
||||||
|
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
|
||||||
|
}
|
||||||
|
actions = append(actions, action)
|
||||||
|
}
|
||||||
|
actionGroups = append(actionGroups, actions)
|
||||||
|
|
||||||
|
actions = []*Action{}
|
||||||
|
for i := 1000; i < 1400; i++ {
|
||||||
|
action := &Action{
|
||||||
|
ActionType: ActionTypePut,
|
||||||
|
ID: fmt.Sprintf("object%04d", i),
|
||||||
|
DataType: "datatype02",
|
||||||
|
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
|
||||||
|
}
|
||||||
|
actions = append(actions, action)
|
||||||
|
}
|
||||||
|
actionGroups = append(actionGroups, actions)
|
||||||
|
|
||||||
|
for _, actionGroup := range actionGroups {
|
||||||
|
for _, action := range actionGroup {
|
||||||
|
switch action.ActionType {
|
||||||
|
case ActionTypePut:
|
||||||
|
expectedEntries[action.ID] = &DataEntry{ID: action.ID, DataType: action.DataType, Data: action.Data}
|
||||||
|
case ActionTypeDelete:
|
||||||
|
delete(expectedEntries, action.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, actionGroup := range actionGroups {
|
||||||
|
// populate with a wal
|
||||||
|
_, err := dm.WriteWal(ctx, actionGroup, nil)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the event to be read
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
|
||||||
|
if err := dm.checkpoint(ctx, false); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue