diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index 4ae9663..5892caf 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -558,3 +558,165 @@ func (d *DataManager) GetLastDataStatus() (*DataStatus, error) { return dataStatus, dec.Decode(&dataStatus) } + +func (d *DataManager) Export(ctx context.Context, w io.Writer) error { + if err := d.checkpoint(ctx, true); err != nil { + return err + } + + curDataStatus, err := d.GetLastDataStatus() + if err != nil { + return err + } + + for _, dataType := range d.dataTypes { + var curDataStatusFiles []*DataStatusFile + if curDataStatus != nil { + curDataStatusFiles = curDataStatus.Files[dataType] + } + for _, dsf := range curDataStatusFiles { + dataf, err := d.ost.ReadObject(d.DataFilePath(dataType, dsf.ID)) + if err != nil { + return err + } + if _, err := io.Copy(w, dataf); err != nil { + dataf.Close() + return err + } + + dataf.Close() + } + } + + return nil +} + +func (d *DataManager) Import(ctx context.Context, r io.Reader) error { + // delete contents in etcd + if err := d.deleteEtcd(ctx); err != nil { + return err + } + + // we require all entries of the same datatypes grouped together + seenDataTypes := map[string]struct{}{} + + // create a new sequence, we assume that it'll be greater than previous data sequences + dataSequence, err := sequence.IncSequence(ctx, d.e, etcdCheckpointSeqKey) + if err != nil { + return err + } + + dataStatus := &DataStatus{ + DataSequence: dataSequence.String(), + // no last wal sequence on import + WalSequence: "", + Files: make(map[string][]*DataStatusFile), + } + + dataStatusFiles := []*DataStatusFile{} + + var lastEntryID string + var curDataType string + var buf bytes.Buffer + var pos int64 + dataFileIndex := &DataFileIndex{ + Index: make(map[string]int64), + } + dec := json.NewDecoder(r) + + for { + var de *DataEntry + + err := dec.Decode(&de) + if err == io.EOF { + dataFileID := uuid.NewV4().String() + if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil { + return err + } + + dataStatusFiles = append(dataStatusFiles, &DataStatusFile{ + ID: dataFileID, + LastEntryID: lastEntryID, + }) + dataStatus.Files[curDataType] = dataStatusFiles + + break + } + + if curDataType == "" { + curDataType = de.DataType + seenDataTypes[de.DataType] = struct{}{} + } + + mustWrite := false + mustReset := false + if pos > d.maxDataFileSize { + mustWrite = true + } + + if curDataType != de.DataType { + if _, ok := seenDataTypes[de.DataType]; ok { + return errors.Errorf("dataType %q already imported", de.DataType) + } + mustWrite = true + mustReset = true + } + + if mustWrite { + dataFileID := uuid.NewV4().String() + if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil { + return err + } + + dataStatusFiles = append(dataStatusFiles, &DataStatusFile{ + ID: dataFileID, + LastEntryID: lastEntryID, + }) + + if mustReset { + dataStatus.Files[curDataType] = dataStatusFiles + + dataStatusFiles = []*DataStatusFile{} + curDataType = de.DataType + lastEntryID = "" + } + + dataFileIndex = &DataFileIndex{ + Index: make(map[string]int64), + } + buf = bytes.Buffer{} + pos = 0 + } + + if de.ID <= lastEntryID { + // entries for the same datatype must be unique and ordered + return errors.Errorf("entry id %q is less or equal than previous entry id %q", de.ID, lastEntryID) + } + lastEntryID = de.ID + + dataEntryj, err := json.Marshal(de) + if err != nil { + return err + } + if _, err := buf.Write(dataEntryj); err != nil { + return err + } + dataFileIndex.Index[de.ID] = pos + pos += int64(len(dataEntryj)) + } + + dataStatusj, err := json.Marshal(dataStatus) + if err != nil { + return err + } + if err := d.ost.WriteObject(d.dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil { + return err + } + + // initialize etcd providing the specific datastatus + if err := d.InitEtcd(ctx, dataStatus); err != nil { + return err + } + + return nil +} diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 7023ecb..7ac8c6f 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -15,6 +15,7 @@ package datamanager import ( + "bytes" "context" "encoding/json" "fmt" @@ -31,6 +32,7 @@ import ( "agola.io/agola/internal/objectstorage/posix" ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/testutil" + "agola.io/agola/internal/util" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -738,92 +740,97 @@ func testCheckpoint(t *testing.T, basePath string) { func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error { // read the data file curDataStatus, err := dm.GetLastDataStatus() + t.Logf("curDataStatus: %s", util.Dump(curDataStatus)) if err != nil { return err } allEntriesMap := map[string]*DataEntry{} - var prevLastEntryID string - for i, file := range curDataStatus.Files["datatype01"] { - dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath("datatype01", file.ID)) - if err != nil { - return err - } - var dataFileIndex *DataFileIndex - dec := json.NewDecoder(dataFileIndexf) - err = dec.Decode(&dataFileIndex) - if err != nil { - dataFileIndexf.Close() - return err - } - - dataFileIndexf.Close() - dataEntriesMap := map[string]*DataEntry{} - dataEntries := []*DataEntry{} - dataf, err := dm.ost.ReadObject(dm.DataFilePath("datatype01", file.ID)) - if err != nil { - return err - } - dec = json.NewDecoder(dataf) - var prevEntryID string - for { - var de *DataEntry - - err := dec.Decode(&de) - if err == io.EOF { - // all done - break - } + for dataType := range curDataStatus.Files { + var prevLastEntryID string + t.Logf("dataType: %q", dataType) + for i, file := range curDataStatus.Files[dataType] { + t.Logf("data file: %d %q", i, file) + dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath(dataType, file.ID)) if err != nil { - dataf.Close() return err } - // check that there are no duplicate entries - if _, ok := allEntriesMap[de.ID]; ok { - return fmt.Errorf("duplicate entry id: %s", de.ID) - } - // check that the entries are in order - if de.ID < prevEntryID { - return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID) + var dataFileIndex *DataFileIndex + dec := json.NewDecoder(dataFileIndexf) + err = dec.Decode(&dataFileIndex) + if err != nil { + dataFileIndexf.Close() + return err } - dataEntriesMap[de.ID] = de - dataEntries = append(dataEntries, de) - allEntriesMap[de.ID] = de - } - dataf.Close() + dataFileIndexf.Close() + dataEntriesMap := map[string]*DataEntry{} + dataEntries := []*DataEntry{} + dataf, err := dm.ost.ReadObject(dm.DataFilePath(dataType, file.ID)) + if err != nil { + return err + } + dec = json.NewDecoder(dataf) + var prevEntryID string + for { + var de *DataEntry - // check that the index matches the entries - if len(dataFileIndex.Index) != len(dataEntriesMap) { - return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap)) - } - indexIDs := make([]string, len(dataFileIndex.Index)) - entriesIDs := make([]string, len(dataEntriesMap)) - for id := range dataFileIndex.Index { - indexIDs = append(indexIDs, id) - } - for id := range dataEntriesMap { - entriesIDs = append(entriesIDs, id) - } - sort.Strings(indexIDs) - sort.Strings(entriesIDs) - if !reflect.DeepEqual(indexIDs, entriesIDs) { - return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs) - } + err := dec.Decode(&de) + if err == io.EOF { + // all done + break + } + if err != nil { + dataf.Close() + return err + } + // check that there are no duplicate entries + if _, ok := allEntriesMap[de.ID]; ok { + return fmt.Errorf("duplicate entry id: %s", de.ID) + } + // check that the entries are in order + if de.ID < prevEntryID { + return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID) + } - if file.LastEntryID != dataEntries[len(dataEntries)-1].ID { - return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID) - } + dataEntriesMap[de.ID] = de + dataEntries = append(dataEntries, de) + allEntriesMap[de.ID] = de + } + dataf.Close() - // check that all the files are in order - if file.LastEntryID == prevLastEntryID { - return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID) + // check that the index matches the entries + if len(dataFileIndex.Index) != len(dataEntriesMap) { + return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap)) + } + indexIDs := make([]string, len(dataFileIndex.Index)) + entriesIDs := make([]string, len(dataEntriesMap)) + for id := range dataFileIndex.Index { + indexIDs = append(indexIDs, id) + } + for id := range dataEntriesMap { + entriesIDs = append(entriesIDs, id) + } + sort.Strings(indexIDs) + sort.Strings(entriesIDs) + if !reflect.DeepEqual(indexIDs, entriesIDs) { + return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs) + } + + if file.LastEntryID != dataEntries[len(dataEntries)-1].ID { + return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, dataEntries[len(dataEntries)-1].ID) + } + + // check that all the files are in order + if file.LastEntryID == prevLastEntryID { + return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID) + } + if file.LastEntryID < prevLastEntryID { + return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID) + } + prevLastEntryID = file.LastEntryID } - if file.LastEntryID < prevLastEntryID { - return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID) - } - prevLastEntryID = file.LastEntryID } // check that the number of entries is right @@ -836,3 +843,259 @@ func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expected return nil } + +func TestExportImport(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + ctx, cancel := context.WithCancel(context.Background()) + + ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + ost, err := posix.New(ostDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + dmConfig := &DataManagerConfig{ + BasePath: "basepath", + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + // remove almost all wals to see that they are removed also from changes + EtcdWalsKeepNum: 1, + DataTypes: []string{"datatype01", "datatype02"}, + // checkpoint also with only one wal + MinCheckpointWalsNum: 1, + // use a small maxDataFileSize + MaxDataFileSize: 10 * 1024, + } + dm, err := NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + dmReadyCh := make(chan struct{}) + go func() { _ = dm.Run(ctx, dmReadyCh) }() + <-dmReadyCh + + time.Sleep(5 * time.Second) + + contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + expectedEntries := map[string]*DataEntry{} + + // test insert from scratch (no current entries) + actionGroups := [][]*Action{} + actions := []*Action{} + for i := 200; i < 400; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + actionGroups = append(actionGroups, actions) + + actions = []*Action{} + for i := 600; i < 1000; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype02", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + actionGroups = append(actionGroups, actions) + + for _, actionGroup := range actionGroups { + for _, action := range actionGroup { + switch action.ActionType { + case ActionTypePut: + expectedEntries[action.ID] = &DataEntry{ID: action.ID, DataType: action.DataType, Data: action.Data} + case ActionTypeDelete: + delete(expectedEntries, action.ID) + } + } + } + + for _, actionGroup := range actionGroups { + // populate with a wal + _, err := dm.WriteWal(ctx, actionGroup, nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // wait for the event to be read + time.Sleep(500 * time.Millisecond) + + var export bytes.Buffer + if err := dm.Export(ctx, &export); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + t.Logf("stopping datamanager") + cancel() + + time.Sleep(5 * time.Second) + + t.Logf("stopping etcd") + // Reset etcd + shutdownEtcd(tetcd) + if err := tetcd.WaitDown(10 * time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + t.Logf("resetting etcd") + os.RemoveAll(etcdDir) + t.Logf("starting etcd") + tetcd = setupEtcd(t, etcdDir) + if err := tetcd.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer shutdownEtcd(tetcd) + + ostDir, err = ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ost, err = posix.New(ostDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ctx, cancel = context.WithCancel(context.Background()) + + dmConfig = &DataManagerConfig{ + BasePath: "basepath", + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + // remove almost all wals to see that they are removed also from changes + EtcdWalsKeepNum: 1, + DataTypes: []string{"datatype01", "datatype02"}, + // checkpoint also with only one wal + MinCheckpointWalsNum: 1, + // use a small maxDataFileSize + MaxDataFileSize: 10 * 1024, + MaintenanceMode: true, + } + dm, err = NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + dmReadyCh = make(chan struct{}) + go func() { _ = dm.Run(ctx, dmReadyCh) }() + <-dmReadyCh + + time.Sleep(5 * time.Second) + if err := dm.Import(ctx, &export); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + t.Logf("stopping datamanager") + cancel() + + time.Sleep(5 * time.Second) + + ctx = context.Background() + + // restart datamanager in normal mode + dmConfig = &DataManagerConfig{ + BasePath: "basepath", + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + // remove almost all wals to see that they are removed also from changes + EtcdWalsKeepNum: 1, + DataTypes: []string{"datatype01", "datatype02"}, + // checkpoint also with only one wal + MinCheckpointWalsNum: 1, + // use a small maxDataFileSize + MaxDataFileSize: 10 * 1024, + } + dm, err = NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + dmReadyCh = make(chan struct{}) + go func() { _ = dm.Run(ctx, dmReadyCh) }() + <-dmReadyCh + + time.Sleep(5 * time.Second) + + if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + actionGroups = [][]*Action{} + actions = []*Action{} + for i := 400; i < 600; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + actionGroups = append(actionGroups, actions) + + actions = []*Action{} + for i := 1000; i < 1400; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype02", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + actionGroups = append(actionGroups, actions) + + for _, actionGroup := range actionGroups { + for _, action := range actionGroup { + switch action.ActionType { + case ActionTypePut: + expectedEntries[action.ID] = &DataEntry{ID: action.ID, DataType: action.DataType, Data: action.Data} + case ActionTypeDelete: + delete(expectedEntries, action.ID) + } + } + } + + for _, actionGroup := range actionGroups { + // populate with a wal + _, err := dm.WriteWal(ctx, actionGroup, nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // wait for the event to be read + time.Sleep(500 * time.Millisecond) + + if err := dm.checkpoint(ctx, false); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil { + t.Fatalf("unexpected err: %v", err) + } +}