readdb: ensure that we apply only etcd committed wals

Ensure that we apply only etcd commited wals to avoid doing an unuseful insert
when the wal becomes committed storage.
This commit is contained in:
Simone Gotti 2019-06-03 18:02:09 +02:00
parent 51a5594c89
commit 6d095cbe50
2 changed files with 19 additions and 18 deletions
internal/services
configstore/readdb
runservice/readdb

View File

@ -351,17 +351,15 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
if walElement.WalData.WalSequence <= curWalSeq { if walElement.WalData.WalSequence <= curWalSeq {
continue continue
} }
//if walElement.WalData.WalStatus == datamanager.WalStatusCommittedStorage {
if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil { if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil {
return err return err
} }
//}
//// update readdb only when the wal has been committed to objectstorage // update readdb only when the wal has been committed to etcd
//if walElement.WalData.WalStatus != datamanager.WalStatusCommittedStorage { if walElement.WalData.WalStatus != datamanager.WalStatusCommitted {
// return nil return nil
//} }
r.log.Debugf("applying wal to db") r.log.Debugf("applying wal to db")
if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil {
@ -524,17 +522,6 @@ func (r *ReadDB) handleEvent(tx *db.Tx, we *datamanager.WatchElement) error {
} }
func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error { func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error {
// update readdb only when the wal has been committed to objectstorage
//if we.WalData.WalStatus != wal.WalStatusCommittedStorage {
// return nil
//}
if we.WalData != nil {
if err := r.insertCommittedWalSequence(tx, we.WalData.WalSequence); err != nil {
return err
}
}
for cgName, cgRev := range we.ChangeGroupsRevisions { for cgName, cgRev := range we.ChangeGroupsRevisions {
if err := r.insertChangeGroupRevision(tx, cgName, cgRev); err != nil { if err := r.insertChangeGroupRevision(tx, cgName, cgRev); err != nil {
return err return err
@ -542,6 +529,15 @@ func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error {
} }
if we.WalData != nil { if we.WalData != nil {
// update readdb only when the wal has been committed to etcd
if we.WalData.WalStatus != datamanager.WalStatusCommitted {
return nil
}
if err := r.insertCommittedWalSequence(tx, we.WalData.WalSequence); err != nil {
return err
}
r.log.Debugf("applying wal to db") r.log.Debugf("applying wal to db")
return r.applyWal(tx, we.WalData.WalDataFileID) return r.applyWal(tx, we.WalData.WalDataFileID)
} }

View File

@ -622,6 +622,11 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
return err return err
} }
// update readdb only when the wal has been committed to etcd
if walElement.WalData.WalStatus != datamanager.WalStatusCommitted {
return nil
}
r.log.Debugf("applying wal to db") r.log.Debugf("applying wal to db")
if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil {
return err return err
@ -923,7 +928,7 @@ func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error {
} }
if we.WalData != nil { if we.WalData != nil {
// update readdb only when the wal has been committed to objectstorage // update readdb only when the wal has been committed to etcd
if we.WalData.WalStatus != datamanager.WalStatusCommitted { if we.WalData.WalStatus != datamanager.WalStatusCommitted {
return nil return nil
} }