From 21964f90fdb58f74ac3112fc95494d85ae7c5c2c Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 17 Jul 2019 17:01:44 +0200 Subject: [PATCH 01/12] etcd: add DeletePrefix method --- internal/etcd/etcd.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal/etcd/etcd.go b/internal/etcd/etcd.go index ca3dae7..84fdba7 100644 --- a/internal/etcd/etcd.go +++ b/internal/etcd/etcd.go @@ -286,6 +286,22 @@ func (s *Store) Delete(ctx context.Context, key string) error { return err } +func (s *Store) DeletePrefix(ctx context.Context, prefix string) error { + etcdv3Options := []clientv3.OpOption{} + + key := prefix + if len(key) == 0 { + key = "\x00" + etcdv3Options = append(etcdv3Options, clientv3.WithFromKey()) + } else { + etcdv3Options = append(etcdv3Options, clientv3.WithPrefix()) + } + + _, err := s.c.Delete(ctx, key, etcdv3Options...) + + return err +} + func (s *Store) AtomicDelete(ctx context.Context, key string, revision int64) (*etcdclientv3.TxnResponse, error) { cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(key), "=", revision) req := etcdclientv3.OpDelete(key) From 8fbb9fdcbe57a5a2b95615f09a18df62bb864900 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 17 Jul 2019 17:09:36 +0200 Subject: [PATCH 02/12] datamanager: add deleteEtcd method --- internal/datamanager/datamanager.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go index e4de93c..d0ccc4f 100644 --- a/internal/datamanager/datamanager.go +++ b/internal/datamanager/datamanager.go @@ -57,10 +57,9 @@ var ( etcdWalsDataKey = path.Join(etcdWalBaseDir, "walsdata") etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq") etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq") + etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq") - etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq") - - etcdInitEtcdLockKey = path.Join(etcdWalBaseDir, "initetcd") + etcdInitEtcdLockKey = path.Join(etcdWalBaseDir, "initetcd") etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock") etcdCompactChangeGroupsLockKey = path.Join(etcdWalBaseDir, "compactchangegroupslock") etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") @@ -172,6 +171,26 @@ func etcdWalKey(walSeq string) string { return path.Join(etcdWalsDir, walSeq) } +// deleteEtcd deletes all etcd data excluding keys used for locking +func (d *DataManager) deleteEtcd(ctx context.Context) error { + prefixes := []string{ + etcdWalsDir + "/", + etcdWalsDataKey, + etcdWalSeqKey, + etcdLastCommittedStorageWalSeqKey, + etcdCheckpointSeqKey, + etcdChangeGroupsDir + "/", + etcdChangeGroupMinRevisionKey, + } + for _, prefix := range prefixes { + if err := d.e.DeletePrefix(ctx, prefix); err != nil { + return err + } + } + + return nil +} + func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error { for { err := d.InitEtcd(ctx) From 512162bf98c91b24b2ed85c77e47c4d590ac1e3e Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 17 Jul 2019 17:05:35 +0200 Subject: [PATCH 03/12] datamanager: clean etcd data before reinitialization --- internal/datamanager/wal.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index c9a8a14..d98e2d0 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -962,7 +962,26 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { } defer func() { _ = m.Unlock(ctx) }() - // Create changegroup min revision if it doesn't exists + mustInit := false + + _, err = d.e.Get(ctx, etcdWalsDataKey, 0) + if err != nil { + if err != etcd.ErrKeyNotFound { + return err + } + mustInit = true + } + + if mustInit { + d.log.Infof("no data found in etcd, initializing") + + // delete all wals from etcd + if err := d.deleteEtcd(ctx); err != nil { + return err + } + } + + // Always create changegroup min revision if it doesn't exists cmp := []etcdclientv3.Cmp{} then := []etcdclientv3.Op{} @@ -973,16 +992,10 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { return etcd.FromEtcdError(err) } - _, err = d.e.Get(ctx, etcdWalsDataKey, 0) - if err != nil && err != etcd.ErrKeyNotFound { - return err - } - if err == nil { + if !mustInit { return nil } - d.log.Infof("no data found in etcd, initializing") - // walsdata not found in etcd // if there're some wals in the objectstorage this means etcd has been reset. @@ -1055,7 +1068,7 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { return etcd.FromEtcdError(err) } if !tresp.Succeeded { - return errors.Errorf("failed to sync etcd: waldata already written") + return errors.Errorf("failed to sync etcd: walsdata already written") } return nil From 445ef24daa1633a2c3113d7a72ec6b61b8c2f15e Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 18 Jul 2019 14:58:42 +0200 Subject: [PATCH 04/12] datamanager: add option to force a checkpoint --- internal/datamanager/datamanager_test.go | 6 +++--- internal/datamanager/wal.go | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 1bf19c9..7023ecb 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -318,7 +318,7 @@ func TestWalCleaner(t *testing.T) { } } - if err := dm.checkpoint(ctx); err != nil { + if err := dm.checkpoint(ctx, true); err != nil { t.Fatalf("unexpected err: %v", err) } if err := dm.walCleaner(ctx); err != nil { @@ -436,7 +436,7 @@ func TestReadObject(t *testing.T) { } // do a checkpoint and wal clean - if err := dm.checkpoint(ctx); err != nil { + if err := dm.checkpoint(ctx, true); err != nil { t.Fatalf("unexpected err: %v", err) } if err := dm.walCleaner(ctx); err != nil { @@ -489,7 +489,7 @@ func doAndCheckCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, ac time.Sleep(500 * time.Millisecond) // do a checkpoint - if err := dm.checkpoint(ctx); err != nil { + if err := dm.checkpoint(ctx, true); err != nil { return nil, err } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index d98e2d0..c66cfb2 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -642,7 +642,7 @@ func (d *DataManager) sync(ctx context.Context) error { func (d *DataManager) checkpointLoop(ctx context.Context) { for { d.log.Debugf("checkpointer") - if err := d.checkpoint(ctx); err != nil { + if err := d.checkpoint(ctx, false); err != nil { d.log.Errorf("checkpoint error: %v", err) } @@ -656,7 +656,7 @@ func (d *DataManager) checkpointLoop(ctx context.Context) { } } -func (d *DataManager) checkpoint(ctx context.Context) error { +func (d *DataManager) checkpoint(ctx context.Context, force bool) error { session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err @@ -694,7 +694,11 @@ func (d *DataManager) checkpoint(ctx context.Context) error { } walsData = append(walsData, walData) } - if len(walsData) < d.minCheckpointWalsNum { + + if !force && len(walsData) < d.minCheckpointWalsNum { + return nil + } + if len(walsData) == 0 { return nil } From cb2a871be65eda22e70e5df028b8de3587a8708c Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 18 Jul 2019 15:02:11 +0200 Subject: [PATCH 05/12] datamanager: start initEtcd from last datastatus --- internal/datamanager/wal.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index c66cfb2..847aaf3 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -1002,6 +1002,17 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { // walsdata not found in etcd + curDataStatus, err := d.GetLastDataStatus() + if err != nil && err != ostypes.ErrNotExist { + return err + } + // set the first wal to import in etcd if there's a snapshot. In this way we'll + // ignore older wals (or wals left after an import) + var firstWal string + if err == nil { + firstWal = curDataStatus.WalSequence + } + // if there're some wals in the objectstorage this means etcd has been reset. // So take all the wals in committed or checkpointed state starting from the // first not checkpointed wal and put them in etcd @@ -1015,6 +1026,10 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { return wal.Err } + if wal.WalSequence < firstWal { + continue + } + lastCommittedStorageWalElem.Value = wal lastCommittedStorageWalElem = lastCommittedStorageWalElem.Next() lastCommittedStorageWalSequence = wal.WalSequence From df716fccc683c4eb90a6f436628b3aacab796d01 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 18 Jul 2019 13:34:10 +0200 Subject: [PATCH 06/12] datamanager: create a new wal and checkpoint in initEtcd When doing an initEtcd (new instance or etcd reset) create a new wal (that will have a new sequence epoch) and do a checkpoint. In this way: * readdb will detect that an epoch change and do a full resync * we always have a data file (also if empty) that provides the last checkpointed wal. This information could be used by readdb to resync --- internal/datamanager/wal.go | 54 +++++++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 2 deletions(-) diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 847aaf3..689ea6f 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -590,7 +590,7 @@ func (d *DataManager) sync(ctx context.Context) error { switch walData.WalStatus { case WalStatusCommitted: walFilePath := d.storageWalStatusFile(walData.WalSequence) - d.log.Debugf("syncing committed wal to storage") + d.log.Debugf("syncing committed wal %q to storage", walData.WalSequence) header := &WalHeader{ WalDataFileID: walData.WalDataFileID, PreviousWalSequence: walData.PreviousWalSequence, @@ -924,7 +924,7 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { walData := &WalData{ WalSequence: wal.WalSequence, WalDataFileID: header.WalDataFileID, - WalStatus: WalStatusCommitted, + WalStatus: WalStatusCommittedStorage, } if wal.Checkpointed { walData.WalStatus = WalStatusCheckpointed @@ -1063,9 +1063,53 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { } } + // insert an empty wal and make it already committedstorage + walSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey) + if err != nil { + return err + } + + walDataFileID := uuid.NewV4().String() + walDataFilePath := d.storageWalDataFile(walDataFileID) + walKey := etcdWalKey(walSequence.String()) + + if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader([]byte{}), 0, true); err != nil { + return err + } + d.log.Debugf("wrote wal file: %s", walDataFilePath) + + walFilePath := d.storageWalStatusFile(walSequence.String()) + d.log.Infof("syncing committed wal %q to storage", walSequence.String()) + header := &WalHeader{ + WalDataFileID: walDataFileID, + PreviousWalSequence: lastCommittedStorageWalSequence, + } + headerj, err := json.Marshal(header) + if err != nil { + return err + } + walFileCommittedPath := walFilePath + ".committed" + if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), int64(len(headerj)), true); err != nil { + return err + } + + lastCommittedStorageWalSequence = walSequence.String() + + walData := &WalData{ + WalSequence: walSequence.String(), + WalDataFileID: walDataFileID, + WalStatus: WalStatusCommittedStorage, + } + walsData := &WalsData{ LastCommittedWalSequence: lastCommittedStorageWalSequence, } + + walDataj, err := json.Marshal(walData) + if err != nil { + return err + } + walsDataj, err := json.Marshal(walsData) if err != nil { return err @@ -1081,6 +1125,7 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj))) then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence)) + then = append(then, etcdclientv3.OpPut(walKey, string(walDataj))) txn = d.e.Client().Txn(ctx).If(cmp...).Then(then...) tresp, err := txn.Commit() if err != nil { @@ -1090,5 +1135,10 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { return errors.Errorf("failed to sync etcd: walsdata already written") } + // force a checkpoint + if err := d.checkpoint(ctx, true); err != nil { + return err + } + return nil } From c03481908773e3d41f2087ffc83ad31240762e24 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 18 Jul 2019 14:54:07 +0200 Subject: [PATCH 07/12] datamanager: accept optional datastatus in initEtcd --- internal/datamanager/datamanager.go | 2 +- internal/datamanager/wal.go | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go index d0ccc4f..02abf23 100644 --- a/internal/datamanager/datamanager.go +++ b/internal/datamanager/datamanager.go @@ -193,7 +193,7 @@ func (d *DataManager) deleteEtcd(ctx context.Context) error { func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error { for { - err := d.InitEtcd(ctx) + err := d.InitEtcd(ctx, nil) if err == nil { break } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 689ea6f..b70d57b 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -907,7 +907,7 @@ func (d *DataManager) etcdPinger(ctx context.Context) error { return nil } -func (d *DataManager) InitEtcd(ctx context.Context) error { +func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error { writeWal := func(wal *WalFile) error { walFile, err := d.ost.ReadObject(d.storageWalStatusFile(wal.WalSequence) + ".committed") if err != nil { @@ -1002,15 +1002,19 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { // walsdata not found in etcd - curDataStatus, err := d.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { - return err - } - // set the first wal to import in etcd if there's a snapshot. In this way we'll - // ignore older wals (or wals left after an import) var firstWal string - if err == nil { - firstWal = curDataStatus.WalSequence + if dataStatus != nil { + firstWal = dataStatus.WalSequence + } else { + dataStatus, err = d.GetLastDataStatus() + if err != nil && err != ostypes.ErrNotExist { + return err + } + // set the first wal to import in etcd if there's a snapshot. In this way we'll + // ignore older wals (or wals left after an import) + if err == nil { + firstWal = dataStatus.WalSequence + } } // if there're some wals in the objectstorage this means etcd has been reset. From 18c5ae04926b2c5538953dc7574935fb7cc12748 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 18 Jul 2019 14:54:28 +0200 Subject: [PATCH 08/12] datamanager: error if there're wals but not a datastatus in ost --- internal/datamanager/wal.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index b70d57b..0e55e62 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -1025,6 +1025,10 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro lastCommittedStorageWalSequence := "" wroteWals := 0 for wal := range d.ListOSTWals("") { + // if there're wals in ost but not a datastatus return an error + if dataStatus == nil { + return errors.Errorf("no datastatus in etcd but some wals are present, this shouldn't happen") + } d.log.Debugf("wal: %s", wal) if wal.Err != nil { return wal.Err From f7175c4592c3459b71f00812ef446470a5e127db Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 18 Jul 2019 15:16:10 +0200 Subject: [PATCH 09/12] datamanager: save previous wal in waldata --- internal/datamanager/wal.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 0e55e62..c1ca1f0 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -460,14 +460,15 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti } d.log.Debugf("wrote wal file: %s", walDataFilePath) - walsData.LastCommittedWalSequence = walSequence.String() - walData := &WalData{ - WalSequence: walSequence.String(), - WalDataFileID: walDataFileID, - WalStatus: WalStatusCommitted, + WalSequence: walSequence.String(), + WalDataFileID: walDataFileID, + WalStatus: WalStatusCommitted, + PreviousWalSequence: walsData.LastCommittedWalSequence, } + walsData.LastCommittedWalSequence = walSequence.String() + walsDataj, err := json.Marshal(walsData) if err != nil { return nil, err @@ -922,9 +923,10 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro walFile.Close() walData := &WalData{ - WalSequence: wal.WalSequence, - WalDataFileID: header.WalDataFileID, - WalStatus: WalStatusCommittedStorage, + WalSequence: wal.WalSequence, + WalDataFileID: header.WalDataFileID, + WalStatus: WalStatusCommittedStorage, + PreviousWalSequence: header.PreviousWalSequence, } if wal.Checkpointed { walData.WalStatus = WalStatusCheckpointed @@ -1101,14 +1103,15 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro return err } - lastCommittedStorageWalSequence = walSequence.String() - walData := &WalData{ - WalSequence: walSequence.String(), - WalDataFileID: walDataFileID, - WalStatus: WalStatusCommittedStorage, + WalSequence: walSequence.String(), + WalDataFileID: walDataFileID, + WalStatus: WalStatusCommittedStorage, + PreviousWalSequence: lastCommittedStorageWalSequence, } + lastCommittedStorageWalSequence = walSequence.String() + walsData := &WalsData{ LastCommittedWalSequence: lastCommittedStorageWalSequence, } From 16820e903380121c51392ed5614224a2fc6a6199 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 17 Jul 2019 16:58:43 +0200 Subject: [PATCH 10/12] readdb: insert current wal sequence after checking wal status --- internal/services/configstore/readdb/readdb.go | 11 ++++++----- internal/services/runservice/readdb/readdb.go | 11 ++++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index b526a6f..6095298 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -352,6 +352,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { } } + r.log.Infof("syncing from wals") err = r.rdb.Do(func(tx *db.Tx) error { if err := r.insertRevision(tx, revision); err != nil { return err @@ -360,21 +361,21 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { // use the same revision as previous operation for walElement := range r.dm.ListEtcdWals(ctx, revision) { if walElement.Err != nil { - return err + return walElement.Err } if walElement.WalData.WalSequence <= curWalSeq { continue } - if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil { - return err - } - // update readdb only when the wal has been committed to etcd if walElement.WalData.WalStatus != datamanager.WalStatusCommitted { return nil } + if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil { + return err + } + r.log.Debugf("applying wal to db") if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { return err diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index de7e1c8..07ab217 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -602,6 +602,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { } } + r.log.Infof("syncing from wals") err = r.rdb.Do(func(tx *db.Tx) error { if err := insertRevisionOST(tx, revision); err != nil { return err @@ -610,21 +611,21 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { // use the same revision as previous operation for walElement := range r.dm.ListEtcdWals(ctx, revision) { if walElement.Err != nil { - return err + return walElement.Err } if walElement.WalData.WalSequence <= curWalSeq { continue } - if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil { - return err - } - // update readdb only when the wal has been committed to etcd if walElement.WalData.WalStatus != datamanager.WalStatusCommitted { return nil } + if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil { + return err + } + r.log.Debugf("applying wal to db") if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { return err From 3f64bda0ccf3b1bb67d8916164ca9a10ca4024ec Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 18 Jul 2019 13:39:12 +0200 Subject: [PATCH 11/12] readdb: save walSequence provided by data file --- internal/services/configstore/readdb/readdb.go | 10 ++++++++++ internal/services/runservice/readdb/readdb.go | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 6095298..098e1b5 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -188,6 +188,16 @@ func (r *ReadDB) SyncFromDump() (string, error) { } } + err = r.rdb.Do(func(tx *db.Tx) error { + if err := r.insertCommittedWalSequence(tx, dumpIndex.WalSequence); err != nil { + return err + } + return nil + }) + if err != nil { + return "", err + } + return dumpIndex.WalSequence, nil } diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index 07ab217..4f98696 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -702,6 +702,16 @@ func (r *ReadDB) SyncFromDump() (string, error) { } } + err = r.rdb.Do(func(tx *db.Tx) error { + if err := r.insertCommittedWalSequenceOST(tx, dumpIndex.WalSequence); err != nil { + return err + } + return nil + }) + if err != nil { + return "", err + } + return dumpIndex.WalSequence, nil } From 3a963ef95f54953ba463e38f19f6bc84c4635b2f Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 18 Jul 2019 13:43:28 +0200 Subject: [PATCH 12/12] readdb: error if there's no wal in etcd --- internal/services/configstore/readdb/readdb.go | 11 +++-------- internal/services/runservice/readdb/readdb.go | 11 +++-------- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 098e1b5..005c3a7 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -351,15 +351,10 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { r.log.Debugf("firstAvailableWalData: %s", util.Dump(firstAvailableWalData)) r.log.Debugf("revision: %d", revision) if firstAvailableWalData == nil { - if curWalSeq != "" { - // this happens if etcd has been reset - return errors.Errorf("our curwalseq is %q but there's no wal data on etcd", curWalSeq) - } + return errors.Errorf("no wal data in etcd") } - if firstAvailableWalData != nil { - if curWalSeq < firstAvailableWalData.WalSequence { - return errors.Errorf("current applied wal seq %q is smaller than the first available wal on etcd %q", curWalSeq, firstAvailableWalData.WalSequence) - } + if curWalSeq < firstAvailableWalData.WalSequence { + return errors.Errorf("current applied wal seq %q is smaller than the first available wal in etcd %q", curWalSeq, firstAvailableWalData.WalSequence) } r.log.Infof("syncing from wals") diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index 4f98696..1974cd6 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -591,15 +591,10 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { r.log.Debugf("firstAvailableWalData: %s", util.Dump(firstAvailableWalData)) r.log.Debugf("revision: %d", revision) if firstAvailableWalData == nil { - if curWalSeq != "" { - // this happens if etcd has been reset - return errors.Errorf("our curwalseq is %q but there's no wal data on etcd", curWalSeq) - } + return errors.Errorf("no wal data in etcd") } - if firstAvailableWalData != nil { - if curWalSeq < firstAvailableWalData.WalSequence { - return errors.Errorf("current applied wal seq %q is smaller than the first available wal on etcd %q", curWalSeq, firstAvailableWalData.WalSequence) - } + if curWalSeq < firstAvailableWalData.WalSequence { + return errors.Errorf("current applied wal seq %q is smaller than the first available wal in etcd %q", curWalSeq, firstAvailableWalData.WalSequence) } r.log.Infof("syncing from wals")