Merge pull request #168 from sgotti/datamanager_refactor_readwal

datamanager: refactor ReadWal
This commit is contained in:
Simone Gotti 2019-11-08 14:26:55 +01:00 committed by GitHub
commit 9fbfeb9d13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 17 additions and 27 deletions

View File

@ -91,17 +91,10 @@ func (d *DataManager) walIndex(ctx context.Context, wals []*WalData) (walIndex,
wimap := map[string]map[string]*Action{}
for _, walData := range wals {
walFilef, err := d.ReadWal(walData.WalSequence)
header, err := d.ReadWal(walData.WalSequence)
if err != nil {
return nil, err
}
dec := json.NewDecoder(walFilef)
var header *WalHeader
if err = dec.Decode(&header); err != nil && err != io.EOF {
walFilef.Close()
return nil, err
}
walFilef.Close()
walFile, err := d.ReadWalData(header.WalDataFileID)
if err != nil {
@ -109,7 +102,7 @@ func (d *DataManager) walIndex(ctx context.Context, wals []*WalData) (walIndex,
}
defer walFile.Close()
dec = json.NewDecoder(walFile)
dec := json.NewDecoder(walFile)
for {
var action *Action

View File

@ -141,8 +141,19 @@ func (d *DataManager) HasOSTWal(walseq string) (bool, error) {
return true, nil
}
func (d *DataManager) ReadWal(walseq string) (io.ReadCloser, error) {
return d.ost.ReadObject(d.storageWalStatusFile(walseq) + ".committed")
func (d *DataManager) ReadWal(walseq string) (*WalHeader, error) {
walFilef, err := d.ost.ReadObject(d.storageWalStatusFile(walseq) + ".committed")
if err != nil {
return nil, err
}
defer walFilef.Close()
dec := json.NewDecoder(walFilef)
var header *WalHeader
if err = dec.Decode(&header); err != nil {
return nil, err
}
return header, nil
}
func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error) {

View File

@ -197,17 +197,10 @@ func (r *ReadDB) SyncFromWals(ctx context.Context, startWalSeq, endWalSeq string
insertfunc := func(walFiles []*datamanager.WalFile) error {
err := r.rdb.Do(ctx, func(tx *db.Tx) error {
for _, walFile := range walFiles {
walFilef, err := r.dm.ReadWal(walFile.WalSequence)
header, err := r.dm.ReadWal(walFile.WalSequence)
if err != nil {
return err
}
dec := json.NewDecoder(walFilef)
var header *datamanager.WalHeader
if err = dec.Decode(&header); err != nil && err != io.EOF {
walFilef.Close()
return err
}
walFilef.Close()
if err := r.insertCommittedWalSequence(tx, walFile.WalSequence); err != nil {
return err
}

View File

@ -735,17 +735,10 @@ func (r *ReadDB) SyncFromWals(ctx context.Context, startWalSeq, endWalSeq string
insertfunc := func(walFiles []*datamanager.WalFile) error {
err := r.rdb.Do(ctx, func(tx *db.Tx) error {
for _, walFile := range walFiles {
walFilef, err := r.dm.ReadWal(walFile.WalSequence)
header, err := r.dm.ReadWal(walFile.WalSequence)
if err != nil {
return err
}
dec := json.NewDecoder(walFilef)
var header *datamanager.WalHeader
if err = dec.Decode(&header); err != nil && err != io.EOF {
walFilef.Close()
return err
}
walFilef.Close()
if err := r.insertCommittedWalSequenceOST(tx, walFile.WalSequence); err != nil {
return err
}