diff --git a/internal/wal/changes.go b/internal/wal/changes.go index 5f466e7..c47e476 100644 --- a/internal/wal/changes.go +++ b/internal/wal/changes.go @@ -172,14 +172,6 @@ func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revi } w.applyWalChangesAction(ctx, action, walData.WalSequence, revision) - - additionalActions, err := w.additionalActionsFunc(action) - if err != nil { - return err - } - for _, action := range additionalActions { - w.applyWalChangesAction(ctx, action, walData.WalSequence, revision) - } } w.changes.updatePathsOrdered() diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 89149f1..aae67fc 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -201,17 +201,6 @@ func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *Cha w.log.Debugf("reading file from wal: %q", action.Path) return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil } - - additionalActions, err := w.additionalActionsFunc(action) - if err != nil { - return nil, nil, err - } - for _, action := range additionalActions { - if action.ActionType == ActionTypePut && action.Path == p { - w.log.Debugf("reading file from wal additional actions: %q", action.Path) - return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil - } - } } return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq) } @@ -305,7 +294,7 @@ func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-cha } func (w *WalManager) HasLtsWal(walseq string) (bool, error) { - _, err := w.lts.Stat(w.storageWalStatusFile(walseq)) + _, err := w.lts.Stat(w.storageWalStatusFile(walseq) + ".committed") if err == objectstorage.ErrNotExist { return false, nil } @@ -848,16 +837,6 @@ func (w *WalManager) checkpoint(ctx context.Context) error { if err := w.checkpointAction(ctx, action); err != nil { return err } - - additionalActions, err := w.additionalActionsFunc(action) - if err != nil { - return err - } - for _, action := range additionalActions { - if err := w.checkpointAction(ctx, action); err != nil { - return err - } - } } w.log.Debugf("updating wal to state %q", WalStatusCheckpointed) @@ -1214,13 +1193,12 @@ type WalManagerConfig struct { } type WalManager struct { - basePath string - log *zap.SugaredLogger - e *etcd.Store - lts *objectstorage.ObjStorage - changes *WalChanges - additionalActionsFunc AdditionalActionsFunc - etcdWalsKeepNum int + basePath string + log *zap.SugaredLogger + e *etcd.Store + lts *objectstorage.ObjStorage + changes *WalChanges + etcdWalsKeepNum int } func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConfig) (*WalManager, error) { @@ -1237,12 +1215,12 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf } w := &WalManager{ - basePath: conf.BasePath, - log: logger.Sugar(), - e: conf.E, - lts: conf.Lts, - additionalActionsFunc: additionalActionsFunc, - etcdWalsKeepNum: conf.EtcdWalsKeepNum, + basePath: conf.BasePath, + log: logger.Sugar(), + e: conf.E, + lts: conf.Lts, + etcdWalsKeepNum: conf.EtcdWalsKeepNum, + changes: NewWalChanges(), } // add trailing slash the basepath @@ -1254,7 +1232,6 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf } func (w *WalManager) Run(ctx context.Context) error { - w.changes = NewWalChanges() for { err := w.InitEtcd(ctx)