Merge pull request #63 from sgotti/datamanager_export_import

datamanager: implement import/export
This commit is contained in:
Simone Gotti 2019-07-25 14:49:16 +02:00 committed by GitHub
commit 5c87d89c72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 496 additions and 71 deletions

View File

@ -558,3 +558,165 @@ func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
return dataStatus, dec.Decode(&dataStatus)
}
func (d *DataManager) Export(ctx context.Context, w io.Writer) error {
if err := d.checkpoint(ctx, true); err != nil {
return err
}
curDataStatus, err := d.GetLastDataStatus()
if err != nil {
return err
}
for _, dataType := range d.dataTypes {
var curDataStatusFiles []*DataStatusFile
if curDataStatus != nil {
curDataStatusFiles = curDataStatus.Files[dataType]
}
for _, dsf := range curDataStatusFiles {
dataf, err := d.ost.ReadObject(d.DataFilePath(dataType, dsf.ID))
if err != nil {
return err
}
if _, err := io.Copy(w, dataf); err != nil {
dataf.Close()
return err
}
dataf.Close()
}
}
return nil
}
func (d *DataManager) Import(ctx context.Context, r io.Reader) error {
// delete contents in etcd
if err := d.deleteEtcd(ctx); err != nil {
return err
}
// we require all entries of the same datatypes grouped together
seenDataTypes := map[string]struct{}{}
// create a new sequence, we assume that it'll be greater than previous data sequences
dataSequence, err := sequence.IncSequence(ctx, d.e, etcdCheckpointSeqKey)
if err != nil {
return err
}
dataStatus := &DataStatus{
DataSequence: dataSequence.String(),
// no last wal sequence on import
WalSequence: "",
Files: make(map[string][]*DataStatusFile),
}
dataStatusFiles := []*DataStatusFile{}
var lastEntryID string
var curDataType string
var buf bytes.Buffer
var pos int64
dataFileIndex := &DataFileIndex{
Index: make(map[string]int64),
}
dec := json.NewDecoder(r)
for {
var de *DataEntry
err := dec.Decode(&de)
if err == io.EOF {
dataFileID := uuid.NewV4().String()
if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil {
return err
}
dataStatusFiles = append(dataStatusFiles, &DataStatusFile{
ID: dataFileID,
LastEntryID: lastEntryID,
})
dataStatus.Files[curDataType] = dataStatusFiles
break
}
if curDataType == "" {
curDataType = de.DataType
seenDataTypes[de.DataType] = struct{}{}
}
mustWrite := false
mustReset := false
if pos > d.maxDataFileSize {
mustWrite = true
}
if curDataType != de.DataType {
if _, ok := seenDataTypes[de.DataType]; ok {
return errors.Errorf("dataType %q already imported", de.DataType)
}
mustWrite = true
mustReset = true
}
if mustWrite {
dataFileID := uuid.NewV4().String()
if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil {
return err
}
dataStatusFiles = append(dataStatusFiles, &DataStatusFile{
ID: dataFileID,
LastEntryID: lastEntryID,
})
if mustReset {
dataStatus.Files[curDataType] = dataStatusFiles
dataStatusFiles = []*DataStatusFile{}
curDataType = de.DataType
lastEntryID = ""
}
dataFileIndex = &DataFileIndex{
Index: make(map[string]int64),
}
buf = bytes.Buffer{}
pos = 0
}
if de.ID <= lastEntryID {
// entries for the same datatype must be unique and ordered
return errors.Errorf("entry id %q is less or equal than previous entry id %q", de.ID, lastEntryID)
}
lastEntryID = de.ID
dataEntryj, err := json.Marshal(de)
if err != nil {
return err
}
if _, err := buf.Write(dataEntryj); err != nil {
return err
}
dataFileIndex.Index[de.ID] = pos
pos += int64(len(dataEntryj))
}
dataStatusj, err := json.Marshal(dataStatus)
if err != nil {
return err
}
if err := d.ost.WriteObject(d.dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
return err
}
// initialize etcd providing the specific datastatus
if err := d.InitEtcd(ctx, dataStatus); err != nil {
return err
}
return nil
}

View File

@ -15,6 +15,7 @@
package datamanager
import (
"bytes"
"context"
"encoding/json"
"fmt"
@ -31,6 +32,7 @@ import (
"agola.io/agola/internal/objectstorage/posix"
ostypes "agola.io/agola/internal/objectstorage/types"
"agola.io/agola/internal/testutil"
"agola.io/agola/internal/util"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -738,92 +740,97 @@ func testCheckpoint(t *testing.T, basePath string) {
func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
// read the data file
curDataStatus, err := dm.GetLastDataStatus()
t.Logf("curDataStatus: %s", util.Dump(curDataStatus))
if err != nil {
return err
}
allEntriesMap := map[string]*DataEntry{}
var prevLastEntryID string
for i, file := range curDataStatus.Files["datatype01"] {
dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath("datatype01", file.ID))
if err != nil {
return err
}
var dataFileIndex *DataFileIndex
dec := json.NewDecoder(dataFileIndexf)
err = dec.Decode(&dataFileIndex)
if err != nil {
dataFileIndexf.Close()
return err
}
dataFileIndexf.Close()
dataEntriesMap := map[string]*DataEntry{}
dataEntries := []*DataEntry{}
dataf, err := dm.ost.ReadObject(dm.DataFilePath("datatype01", file.ID))
if err != nil {
return err
}
dec = json.NewDecoder(dataf)
var prevEntryID string
for {
var de *DataEntry
err := dec.Decode(&de)
if err == io.EOF {
// all done
break
}
for dataType := range curDataStatus.Files {
var prevLastEntryID string
t.Logf("dataType: %q", dataType)
for i, file := range curDataStatus.Files[dataType] {
t.Logf("data file: %d %q", i, file)
dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath(dataType, file.ID))
if err != nil {
dataf.Close()
return err
}
// check that there are no duplicate entries
if _, ok := allEntriesMap[de.ID]; ok {
return fmt.Errorf("duplicate entry id: %s", de.ID)
}
// check that the entries are in order
if de.ID < prevEntryID {
return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID)
var dataFileIndex *DataFileIndex
dec := json.NewDecoder(dataFileIndexf)
err = dec.Decode(&dataFileIndex)
if err != nil {
dataFileIndexf.Close()
return err
}
dataEntriesMap[de.ID] = de
dataEntries = append(dataEntries, de)
allEntriesMap[de.ID] = de
}
dataf.Close()
dataFileIndexf.Close()
dataEntriesMap := map[string]*DataEntry{}
dataEntries := []*DataEntry{}
dataf, err := dm.ost.ReadObject(dm.DataFilePath(dataType, file.ID))
if err != nil {
return err
}
dec = json.NewDecoder(dataf)
var prevEntryID string
for {
var de *DataEntry
// check that the index matches the entries
if len(dataFileIndex.Index) != len(dataEntriesMap) {
return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap))
}
indexIDs := make([]string, len(dataFileIndex.Index))
entriesIDs := make([]string, len(dataEntriesMap))
for id := range dataFileIndex.Index {
indexIDs = append(indexIDs, id)
}
for id := range dataEntriesMap {
entriesIDs = append(entriesIDs, id)
}
sort.Strings(indexIDs)
sort.Strings(entriesIDs)
if !reflect.DeepEqual(indexIDs, entriesIDs) {
return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs)
}
err := dec.Decode(&de)
if err == io.EOF {
// all done
break
}
if err != nil {
dataf.Close()
return err
}
// check that there are no duplicate entries
if _, ok := allEntriesMap[de.ID]; ok {
return fmt.Errorf("duplicate entry id: %s", de.ID)
}
// check that the entries are in order
if de.ID < prevEntryID {
return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID)
}
if file.LastEntryID != dataEntries[len(dataEntries)-1].ID {
return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID)
}
dataEntriesMap[de.ID] = de
dataEntries = append(dataEntries, de)
allEntriesMap[de.ID] = de
}
dataf.Close()
// check that all the files are in order
if file.LastEntryID == prevLastEntryID {
return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID)
// check that the index matches the entries
if len(dataFileIndex.Index) != len(dataEntriesMap) {
return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap))
}
indexIDs := make([]string, len(dataFileIndex.Index))
entriesIDs := make([]string, len(dataEntriesMap))
for id := range dataFileIndex.Index {
indexIDs = append(indexIDs, id)
}
for id := range dataEntriesMap {
entriesIDs = append(entriesIDs, id)
}
sort.Strings(indexIDs)
sort.Strings(entriesIDs)
if !reflect.DeepEqual(indexIDs, entriesIDs) {
return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs)
}
if file.LastEntryID != dataEntries[len(dataEntries)-1].ID {
return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID)
}
// check that all the files are in order
if file.LastEntryID == prevLastEntryID {
return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID)
}
if file.LastEntryID < prevLastEntryID {
return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID)
}
prevLastEntryID = file.LastEntryID
}
if file.LastEntryID < prevLastEntryID {
return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID)
}
prevLastEntryID = file.LastEntryID
}
// check that the number of entries is right
@ -836,3 +843,259 @@ func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expected
return nil
}
func TestExportImport(t *testing.T) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
etcdDir, err := ioutil.TempDir(dir, "etcd")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
tetcd := setupEtcd(t, etcdDir)
defer shutdownEtcd(tetcd)
ctx, cancel := context.WithCancel(context.Background())
ostDir, err := ioutil.TempDir(dir, "ost")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
ost, err := posix.New(ostDir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
dmConfig := &DataManagerConfig{
BasePath: "basepath",
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum: 1,
DataTypes: []string{"datatype01", "datatype02"},
// checkpoint also with only one wal
MinCheckpointWalsNum: 1,
// use a small maxDataFileSize
MaxDataFileSize: 10 * 1024,
}
dm, err := NewDataManager(ctx, logger, dmConfig)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
dmReadyCh := make(chan struct{})
go func() { _ = dm.Run(ctx, dmReadyCh) }()
<-dmReadyCh
time.Sleep(5 * time.Second)
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
expectedEntries := map[string]*DataEntry{}
// test insert from scratch (no current entries)
actionGroups := [][]*Action{}
actions := []*Action{}
for i := 200; i < 400; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
actionGroups = append(actionGroups, actions)
actions = []*Action{}
for i := 600; i < 1000; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype02",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
actionGroups = append(actionGroups, actions)
for _, actionGroup := range actionGroups {
for _, action := range actionGroup {
switch action.ActionType {
case ActionTypePut:
expectedEntries[action.ID] = &DataEntry{ID: action.ID, DataType: action.DataType, Data: action.Data}
case ActionTypeDelete:
delete(expectedEntries, action.ID)
}
}
}
for _, actionGroup := range actionGroups {
// populate with a wal
_, err := dm.WriteWal(ctx, actionGroup, nil)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
// wait for the event to be read
time.Sleep(500 * time.Millisecond)
var export bytes.Buffer
if err := dm.Export(ctx, &export); err != nil {
t.Fatalf("unexpected err: %v", err)
}
t.Logf("stopping datamanager")
cancel()
time.Sleep(5 * time.Second)
t.Logf("stopping etcd")
// Reset etcd
shutdownEtcd(tetcd)
if err := tetcd.WaitDown(10 * time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
t.Logf("resetting etcd")
os.RemoveAll(etcdDir)
t.Logf("starting etcd")
tetcd = setupEtcd(t, etcdDir)
if err := tetcd.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer shutdownEtcd(tetcd)
ostDir, err = ioutil.TempDir(dir, "ost")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
ost, err = posix.New(ostDir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
ctx, cancel = context.WithCancel(context.Background())
dmConfig = &DataManagerConfig{
BasePath: "basepath",
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum: 1,
DataTypes: []string{"datatype01", "datatype02"},
// checkpoint also with only one wal
MinCheckpointWalsNum: 1,
// use a small maxDataFileSize
MaxDataFileSize: 10 * 1024,
MaintenanceMode: true,
}
dm, err = NewDataManager(ctx, logger, dmConfig)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
dmReadyCh = make(chan struct{})
go func() { _ = dm.Run(ctx, dmReadyCh) }()
<-dmReadyCh
time.Sleep(5 * time.Second)
if err := dm.Import(ctx, &export); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
t.Fatalf("unexpected err: %v", err)
}
t.Logf("stopping datamanager")
cancel()
time.Sleep(5 * time.Second)
ctx = context.Background()
// restart datamanager in normal mode
dmConfig = &DataManagerConfig{
BasePath: "basepath",
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum: 1,
DataTypes: []string{"datatype01", "datatype02"},
// checkpoint also with only one wal
MinCheckpointWalsNum: 1,
// use a small maxDataFileSize
MaxDataFileSize: 10 * 1024,
}
dm, err = NewDataManager(ctx, logger, dmConfig)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
dmReadyCh = make(chan struct{})
go func() { _ = dm.Run(ctx, dmReadyCh) }()
<-dmReadyCh
time.Sleep(5 * time.Second)
if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
t.Fatalf("unexpected err: %v", err)
}
actionGroups = [][]*Action{}
actions = []*Action{}
for i := 400; i < 600; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
actionGroups = append(actionGroups, actions)
actions = []*Action{}
for i := 1000; i < 1400; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype02",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
actionGroups = append(actionGroups, actions)
for _, actionGroup := range actionGroups {
for _, action := range actionGroup {
switch action.ActionType {
case ActionTypePut:
expectedEntries[action.ID] = &DataEntry{ID: action.ID, DataType: action.DataType, Data: action.Data}
case ActionTypeDelete:
delete(expectedEntries, action.ID)
}
}
}
for _, actionGroup := range actionGroups {
// populate with a wal
_, err := dm.WriteWal(ctx, actionGroup, nil)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
// wait for the event to be read
time.Sleep(500 * time.Millisecond)
if err := dm.checkpoint(ctx, false); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
t.Fatalf("unexpected err: %v", err)
}
}