diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go index e4de93c..02abf23 100644 --- a/internal/datamanager/datamanager.go +++ b/internal/datamanager/datamanager.go @@ -57,10 +57,9 @@ var ( etcdWalsDataKey = path.Join(etcdWalBaseDir, "walsdata") etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq") etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq") + etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq") - etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq") - - etcdInitEtcdLockKey = path.Join(etcdWalBaseDir, "initetcd") + etcdInitEtcdLockKey = path.Join(etcdWalBaseDir, "initetcd") etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock") etcdCompactChangeGroupsLockKey = path.Join(etcdWalBaseDir, "compactchangegroupslock") etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") @@ -172,9 +171,29 @@ func etcdWalKey(walSeq string) string { return path.Join(etcdWalsDir, walSeq) } +// deleteEtcd deletes all etcd data excluding keys used for locking +func (d *DataManager) deleteEtcd(ctx context.Context) error { + prefixes := []string{ + etcdWalsDir + "/", + etcdWalsDataKey, + etcdWalSeqKey, + etcdLastCommittedStorageWalSeqKey, + etcdCheckpointSeqKey, + etcdChangeGroupsDir + "/", + etcdChangeGroupMinRevisionKey, + } + for _, prefix := range prefixes { + if err := d.e.DeletePrefix(ctx, prefix); err != nil { + return err + } + } + + return nil +} + func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error { for { - err := d.InitEtcd(ctx) + err := d.InitEtcd(ctx, nil) if err == nil { break } diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 1bf19c9..7023ecb 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -318,7 +318,7 @@ func TestWalCleaner(t *testing.T) { } } - if err := dm.checkpoint(ctx); err != nil { + if err := dm.checkpoint(ctx, true); err != nil { t.Fatalf("unexpected err: %v", err) } if err := dm.walCleaner(ctx); err != nil { @@ -436,7 +436,7 @@ func TestReadObject(t *testing.T) { } // do a checkpoint and wal clean - if err := dm.checkpoint(ctx); err != nil { + if err := dm.checkpoint(ctx, true); err != nil { t.Fatalf("unexpected err: %v", err) } if err := dm.walCleaner(ctx); err != nil { @@ -489,7 +489,7 @@ func doAndCheckCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, ac time.Sleep(500 * time.Millisecond) // do a checkpoint - if err := dm.checkpoint(ctx); err != nil { + if err := dm.checkpoint(ctx, true); err != nil { return nil, err } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index c9a8a14..c1ca1f0 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -460,14 +460,15 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti } d.log.Debugf("wrote wal file: %s", walDataFilePath) - walsData.LastCommittedWalSequence = walSequence.String() - walData := &WalData{ - WalSequence: walSequence.String(), - WalDataFileID: walDataFileID, - WalStatus: WalStatusCommitted, + WalSequence: walSequence.String(), + WalDataFileID: walDataFileID, + WalStatus: WalStatusCommitted, + PreviousWalSequence: walsData.LastCommittedWalSequence, } + walsData.LastCommittedWalSequence = walSequence.String() + walsDataj, err := json.Marshal(walsData) if err != nil { return nil, err @@ -590,7 +591,7 @@ func (d *DataManager) sync(ctx context.Context) error { switch walData.WalStatus { case WalStatusCommitted: walFilePath := d.storageWalStatusFile(walData.WalSequence) - d.log.Debugf("syncing committed wal to storage") + d.log.Debugf("syncing committed wal %q to storage", walData.WalSequence) header := &WalHeader{ WalDataFileID: walData.WalDataFileID, PreviousWalSequence: walData.PreviousWalSequence, @@ -642,7 +643,7 @@ func (d *DataManager) sync(ctx context.Context) error { func (d *DataManager) checkpointLoop(ctx context.Context) { for { d.log.Debugf("checkpointer") - if err := d.checkpoint(ctx); err != nil { + if err := d.checkpoint(ctx, false); err != nil { d.log.Errorf("checkpoint error: %v", err) } @@ -656,7 +657,7 @@ func (d *DataManager) checkpointLoop(ctx context.Context) { } } -func (d *DataManager) checkpoint(ctx context.Context) error { +func (d *DataManager) checkpoint(ctx context.Context, force bool) error { session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err @@ -694,7 +695,11 @@ func (d *DataManager) checkpoint(ctx context.Context) error { } walsData = append(walsData, walData) } - if len(walsData) < d.minCheckpointWalsNum { + + if !force && len(walsData) < d.minCheckpointWalsNum { + return nil + } + if len(walsData) == 0 { return nil } @@ -903,7 +908,7 @@ func (d *DataManager) etcdPinger(ctx context.Context) error { return nil } -func (d *DataManager) InitEtcd(ctx context.Context) error { +func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error { writeWal := func(wal *WalFile) error { walFile, err := d.ost.ReadObject(d.storageWalStatusFile(wal.WalSequence) + ".committed") if err != nil { @@ -918,9 +923,10 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { walFile.Close() walData := &WalData{ - WalSequence: wal.WalSequence, - WalDataFileID: header.WalDataFileID, - WalStatus: WalStatusCommitted, + WalSequence: wal.WalSequence, + WalDataFileID: header.WalDataFileID, + WalStatus: WalStatusCommittedStorage, + PreviousWalSequence: header.PreviousWalSequence, } if wal.Checkpointed { walData.WalStatus = WalStatusCheckpointed @@ -962,7 +968,26 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { } defer func() { _ = m.Unlock(ctx) }() - // Create changegroup min revision if it doesn't exists + mustInit := false + + _, err = d.e.Get(ctx, etcdWalsDataKey, 0) + if err != nil { + if err != etcd.ErrKeyNotFound { + return err + } + mustInit = true + } + + if mustInit { + d.log.Infof("no data found in etcd, initializing") + + // delete all wals from etcd + if err := d.deleteEtcd(ctx); err != nil { + return err + } + } + + // Always create changegroup min revision if it doesn't exists cmp := []etcdclientv3.Cmp{} then := []etcdclientv3.Op{} @@ -973,18 +998,27 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { return etcd.FromEtcdError(err) } - _, err = d.e.Get(ctx, etcdWalsDataKey, 0) - if err != nil && err != etcd.ErrKeyNotFound { - return err - } - if err == nil { + if !mustInit { return nil } - d.log.Infof("no data found in etcd, initializing") - // walsdata not found in etcd + var firstWal string + if dataStatus != nil { + firstWal = dataStatus.WalSequence + } else { + dataStatus, err = d.GetLastDataStatus() + if err != nil && err != ostypes.ErrNotExist { + return err + } + // set the first wal to import in etcd if there's a snapshot. In this way we'll + // ignore older wals (or wals left after an import) + if err == nil { + firstWal = dataStatus.WalSequence + } + } + // if there're some wals in the objectstorage this means etcd has been reset. // So take all the wals in committed or checkpointed state starting from the // first not checkpointed wal and put them in etcd @@ -993,11 +1027,19 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { lastCommittedStorageWalSequence := "" wroteWals := 0 for wal := range d.ListOSTWals("") { + // if there're wals in ost but not a datastatus return an error + if dataStatus == nil { + return errors.Errorf("no datastatus in etcd but some wals are present, this shouldn't happen") + } d.log.Debugf("wal: %s", wal) if wal.Err != nil { return wal.Err } + if wal.WalSequence < firstWal { + continue + } + lastCommittedStorageWalElem.Value = wal lastCommittedStorageWalElem = lastCommittedStorageWalElem.Next() lastCommittedStorageWalSequence = wal.WalSequence @@ -1031,9 +1073,54 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { } } + // insert an empty wal and make it already committedstorage + walSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey) + if err != nil { + return err + } + + walDataFileID := uuid.NewV4().String() + walDataFilePath := d.storageWalDataFile(walDataFileID) + walKey := etcdWalKey(walSequence.String()) + + if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader([]byte{}), 0, true); err != nil { + return err + } + d.log.Debugf("wrote wal file: %s", walDataFilePath) + + walFilePath := d.storageWalStatusFile(walSequence.String()) + d.log.Infof("syncing committed wal %q to storage", walSequence.String()) + header := &WalHeader{ + WalDataFileID: walDataFileID, + PreviousWalSequence: lastCommittedStorageWalSequence, + } + headerj, err := json.Marshal(header) + if err != nil { + return err + } + walFileCommittedPath := walFilePath + ".committed" + if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), int64(len(headerj)), true); err != nil { + return err + } + + walData := &WalData{ + WalSequence: walSequence.String(), + WalDataFileID: walDataFileID, + WalStatus: WalStatusCommittedStorage, + PreviousWalSequence: lastCommittedStorageWalSequence, + } + + lastCommittedStorageWalSequence = walSequence.String() + walsData := &WalsData{ LastCommittedWalSequence: lastCommittedStorageWalSequence, } + + walDataj, err := json.Marshal(walData) + if err != nil { + return err + } + walsDataj, err := json.Marshal(walsData) if err != nil { return err @@ -1049,13 +1136,19 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj))) then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence)) + then = append(then, etcdclientv3.OpPut(walKey, string(walDataj))) txn = d.e.Client().Txn(ctx).If(cmp...).Then(then...) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) } if !tresp.Succeeded { - return errors.Errorf("failed to sync etcd: waldata already written") + return errors.Errorf("failed to sync etcd: walsdata already written") + } + + // force a checkpoint + if err := d.checkpoint(ctx, true); err != nil { + return err } return nil diff --git a/internal/etcd/etcd.go b/internal/etcd/etcd.go index ca3dae7..84fdba7 100644 --- a/internal/etcd/etcd.go +++ b/internal/etcd/etcd.go @@ -286,6 +286,22 @@ func (s *Store) Delete(ctx context.Context, key string) error { return err } +func (s *Store) DeletePrefix(ctx context.Context, prefix string) error { + etcdv3Options := []clientv3.OpOption{} + + key := prefix + if len(key) == 0 { + key = "\x00" + etcdv3Options = append(etcdv3Options, clientv3.WithFromKey()) + } else { + etcdv3Options = append(etcdv3Options, clientv3.WithPrefix()) + } + + _, err := s.c.Delete(ctx, key, etcdv3Options...) + + return err +} + func (s *Store) AtomicDelete(ctx context.Context, key string, revision int64) (*etcdclientv3.TxnResponse, error) { cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(key), "=", revision) req := etcdclientv3.OpDelete(key) diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index b526a6f..005c3a7 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -188,6 +188,16 @@ func (r *ReadDB) SyncFromDump() (string, error) { } } + err = r.rdb.Do(func(tx *db.Tx) error { + if err := r.insertCommittedWalSequence(tx, dumpIndex.WalSequence); err != nil { + return err + } + return nil + }) + if err != nil { + return "", err + } + return dumpIndex.WalSequence, nil } @@ -341,17 +351,13 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { r.log.Debugf("firstAvailableWalData: %s", util.Dump(firstAvailableWalData)) r.log.Debugf("revision: %d", revision) if firstAvailableWalData == nil { - if curWalSeq != "" { - // this happens if etcd has been reset - return errors.Errorf("our curwalseq is %q but there's no wal data on etcd", curWalSeq) - } + return errors.Errorf("no wal data in etcd") } - if firstAvailableWalData != nil { - if curWalSeq < firstAvailableWalData.WalSequence { - return errors.Errorf("current applied wal seq %q is smaller than the first available wal on etcd %q", curWalSeq, firstAvailableWalData.WalSequence) - } + if curWalSeq < firstAvailableWalData.WalSequence { + return errors.Errorf("current applied wal seq %q is smaller than the first available wal in etcd %q", curWalSeq, firstAvailableWalData.WalSequence) } + r.log.Infof("syncing from wals") err = r.rdb.Do(func(tx *db.Tx) error { if err := r.insertRevision(tx, revision); err != nil { return err @@ -360,21 +366,21 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { // use the same revision as previous operation for walElement := range r.dm.ListEtcdWals(ctx, revision) { if walElement.Err != nil { - return err + return walElement.Err } if walElement.WalData.WalSequence <= curWalSeq { continue } - if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil { - return err - } - // update readdb only when the wal has been committed to etcd if walElement.WalData.WalStatus != datamanager.WalStatusCommitted { return nil } + if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil { + return err + } + r.log.Debugf("applying wal to db") if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { return err diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index de7e1c8..1974cd6 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -591,17 +591,13 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { r.log.Debugf("firstAvailableWalData: %s", util.Dump(firstAvailableWalData)) r.log.Debugf("revision: %d", revision) if firstAvailableWalData == nil { - if curWalSeq != "" { - // this happens if etcd has been reset - return errors.Errorf("our curwalseq is %q but there's no wal data on etcd", curWalSeq) - } + return errors.Errorf("no wal data in etcd") } - if firstAvailableWalData != nil { - if curWalSeq < firstAvailableWalData.WalSequence { - return errors.Errorf("current applied wal seq %q is smaller than the first available wal on etcd %q", curWalSeq, firstAvailableWalData.WalSequence) - } + if curWalSeq < firstAvailableWalData.WalSequence { + return errors.Errorf("current applied wal seq %q is smaller than the first available wal in etcd %q", curWalSeq, firstAvailableWalData.WalSequence) } + r.log.Infof("syncing from wals") err = r.rdb.Do(func(tx *db.Tx) error { if err := insertRevisionOST(tx, revision); err != nil { return err @@ -610,21 +606,21 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { // use the same revision as previous operation for walElement := range r.dm.ListEtcdWals(ctx, revision) { if walElement.Err != nil { - return err + return walElement.Err } if walElement.WalData.WalSequence <= curWalSeq { continue } - if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil { - return err - } - // update readdb only when the wal has been committed to etcd if walElement.WalData.WalStatus != datamanager.WalStatusCommitted { return nil } + if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil { + return err + } + r.log.Debugf("applying wal to db") if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { return err @@ -701,6 +697,16 @@ func (r *ReadDB) SyncFromDump() (string, error) { } } + err = r.rdb.Do(func(tx *db.Tx) error { + if err := r.insertCommittedWalSequenceOST(tx, dumpIndex.WalSequence); err != nil { + return err + } + return nil + }) + if err != nil { + return "", err + } + return dumpIndex.WalSequence, nil }