readdb: insert current wal sequence after checking wal status

This commit is contained in:
Simone Gotti 2019-07-17 16:58:43 +02:00
parent f7175c4592
commit 16820e9033
2 changed files with 12 additions and 10 deletions

View File

@ -352,6 +352,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
} }
} }
r.log.Infof("syncing from wals")
err = r.rdb.Do(func(tx *db.Tx) error { err = r.rdb.Do(func(tx *db.Tx) error {
if err := r.insertRevision(tx, revision); err != nil { if err := r.insertRevision(tx, revision); err != nil {
return err return err
@ -360,21 +361,21 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
// use the same revision as previous operation // use the same revision as previous operation
for walElement := range r.dm.ListEtcdWals(ctx, revision) { for walElement := range r.dm.ListEtcdWals(ctx, revision) {
if walElement.Err != nil { if walElement.Err != nil {
return err return walElement.Err
} }
if walElement.WalData.WalSequence <= curWalSeq { if walElement.WalData.WalSequence <= curWalSeq {
continue continue
} }
if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil {
return err
}
// update readdb only when the wal has been committed to etcd // update readdb only when the wal has been committed to etcd
if walElement.WalData.WalStatus != datamanager.WalStatusCommitted { if walElement.WalData.WalStatus != datamanager.WalStatusCommitted {
return nil return nil
} }
if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil {
return err
}
r.log.Debugf("applying wal to db") r.log.Debugf("applying wal to db")
if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil {
return err return err

View File

@ -602,6 +602,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
} }
} }
r.log.Infof("syncing from wals")
err = r.rdb.Do(func(tx *db.Tx) error { err = r.rdb.Do(func(tx *db.Tx) error {
if err := insertRevisionOST(tx, revision); err != nil { if err := insertRevisionOST(tx, revision); err != nil {
return err return err
@ -610,21 +611,21 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
// use the same revision as previous operation // use the same revision as previous operation
for walElement := range r.dm.ListEtcdWals(ctx, revision) { for walElement := range r.dm.ListEtcdWals(ctx, revision) {
if walElement.Err != nil { if walElement.Err != nil {
return err return walElement.Err
} }
if walElement.WalData.WalSequence <= curWalSeq { if walElement.WalData.WalSequence <= curWalSeq {
continue continue
} }
if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil {
return err
}
// update readdb only when the wal has been committed to etcd // update readdb only when the wal has been committed to etcd
if walElement.WalData.WalStatus != datamanager.WalStatusCommitted { if walElement.WalData.WalStatus != datamanager.WalStatusCommitted {
return nil return nil
} }
if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil {
return err
}
r.log.Debugf("applying wal to db") r.log.Debugf("applying wal to db")
if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil {
return err return err