wal: send watch events only when needed

This commit is contained in:
Simone Gotti 2019-03-27 20:40:23 +01:00
parent 1e1ba89a3f
commit 0f9a5f9c4b
1 changed files with 9 additions and 1 deletions

View File

@ -480,6 +480,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle
defer close(walCh) defer close(walCh)
for wresp := range wch { for wresp := range wch {
we := &WatchElement{ChangeGroupsRevisions: make(changeGroupsRevisions)} we := &WatchElement{ChangeGroupsRevisions: make(changeGroupsRevisions)}
send := false
if wresp.Canceled { if wresp.Canceled {
err := wresp.Err() err := wresp.Err()
@ -501,6 +502,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle
switch { switch {
case strings.HasPrefix(key, etcdWalsDir+"/"): case strings.HasPrefix(key, etcdWalsDir+"/"):
send = true
switch ev.Type { switch ev.Type {
case mvccpb.PUT: case mvccpb.PUT:
var walData *WalData var walData *WalData
@ -514,6 +516,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle
} }
case strings.HasPrefix(key, etcdChangeGroupsDir+"/"): case strings.HasPrefix(key, etcdChangeGroupsDir+"/"):
send = true
switch ev.Type { switch ev.Type {
case mvccpb.PUT: case mvccpb.PUT:
changeGroup := path.Base(string(ev.Kv.Key)) changeGroup := path.Base(string(ev.Kv.Key))
@ -523,12 +526,17 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle
we.ChangeGroupsRevisions[changeGroup] = 0 we.ChangeGroupsRevisions[changeGroup] = 0
} }
case key == etcdPingKey:
send = true
default: default:
continue continue
} }
} }
walCh <- we if send {
walCh <- we
}
} }
}() }()