diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index 4f713fc..baf5c2f 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -91,17 +91,10 @@ func (d *DataManager) walIndex(ctx context.Context, wals []*WalData) (walIndex, wimap := map[string]map[string]*Action{} for _, walData := range wals { - walFilef, err := d.ReadWal(walData.WalSequence) + header, err := d.ReadWal(walData.WalSequence) if err != nil { return nil, err } - dec := json.NewDecoder(walFilef) - var header *WalHeader - if err = dec.Decode(&header); err != nil && err != io.EOF { - walFilef.Close() - return nil, err - } - walFilef.Close() walFile, err := d.ReadWalData(header.WalDataFileID) if err != nil { @@ -109,7 +102,7 @@ func (d *DataManager) walIndex(ctx context.Context, wals []*WalData) (walIndex, } defer walFile.Close() - dec = json.NewDecoder(walFile) + dec := json.NewDecoder(walFile) for { var action *Action diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 4bc583a..b81a4ec 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -141,8 +141,19 @@ func (d *DataManager) HasOSTWal(walseq string) (bool, error) { return true, nil } -func (d *DataManager) ReadWal(walseq string) (io.ReadCloser, error) { - return d.ost.ReadObject(d.storageWalStatusFile(walseq) + ".committed") +func (d *DataManager) ReadWal(walseq string) (*WalHeader, error) { + walFilef, err := d.ost.ReadObject(d.storageWalStatusFile(walseq) + ".committed") + if err != nil { + return nil, err + } + defer walFilef.Close() + dec := json.NewDecoder(walFilef) + var header *WalHeader + if err = dec.Decode(&header); err != nil { + return nil, err + } + + return header, nil } func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error) { diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index cbec1ba..ca32a5a 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -197,17 +197,10 @@ func (r *ReadDB) SyncFromWals(ctx context.Context, startWalSeq, endWalSeq string insertfunc := func(walFiles []*datamanager.WalFile) error { err := r.rdb.Do(ctx, func(tx *db.Tx) error { for _, walFile := range walFiles { - walFilef, err := r.dm.ReadWal(walFile.WalSequence) + header, err := r.dm.ReadWal(walFile.WalSequence) if err != nil { return err } - dec := json.NewDecoder(walFilef) - var header *datamanager.WalHeader - if err = dec.Decode(&header); err != nil && err != io.EOF { - walFilef.Close() - return err - } - walFilef.Close() if err := r.insertCommittedWalSequence(tx, walFile.WalSequence); err != nil { return err } diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index 890b715..029af56 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -735,17 +735,10 @@ func (r *ReadDB) SyncFromWals(ctx context.Context, startWalSeq, endWalSeq string insertfunc := func(walFiles []*datamanager.WalFile) error { err := r.rdb.Do(ctx, func(tx *db.Tx) error { for _, walFile := range walFiles { - walFilef, err := r.dm.ReadWal(walFile.WalSequence) + header, err := r.dm.ReadWal(walFile.WalSequence) if err != nil { return err } - dec := json.NewDecoder(walFilef) - var header *datamanager.WalHeader - if err = dec.Decode(&header); err != nil && err != io.EOF { - walFilef.Close() - return err - } - walFilef.Close() if err := r.insertCommittedWalSequenceOST(tx, walFile.WalSequence); err != nil { return err }