Merge pull request #164 from sgotti/datamanager_dont_create_ost_checkpointed_files

datamanager: don't create ost wal checkpointed files
This commit is contained in:
Simone Gotti 2019-11-07 14:48:12 +01:00 committed by GitHub
commit 33e2d50036
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 10 additions and 60 deletions

View File

@ -16,7 +16,6 @@ package datamanager
import ( import (
"bytes" "bytes"
"container/ring"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -153,8 +152,6 @@ func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error) {
type WalFile struct { type WalFile struct {
WalSequence string WalSequence string
Err error Err error
Committed bool
Checkpointed bool
} }
func (d *DataManager) ListOSTWals(start string) <-chan *WalFile { func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {
@ -181,36 +178,25 @@ func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {
name := path.Base(object.Path) name := path.Base(object.Path)
ext := path.Ext(name) ext := path.Ext(name)
walSequence := strings.TrimSuffix(name, ext) // accept only ".committed" files (skip old files that had ".checkpointed" extensions)
// wal file refers to another wal, so return the current one if ext != ".committed" {
if curWal.WalSequence != walSequence { continue
// if this happen something is wrong on the objectstorage
if !curWal.Committed && curWal.Checkpointed {
walCh <- &WalFile{
Err: errors.Errorf("wal is checkpointed but not committed. this should never happen"),
}
return
} }
walSequence := strings.TrimSuffix(name, ext)
// wal file refers to another wal, so return the current one
if curWal.WalSequence != walSequence {
if curWal.WalSequence != "" { if curWal.WalSequence != "" {
// skip not committed wals
if curWal.Committed {
walCh <- curWal walCh <- curWal
} }
}
curWal = &WalFile{ curWal = &WalFile{
WalSequence: walSequence, WalSequence: walSequence,
} }
} }
}
if ext == ".committed" {
curWal.Committed = true
}
if ext == ".checkpointed" {
curWal.Checkpointed = true
}
}
if curWal.WalSequence != "" { if curWal.WalSequence != "" {
walCh <- curWal walCh <- curWal
} }
@ -627,13 +613,6 @@ func (d *DataManager) sync(ctx context.Context) error {
if !tresp.Succeeded { if !tresp.Succeeded {
return errors.Errorf("failed to write committedstorage wal: concurrent update") return errors.Errorf("failed to write committedstorage wal: concurrent update")
} }
case WalStatusCheckpointed:
walFilePath := d.storageWalStatusFile(walData.WalSequence)
d.log.Debugf("checkpointing committed wal to storage")
walFileCheckpointedPath := walFilePath + ".checkpointed"
if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), 0, true); err != nil {
return err
}
} }
} }
return nil return nil
@ -963,9 +942,6 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
WalStatus: WalStatusCommittedStorage, WalStatus: WalStatusCommittedStorage,
PreviousWalSequence: header.PreviousWalSequence, PreviousWalSequence: header.PreviousWalSequence,
} }
if wal.Checkpointed {
walData.WalStatus = WalStatusCheckpointed
}
walDataj, err := json.Marshal(walData) walDataj, err := json.Marshal(walData)
if err != nil { if err != nil {
return err return err
@ -1057,8 +1033,6 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
// if there're some wals in the objectstorage this means etcd has been reset. // if there're some wals in the objectstorage this means etcd has been reset.
// So take all the wals in committed or checkpointed state starting from the // So take all the wals in committed or checkpointed state starting from the
// first not checkpointed wal and put them in etcd // first not checkpointed wal and put them in etcd
lastCommittedStorageWalsRing := ring.New(100)
lastCommittedStorageWalElem := lastCommittedStorageWalsRing
lastCommittedStorageWalSequence := "" lastCommittedStorageWalSequence := ""
wroteWals := 0 wroteWals := 0
for wal := range d.ListOSTWals("") { for wal := range d.ListOSTWals("") {
@ -1075,37 +1049,13 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
continue continue
} }
lastCommittedStorageWalElem.Value = wal
lastCommittedStorageWalElem = lastCommittedStorageWalElem.Next()
lastCommittedStorageWalSequence = wal.WalSequence lastCommittedStorageWalSequence = wal.WalSequence
if wal.Checkpointed {
continue
}
if err := writeWal(wal); err != nil { if err := writeWal(wal); err != nil {
return err return err
} }
wroteWals++ wroteWals++
}
// if no wal has been written (because all are checkpointed), write at least
// the ones in the ring
if wroteWals == 0 {
var err error
lastCommittedStorageWalsRing.Do(func(e interface{}) {
if e == nil {
return
}
wal := e.(*WalFile)
err = writeWal(wal)
if err != nil {
return
}
lastCommittedStorageWalSequence = wal.WalSequence
})
if err != nil {
return err
}
} }
// insert an empty wal and make it already committedstorage // insert an empty wal and make it already committedstorage