datamanager: don't create ost wal checkpointed files
currently creating .checkpointed files in the ost isn't useful. We already have the data snapshot that reports the last checkpointed wal.
This commit is contained in:
parent
ae8eec94b5
commit
acd62a3f90
@ -16,7 +16,6 @@ package datamanager
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"container/ring"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@ -151,10 +150,8 @@ func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error) {
|
||||
}
|
||||
|
||||
type WalFile struct {
|
||||
WalSequence string
|
||||
Err error
|
||||
Committed bool
|
||||
Checkpointed bool
|
||||
WalSequence string
|
||||
Err error
|
||||
}
|
||||
|
||||
func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {
|
||||
@ -181,36 +178,25 @@ func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {
|
||||
|
||||
name := path.Base(object.Path)
|
||||
ext := path.Ext(name)
|
||||
// accept only ".committed" files (skip old files that had ".checkpointed" extensions)
|
||||
if ext != ".committed" {
|
||||
continue
|
||||
}
|
||||
|
||||
walSequence := strings.TrimSuffix(name, ext)
|
||||
|
||||
// wal file refers to another wal, so return the current one
|
||||
if curWal.WalSequence != walSequence {
|
||||
// if this happen something is wrong on the objectstorage
|
||||
if !curWal.Committed && curWal.Checkpointed {
|
||||
walCh <- &WalFile{
|
||||
Err: errors.Errorf("wal is checkpointed but not committed. this should never happen"),
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if curWal.WalSequence != "" {
|
||||
// skip not committed wals
|
||||
if curWal.Committed {
|
||||
walCh <- curWal
|
||||
}
|
||||
walCh <- curWal
|
||||
}
|
||||
|
||||
curWal = &WalFile{
|
||||
WalSequence: walSequence,
|
||||
}
|
||||
}
|
||||
|
||||
if ext == ".committed" {
|
||||
curWal.Committed = true
|
||||
}
|
||||
if ext == ".checkpointed" {
|
||||
curWal.Checkpointed = true
|
||||
}
|
||||
}
|
||||
|
||||
if curWal.WalSequence != "" {
|
||||
walCh <- curWal
|
||||
}
|
||||
@ -627,13 +613,6 @@ func (d *DataManager) sync(ctx context.Context) error {
|
||||
if !tresp.Succeeded {
|
||||
return errors.Errorf("failed to write committedstorage wal: concurrent update")
|
||||
}
|
||||
case WalStatusCheckpointed:
|
||||
walFilePath := d.storageWalStatusFile(walData.WalSequence)
|
||||
d.log.Debugf("checkpointing committed wal to storage")
|
||||
walFileCheckpointedPath := walFilePath + ".checkpointed"
|
||||
if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), 0, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -963,9 +942,6 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
|
||||
WalStatus: WalStatusCommittedStorage,
|
||||
PreviousWalSequence: header.PreviousWalSequence,
|
||||
}
|
||||
if wal.Checkpointed {
|
||||
walData.WalStatus = WalStatusCheckpointed
|
||||
}
|
||||
walDataj, err := json.Marshal(walData)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1057,8 +1033,6 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
|
||||
// if there're some wals in the objectstorage this means etcd has been reset.
|
||||
// So take all the wals in committed or checkpointed state starting from the
|
||||
// first not checkpointed wal and put them in etcd
|
||||
lastCommittedStorageWalsRing := ring.New(100)
|
||||
lastCommittedStorageWalElem := lastCommittedStorageWalsRing
|
||||
lastCommittedStorageWalSequence := ""
|
||||
wroteWals := 0
|
||||
for wal := range d.ListOSTWals("") {
|
||||
@ -1075,37 +1049,13 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
|
||||
continue
|
||||
}
|
||||
|
||||
lastCommittedStorageWalElem.Value = wal
|
||||
lastCommittedStorageWalElem = lastCommittedStorageWalElem.Next()
|
||||
lastCommittedStorageWalSequence = wal.WalSequence
|
||||
if wal.Checkpointed {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := writeWal(wal); err != nil {
|
||||
return err
|
||||
}
|
||||
wroteWals++
|
||||
}
|
||||
|
||||
// if no wal has been written (because all are checkpointed), write at least
|
||||
// the ones in the ring
|
||||
if wroteWals == 0 {
|
||||
var err error
|
||||
lastCommittedStorageWalsRing.Do(func(e interface{}) {
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
wal := e.(*WalFile)
|
||||
err = writeWal(wal)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
lastCommittedStorageWalSequence = wal.WalSequence
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// insert an empty wal and make it already committedstorage
|
||||
|
Loading…
Reference in New Issue
Block a user