wal: remove additionalwalactions

This commit is contained in:
Simone Gotti 2019-03-27 20:39:59 +01:00
parent 61b454d4da
commit 1e1ba89a3f
2 changed files with 13 additions and 44 deletions

View File

@ -172,14 +172,6 @@ func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revi
} }
w.applyWalChangesAction(ctx, action, walData.WalSequence, revision) w.applyWalChangesAction(ctx, action, walData.WalSequence, revision)
additionalActions, err := w.additionalActionsFunc(action)
if err != nil {
return err
}
for _, action := range additionalActions {
w.applyWalChangesAction(ctx, action, walData.WalSequence, revision)
}
} }
w.changes.updatePathsOrdered() w.changes.updatePathsOrdered()

View File

@ -201,17 +201,6 @@ func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *Cha
w.log.Debugf("reading file from wal: %q", action.Path) w.log.Debugf("reading file from wal: %q", action.Path)
return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil
} }
additionalActions, err := w.additionalActionsFunc(action)
if err != nil {
return nil, nil, err
}
for _, action := range additionalActions {
if action.ActionType == ActionTypePut && action.Path == p {
w.log.Debugf("reading file from wal additional actions: %q", action.Path)
return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil
}
}
} }
return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq) return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq)
} }
@ -305,7 +294,7 @@ func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-cha
} }
func (w *WalManager) HasLtsWal(walseq string) (bool, error) { func (w *WalManager) HasLtsWal(walseq string) (bool, error) {
_, err := w.lts.Stat(w.storageWalStatusFile(walseq)) _, err := w.lts.Stat(w.storageWalStatusFile(walseq) + ".committed")
if err == objectstorage.ErrNotExist { if err == objectstorage.ErrNotExist {
return false, nil return false, nil
} }
@ -848,16 +837,6 @@ func (w *WalManager) checkpoint(ctx context.Context) error {
if err := w.checkpointAction(ctx, action); err != nil { if err := w.checkpointAction(ctx, action); err != nil {
return err return err
} }
additionalActions, err := w.additionalActionsFunc(action)
if err != nil {
return err
}
for _, action := range additionalActions {
if err := w.checkpointAction(ctx, action); err != nil {
return err
}
}
} }
w.log.Debugf("updating wal to state %q", WalStatusCheckpointed) w.log.Debugf("updating wal to state %q", WalStatusCheckpointed)
@ -1214,13 +1193,12 @@ type WalManagerConfig struct {
} }
type WalManager struct { type WalManager struct {
basePath string basePath string
log *zap.SugaredLogger log *zap.SugaredLogger
e *etcd.Store e *etcd.Store
lts *objectstorage.ObjStorage lts *objectstorage.ObjStorage
changes *WalChanges changes *WalChanges
additionalActionsFunc AdditionalActionsFunc etcdWalsKeepNum int
etcdWalsKeepNum int
} }
func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConfig) (*WalManager, error) { func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConfig) (*WalManager, error) {
@ -1237,12 +1215,12 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf
} }
w := &WalManager{ w := &WalManager{
basePath: conf.BasePath, basePath: conf.BasePath,
log: logger.Sugar(), log: logger.Sugar(),
e: conf.E, e: conf.E,
lts: conf.Lts, lts: conf.Lts,
additionalActionsFunc: additionalActionsFunc, etcdWalsKeepNum: conf.EtcdWalsKeepNum,
etcdWalsKeepNum: conf.EtcdWalsKeepNum, changes: NewWalChanges(),
} }
// add trailing slash the basepath // add trailing slash the basepath
@ -1254,7 +1232,6 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf
} }
func (w *WalManager) Run(ctx context.Context) error { func (w *WalManager) Run(ctx context.Context) error {
w.changes = NewWalChanges()
for { for {
err := w.InitEtcd(ctx) err := w.InitEtcd(ctx)