Merge pull request #56 from sgotti/datamanager_multiple_improvements
datamanager: multiple improvements
This commit is contained in:
commit
ea3e0d1d7c
@ -57,10 +57,9 @@ var (
|
||||
etcdWalsDataKey = path.Join(etcdWalBaseDir, "walsdata")
|
||||
etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq")
|
||||
etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq")
|
||||
etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq")
|
||||
|
||||
etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq")
|
||||
|
||||
etcdInitEtcdLockKey = path.Join(etcdWalBaseDir, "initetcd")
|
||||
etcdInitEtcdLockKey = path.Join(etcdWalBaseDir, "initetcd")
|
||||
etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock")
|
||||
etcdCompactChangeGroupsLockKey = path.Join(etcdWalBaseDir, "compactchangegroupslock")
|
||||
etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock")
|
||||
@ -172,9 +171,29 @@ func etcdWalKey(walSeq string) string {
|
||||
return path.Join(etcdWalsDir, walSeq)
|
||||
}
|
||||
|
||||
// deleteEtcd deletes all etcd data excluding keys used for locking
|
||||
func (d *DataManager) deleteEtcd(ctx context.Context) error {
|
||||
prefixes := []string{
|
||||
etcdWalsDir + "/",
|
||||
etcdWalsDataKey,
|
||||
etcdWalSeqKey,
|
||||
etcdLastCommittedStorageWalSeqKey,
|
||||
etcdCheckpointSeqKey,
|
||||
etcdChangeGroupsDir + "/",
|
||||
etcdChangeGroupMinRevisionKey,
|
||||
}
|
||||
for _, prefix := range prefixes {
|
||||
if err := d.e.DeletePrefix(ctx, prefix); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
|
||||
for {
|
||||
err := d.InitEtcd(ctx)
|
||||
err := d.InitEtcd(ctx, nil)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
@ -318,7 +318,7 @@ func TestWalCleaner(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := dm.checkpoint(ctx); err != nil {
|
||||
if err := dm.checkpoint(ctx, true); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
if err := dm.walCleaner(ctx); err != nil {
|
||||
@ -436,7 +436,7 @@ func TestReadObject(t *testing.T) {
|
||||
}
|
||||
|
||||
// do a checkpoint and wal clean
|
||||
if err := dm.checkpoint(ctx); err != nil {
|
||||
if err := dm.checkpoint(ctx, true); err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
if err := dm.walCleaner(ctx); err != nil {
|
||||
@ -489,7 +489,7 @@ func doAndCheckCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, ac
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
// do a checkpoint
|
||||
if err := dm.checkpoint(ctx); err != nil {
|
||||
if err := dm.checkpoint(ctx, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -460,14 +460,15 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti
|
||||
}
|
||||
d.log.Debugf("wrote wal file: %s", walDataFilePath)
|
||||
|
||||
walsData.LastCommittedWalSequence = walSequence.String()
|
||||
|
||||
walData := &WalData{
|
||||
WalSequence: walSequence.String(),
|
||||
WalDataFileID: walDataFileID,
|
||||
WalStatus: WalStatusCommitted,
|
||||
WalSequence: walSequence.String(),
|
||||
WalDataFileID: walDataFileID,
|
||||
WalStatus: WalStatusCommitted,
|
||||
PreviousWalSequence: walsData.LastCommittedWalSequence,
|
||||
}
|
||||
|
||||
walsData.LastCommittedWalSequence = walSequence.String()
|
||||
|
||||
walsDataj, err := json.Marshal(walsData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -590,7 +591,7 @@ func (d *DataManager) sync(ctx context.Context) error {
|
||||
switch walData.WalStatus {
|
||||
case WalStatusCommitted:
|
||||
walFilePath := d.storageWalStatusFile(walData.WalSequence)
|
||||
d.log.Debugf("syncing committed wal to storage")
|
||||
d.log.Debugf("syncing committed wal %q to storage", walData.WalSequence)
|
||||
header := &WalHeader{
|
||||
WalDataFileID: walData.WalDataFileID,
|
||||
PreviousWalSequence: walData.PreviousWalSequence,
|
||||
@ -642,7 +643,7 @@ func (d *DataManager) sync(ctx context.Context) error {
|
||||
func (d *DataManager) checkpointLoop(ctx context.Context) {
|
||||
for {
|
||||
d.log.Debugf("checkpointer")
|
||||
if err := d.checkpoint(ctx); err != nil {
|
||||
if err := d.checkpoint(ctx, false); err != nil {
|
||||
d.log.Errorf("checkpoint error: %v", err)
|
||||
}
|
||||
|
||||
@ -656,7 +657,7 @@ func (d *DataManager) checkpointLoop(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DataManager) checkpoint(ctx context.Context) error {
|
||||
func (d *DataManager) checkpoint(ctx context.Context, force bool) error {
|
||||
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
|
||||
if err != nil {
|
||||
return err
|
||||
@ -694,7 +695,11 @@ func (d *DataManager) checkpoint(ctx context.Context) error {
|
||||
}
|
||||
walsData = append(walsData, walData)
|
||||
}
|
||||
if len(walsData) < d.minCheckpointWalsNum {
|
||||
|
||||
if !force && len(walsData) < d.minCheckpointWalsNum {
|
||||
return nil
|
||||
}
|
||||
if len(walsData) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -903,7 +908,7 @@ func (d *DataManager) etcdPinger(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DataManager) InitEtcd(ctx context.Context) error {
|
||||
func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error {
|
||||
writeWal := func(wal *WalFile) error {
|
||||
walFile, err := d.ost.ReadObject(d.storageWalStatusFile(wal.WalSequence) + ".committed")
|
||||
if err != nil {
|
||||
@ -918,9 +923,10 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
|
||||
walFile.Close()
|
||||
|
||||
walData := &WalData{
|
||||
WalSequence: wal.WalSequence,
|
||||
WalDataFileID: header.WalDataFileID,
|
||||
WalStatus: WalStatusCommitted,
|
||||
WalSequence: wal.WalSequence,
|
||||
WalDataFileID: header.WalDataFileID,
|
||||
WalStatus: WalStatusCommittedStorage,
|
||||
PreviousWalSequence: header.PreviousWalSequence,
|
||||
}
|
||||
if wal.Checkpointed {
|
||||
walData.WalStatus = WalStatusCheckpointed
|
||||
@ -962,7 +968,26 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
|
||||
}
|
||||
defer func() { _ = m.Unlock(ctx) }()
|
||||
|
||||
// Create changegroup min revision if it doesn't exists
|
||||
mustInit := false
|
||||
|
||||
_, err = d.e.Get(ctx, etcdWalsDataKey, 0)
|
||||
if err != nil {
|
||||
if err != etcd.ErrKeyNotFound {
|
||||
return err
|
||||
}
|
||||
mustInit = true
|
||||
}
|
||||
|
||||
if mustInit {
|
||||
d.log.Infof("no data found in etcd, initializing")
|
||||
|
||||
// delete all wals from etcd
|
||||
if err := d.deleteEtcd(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Always create changegroup min revision if it doesn't exist
|
||||
cmp := []etcdclientv3.Cmp{}
|
||||
then := []etcdclientv3.Op{}
|
||||
|
||||
@ -973,18 +998,27 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
|
||||
return etcd.FromEtcdError(err)
|
||||
}
|
||||
|
||||
_, err = d.e.Get(ctx, etcdWalsDataKey, 0)
|
||||
if err != nil && err != etcd.ErrKeyNotFound {
|
||||
return err
|
||||
}
|
||||
if err == nil {
|
||||
if !mustInit {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.log.Infof("no data found in etcd, initializing")
|
||||
|
||||
// walsdata not found in etcd
|
||||
|
||||
var firstWal string
|
||||
if dataStatus != nil {
|
||||
firstWal = dataStatus.WalSequence
|
||||
} else {
|
||||
dataStatus, err = d.GetLastDataStatus()
|
||||
if err != nil && err != ostypes.ErrNotExist {
|
||||
return err
|
||||
}
|
||||
// set the first wal to import in etcd if there's a snapshot. In this way we'll
|
||||
// ignore older wals (or wals left after an import)
|
||||
if err == nil {
|
||||
firstWal = dataStatus.WalSequence
|
||||
}
|
||||
}
|
||||
|
||||
// if there're some wals in the objectstorage this means etcd has been reset.
|
||||
// So take all the wals in committed or checkpointed state starting from the
|
||||
// first not checkpointed wal and put them in etcd
|
||||
@ -993,11 +1027,19 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
|
||||
lastCommittedStorageWalSequence := ""
|
||||
wroteWals := 0
|
||||
for wal := range d.ListOSTWals("") {
|
||||
// if there're wals in ost but not a datastatus return an error
|
||||
if dataStatus == nil {
|
||||
return errors.Errorf("no datastatus in etcd but some wals are present, this shouldn't happen")
|
||||
}
|
||||
d.log.Debugf("wal: %s", wal)
|
||||
if wal.Err != nil {
|
||||
return wal.Err
|
||||
}
|
||||
|
||||
if wal.WalSequence < firstWal {
|
||||
continue
|
||||
}
|
||||
|
||||
lastCommittedStorageWalElem.Value = wal
|
||||
lastCommittedStorageWalElem = lastCommittedStorageWalElem.Next()
|
||||
lastCommittedStorageWalSequence = wal.WalSequence
|
||||
@ -1031,9 +1073,54 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
// insert an empty wal and make it already committedstorage
|
||||
walSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
walDataFileID := uuid.NewV4().String()
|
||||
walDataFilePath := d.storageWalDataFile(walDataFileID)
|
||||
walKey := etcdWalKey(walSequence.String())
|
||||
|
||||
if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader([]byte{}), 0, true); err != nil {
|
||||
return err
|
||||
}
|
||||
d.log.Debugf("wrote wal file: %s", walDataFilePath)
|
||||
|
||||
walFilePath := d.storageWalStatusFile(walSequence.String())
|
||||
d.log.Infof("syncing committed wal %q to storage", walSequence.String())
|
||||
header := &WalHeader{
|
||||
WalDataFileID: walDataFileID,
|
||||
PreviousWalSequence: lastCommittedStorageWalSequence,
|
||||
}
|
||||
headerj, err := json.Marshal(header)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
walFileCommittedPath := walFilePath + ".committed"
|
||||
if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), int64(len(headerj)), true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
walData := &WalData{
|
||||
WalSequence: walSequence.String(),
|
||||
WalDataFileID: walDataFileID,
|
||||
WalStatus: WalStatusCommittedStorage,
|
||||
PreviousWalSequence: lastCommittedStorageWalSequence,
|
||||
}
|
||||
|
||||
lastCommittedStorageWalSequence = walSequence.String()
|
||||
|
||||
walsData := &WalsData{
|
||||
LastCommittedWalSequence: lastCommittedStorageWalSequence,
|
||||
}
|
||||
|
||||
walDataj, err := json.Marshal(walData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
walsDataj, err := json.Marshal(walsData)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1049,13 +1136,19 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
|
||||
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0))
|
||||
then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj)))
|
||||
then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence))
|
||||
then = append(then, etcdclientv3.OpPut(walKey, string(walDataj)))
|
||||
txn = d.e.Client().Txn(ctx).If(cmp...).Then(then...)
|
||||
tresp, err := txn.Commit()
|
||||
if err != nil {
|
||||
return etcd.FromEtcdError(err)
|
||||
}
|
||||
if !tresp.Succeeded {
|
||||
return errors.Errorf("failed to sync etcd: waldata already written")
|
||||
return errors.Errorf("failed to sync etcd: walsdata already written")
|
||||
}
|
||||
|
||||
// force a checkpoint
|
||||
if err := d.checkpoint(ctx, true); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -286,6 +286,22 @@ func (s *Store) Delete(ctx context.Context, key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) DeletePrefix(ctx context.Context, prefix string) error {
|
||||
etcdv3Options := []clientv3.OpOption{}
|
||||
|
||||
key := prefix
|
||||
if len(key) == 0 {
|
||||
key = "\x00"
|
||||
etcdv3Options = append(etcdv3Options, clientv3.WithFromKey())
|
||||
} else {
|
||||
etcdv3Options = append(etcdv3Options, clientv3.WithPrefix())
|
||||
}
|
||||
|
||||
_, err := s.c.Delete(ctx, key, etcdv3Options...)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) AtomicDelete(ctx context.Context, key string, revision int64) (*etcdclientv3.TxnResponse, error) {
|
||||
cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(key), "=", revision)
|
||||
req := etcdclientv3.OpDelete(key)
|
||||
|
@ -188,6 +188,16 @@ func (r *ReadDB) SyncFromDump() (string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
err = r.rdb.Do(func(tx *db.Tx) error {
|
||||
if err := r.insertCommittedWalSequence(tx, dumpIndex.WalSequence); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return dumpIndex.WalSequence, nil
|
||||
}
|
||||
|
||||
@ -341,17 +351,13 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
|
||||
r.log.Debugf("firstAvailableWalData: %s", util.Dump(firstAvailableWalData))
|
||||
r.log.Debugf("revision: %d", revision)
|
||||
if firstAvailableWalData == nil {
|
||||
if curWalSeq != "" {
|
||||
// this happens if etcd has been reset
|
||||
return errors.Errorf("our curwalseq is %q but there's no wal data on etcd", curWalSeq)
|
||||
}
|
||||
return errors.Errorf("no wal data in etcd")
|
||||
}
|
||||
if firstAvailableWalData != nil {
|
||||
if curWalSeq < firstAvailableWalData.WalSequence {
|
||||
return errors.Errorf("current applied wal seq %q is smaller than the first available wal on etcd %q", curWalSeq, firstAvailableWalData.WalSequence)
|
||||
}
|
||||
if curWalSeq < firstAvailableWalData.WalSequence {
|
||||
return errors.Errorf("current applied wal seq %q is smaller than the first available wal in etcd %q", curWalSeq, firstAvailableWalData.WalSequence)
|
||||
}
|
||||
|
||||
r.log.Infof("syncing from wals")
|
||||
err = r.rdb.Do(func(tx *db.Tx) error {
|
||||
if err := r.insertRevision(tx, revision); err != nil {
|
||||
return err
|
||||
@ -360,21 +366,21 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
|
||||
// use the same revision as previous operation
|
||||
for walElement := range r.dm.ListEtcdWals(ctx, revision) {
|
||||
if walElement.Err != nil {
|
||||
return err
|
||||
return walElement.Err
|
||||
}
|
||||
if walElement.WalData.WalSequence <= curWalSeq {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update readdb only when the wal has been committed to etcd
|
||||
if walElement.WalData.WalStatus != datamanager.WalStatusCommitted {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.log.Debugf("applying wal to db")
|
||||
if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil {
|
||||
return err
|
||||
|
@ -591,17 +591,13 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
|
||||
r.log.Debugf("firstAvailableWalData: %s", util.Dump(firstAvailableWalData))
|
||||
r.log.Debugf("revision: %d", revision)
|
||||
if firstAvailableWalData == nil {
|
||||
if curWalSeq != "" {
|
||||
// this happens if etcd has been reset
|
||||
return errors.Errorf("our curwalseq is %q but there's no wal data on etcd", curWalSeq)
|
||||
}
|
||||
return errors.Errorf("no wal data in etcd")
|
||||
}
|
||||
if firstAvailableWalData != nil {
|
||||
if curWalSeq < firstAvailableWalData.WalSequence {
|
||||
return errors.Errorf("current applied wal seq %q is smaller than the first available wal on etcd %q", curWalSeq, firstAvailableWalData.WalSequence)
|
||||
}
|
||||
if curWalSeq < firstAvailableWalData.WalSequence {
|
||||
return errors.Errorf("current applied wal seq %q is smaller than the first available wal in etcd %q", curWalSeq, firstAvailableWalData.WalSequence)
|
||||
}
|
||||
|
||||
r.log.Infof("syncing from wals")
|
||||
err = r.rdb.Do(func(tx *db.Tx) error {
|
||||
if err := insertRevisionOST(tx, revision); err != nil {
|
||||
return err
|
||||
@ -610,21 +606,21 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
|
||||
// use the same revision as previous operation
|
||||
for walElement := range r.dm.ListEtcdWals(ctx, revision) {
|
||||
if walElement.Err != nil {
|
||||
return err
|
||||
return walElement.Err
|
||||
}
|
||||
if walElement.WalData.WalSequence <= curWalSeq {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update readdb only when the wal has been committed to etcd
|
||||
if walElement.WalData.WalStatus != datamanager.WalStatusCommitted {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.log.Debugf("applying wal to db")
|
||||
if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil {
|
||||
return err
|
||||
@ -701,6 +697,16 @@ func (r *ReadDB) SyncFromDump() (string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
err = r.rdb.Do(func(tx *db.Tx) error {
|
||||
if err := r.insertCommittedWalSequenceOST(tx, dumpIndex.WalSequence); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return dumpIndex.WalSequence, nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user