diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 5354cbb..a639638 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -22,8 +22,10 @@ import ( "io" "io/ioutil" "os" + "path" "reflect" "sort" + "strings" "testing" "time" @@ -33,6 +35,7 @@ import ( ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/testutil" "github.com/google/go-cmp/cmp" + errors "golang.org/x/xerrors" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -171,6 +174,135 @@ func TestEtcdReset(t *testing.T) { } } +func TestEtcdResetWalsGap(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + ctx, cancel := context.WithCancel(context.Background()) + + ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ost, err := posix.New(ostDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + dmConfig := &DataManagerConfig{ + BasePath: "basepath", + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + EtcdWalsKeepNum: 10, + DataTypes: []string{"datatype01"}, + } + dm, err := NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + dmReadyCh := make(chan struct{}) + + t.Logf("starting datamanager") + go func() { _ = dm.Run(ctx, dmReadyCh) }() + <-dmReadyCh + + actions := []*Action{ + { + ActionType: ActionTypePut, + DataType: "datatype01", + Data: []byte("{}"), + }, + } + + for i := 0; i < 20; i++ { + objectID := fmt.Sprintf("object%02d", i) + actions[0].ID = objectID + if _, err := dm.WriteWal(ctx, actions, nil); err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // wait for wal to be committed storage + time.Sleep(5 * time.Second) + + t.Logf("stopping datamanager") + cancel() + + t.Logf("stopping etcd") + // Reset etcd + shutdownEtcd(tetcd) + if err := tetcd.WaitDown(10 * time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + t.Logf("resetting etcd") + os.RemoveAll(etcdDir) + t.Logf("starting etcd") + tetcd = setupEtcd(t, etcdDir) + if err := tetcd.Start(); err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer shutdownEtcd(tetcd) + + // Remove a wal in the middle + doneCh := make(chan struct{}) + defer close(doneCh) + + walStatusFiles := []string{} + for object := range dm.ost.List(path.Join(dm.basePath, storageWalsStatusDir)+"/", "", true, doneCh) { + if object.Err != nil { + t.Fatalf("unexpected err: %v", err) + } + + walStatusFiles = append(walStatusFiles, object.Path) + } + if len(walStatusFiles) < 20 { + t.Fatalf("exptected at least 20 wals, got: %d wals", len(walStatusFiles)) + } + + removeIndex := 10 + if err := dm.ost.DeleteObject(walStatusFiles[removeIndex]); err != nil { + t.Fatalf("unexpected err: %v", err) + } + errorWalSequence := strings.TrimSuffix(path.Base(walStatusFiles[removeIndex+1]), path.Ext(walStatusFiles[removeIndex+1])) + prevWalSequence := strings.TrimSuffix(path.Base(walStatusFiles[removeIndex]), path.Ext(walStatusFiles[removeIndex])) + expectedPrevWalSequence := strings.TrimSuffix(path.Base(walStatusFiles[removeIndex-1]), path.Ext(walStatusFiles[removeIndex-1])) + + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + dmConfig = &DataManagerConfig{ + BasePath: "basepath", + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + EtcdWalsKeepNum: 10, + DataTypes: []string{"datatype01"}, + } + dm, err = NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + dmReadyCh = make(chan struct{}) + + expectedErr := errors.Errorf("wal %q previousWalSequence %q is different than expected walSequence %q", errorWalSequence, prevWalSequence, expectedPrevWalSequence) + err = dm.InitEtcd(ctx, nil) + if err == nil { + t.Fatalf("expected err: %q, got nil error", expectedErr) + } + if expectedErr.Error() != err.Error() { + t.Fatalf("expected err: %q, got err %q", expectedErr, err) + } +} + func TestConcurrentUpdate(t *testing.T) { dir, err := ioutil.TempDir("", "agola") if err != nil { diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 8040ddf..4bc583a 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -923,7 +923,7 @@ func (d *DataManager) etcdPinger(ctx context.Context) error { } func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error { - writeWal := func(wal *WalFile) error { + writeWal := func(wal *WalFile, prevWalSequence string) error { walFile, err := d.ost.ReadObject(d.storageWalStatusFile(wal.WalSequence) + ".committed") if err != nil { return err @@ -936,6 +936,12 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro } walFile.Close() + if prevWalSequence != "" { + if header.PreviousWalSequence != "" && header.PreviousWalSequence != prevWalSequence { + return errors.Errorf("wal %q previousWalSequence %q is different than expected walSequence %q", wal.WalSequence, header.PreviousWalSequence, prevWalSequence) + } + } + walData := &WalData{ WalSequence: wal.WalSequence, WalDataFileID: header.WalDataFileID, @@ -1034,6 +1040,7 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro // So take all the wals in committed or checkpointed state starting from the // first not checkpointed wal and put them in etcd lastCommittedStorageWalSequence := "" + previousWalSequence := "" wroteWals := 0 for wal := range d.ListOSTWals("") { // if there're wals in ost but not a datastatus return an error @@ -1051,9 +1058,10 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro lastCommittedStorageWalSequence = wal.WalSequence - if err := writeWal(wal); err != nil { + if err := writeWal(wal, previousWalSequence); err != nil { return err } + previousWalSequence = wal.WalSequence wroteWals++ }