Merge pull request #165 from sgotti/datamanager_check_wal_previousequence_is_correct
datamanager: check wal previouswalsequence is correct in initEtcd
This commit is contained in:
commit
cd5c5a20ed
@ -22,8 +22,10 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -33,6 +35,7 @@ import (
|
||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
||||
"agola.io/agola/internal/testutil"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
errors "golang.org/x/xerrors"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
@ -171,6 +174,135 @@ func TestEtcdReset(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdResetWalsGap(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "agola")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
etcdDir, err := ioutil.TempDir(dir, "etcd")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
tetcd := setupEtcd(t, etcdDir)
|
||||
defer shutdownEtcd(tetcd)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
ostDir, err := ioutil.TempDir(dir, "ost")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
|
||||
ost, err := posix.New(ostDir)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
|
||||
dmConfig := &DataManagerConfig{
|
||||
BasePath: "basepath",
|
||||
E: tetcd.TestEtcd.Store,
|
||||
OST: objectstorage.NewObjStorage(ost, "/"),
|
||||
EtcdWalsKeepNum: 10,
|
||||
DataTypes: []string{"datatype01"},
|
||||
}
|
||||
dm, err := NewDataManager(ctx, logger, dmConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
dmReadyCh := make(chan struct{})
|
||||
|
||||
t.Logf("starting datamanager")
|
||||
go func() { _ = dm.Run(ctx, dmReadyCh) }()
|
||||
<-dmReadyCh
|
||||
|
||||
actions := []*Action{
|
||||
{
|
||||
ActionType: ActionTypePut,
|
||||
DataType: "datatype01",
|
||||
Data: []byte("{}"),
|
||||
},
|
||||
}
|
||||
|
||||
for i := 0; i < 20; i++ {
|
||||
objectID := fmt.Sprintf("object%02d", i)
|
||||
actions[0].ID = objectID
|
||||
if _, err := dm.WriteWal(ctx, actions, nil); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// wait for wal to be committed storage
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
t.Logf("stopping datamanager")
|
||||
cancel()
|
||||
|
||||
t.Logf("stopping etcd")
|
||||
// Reset etcd
|
||||
shutdownEtcd(tetcd)
|
||||
if err := tetcd.WaitDown(10 * time.Second); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
t.Logf("resetting etcd")
|
||||
os.RemoveAll(etcdDir)
|
||||
t.Logf("starting etcd")
|
||||
tetcd = setupEtcd(t, etcdDir)
|
||||
if err := tetcd.Start(); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
defer shutdownEtcd(tetcd)
|
||||
|
||||
// Remove a wal in the middle
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
|
||||
walStatusFiles := []string{}
|
||||
for object := range dm.ost.List(path.Join(dm.basePath, storageWalsStatusDir)+"/", "", true, doneCh) {
|
||||
if object.Err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
|
||||
walStatusFiles = append(walStatusFiles, object.Path)
|
||||
}
|
||||
if len(walStatusFiles) < 20 {
|
||||
t.Fatalf("exptected at least 20 wals, got: %d wals", len(walStatusFiles))
|
||||
}
|
||||
|
||||
removeIndex := 10
|
||||
if err := dm.ost.DeleteObject(walStatusFiles[removeIndex]); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
errorWalSequence := strings.TrimSuffix(path.Base(walStatusFiles[removeIndex+1]), path.Ext(walStatusFiles[removeIndex+1]))
|
||||
prevWalSequence := strings.TrimSuffix(path.Base(walStatusFiles[removeIndex]), path.Ext(walStatusFiles[removeIndex]))
|
||||
expectedPrevWalSequence := strings.TrimSuffix(path.Base(walStatusFiles[removeIndex-1]), path.Ext(walStatusFiles[removeIndex-1]))
|
||||
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
dmConfig = &DataManagerConfig{
|
||||
BasePath: "basepath",
|
||||
E: tetcd.TestEtcd.Store,
|
||||
OST: objectstorage.NewObjStorage(ost, "/"),
|
||||
EtcdWalsKeepNum: 10,
|
||||
DataTypes: []string{"datatype01"},
|
||||
}
|
||||
dm, err = NewDataManager(ctx, logger, dmConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
dmReadyCh = make(chan struct{})
|
||||
|
||||
expectedErr := errors.Errorf("wal %q previousWalSequence %q is different than expected walSequence %q", errorWalSequence, prevWalSequence, expectedPrevWalSequence)
|
||||
err = dm.InitEtcd(ctx, nil)
|
||||
if err == nil {
|
||||
t.Fatalf("expected err: %q, got nil error", expectedErr)
|
||||
}
|
||||
if expectedErr.Error() != err.Error() {
|
||||
t.Fatalf("expected err: %q, got err %q", expectedErr, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentUpdate(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "agola")
|
||||
if err != nil {
|
||||
|
@ -923,7 +923,7 @@ func (d *DataManager) etcdPinger(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error {
|
||||
writeWal := func(wal *WalFile) error {
|
||||
writeWal := func(wal *WalFile, prevWalSequence string) error {
|
||||
walFile, err := d.ost.ReadObject(d.storageWalStatusFile(wal.WalSequence) + ".committed")
|
||||
if err != nil {
|
||||
return err
|
||||
@ -936,6 +936,12 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
|
||||
}
|
||||
walFile.Close()
|
||||
|
||||
if prevWalSequence != "" {
|
||||
if header.PreviousWalSequence != "" && header.PreviousWalSequence != prevWalSequence {
|
||||
return errors.Errorf("wal %q previousWalSequence %q is different than expected walSequence %q", wal.WalSequence, header.PreviousWalSequence, prevWalSequence)
|
||||
}
|
||||
}
|
||||
|
||||
walData := &WalData{
|
||||
WalSequence: wal.WalSequence,
|
||||
WalDataFileID: header.WalDataFileID,
|
||||
@ -1034,6 +1040,7 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
|
||||
// So take all the wals in committed or checkpointed state starting from the
|
||||
// first not checkpointed wal and put them in etcd
|
||||
lastCommittedStorageWalSequence := ""
|
||||
previousWalSequence := ""
|
||||
wroteWals := 0
|
||||
for wal := range d.ListOSTWals("") {
|
||||
// if there're wals in ost but not a datastatus return an error
|
||||
@ -1051,9 +1058,10 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
|
||||
|
||||
lastCommittedStorageWalSequence = wal.WalSequence
|
||||
|
||||
if err := writeWal(wal); err != nil {
|
||||
if err := writeWal(wal, previousWalSequence); err != nil {
|
||||
return err
|
||||
}
|
||||
previousWalSequence = wal.WalSequence
|
||||
wroteWals++
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user