From 0f9a5f9c4b4099f3c499253c9c862aa85fe508ca Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 27 Mar 2019 20:40:23 +0100 Subject: [PATCH] wal: send watch events only when needed --- internal/wal/wal.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/internal/wal/wal.go b/internal/wal/wal.go index aae67fc..98c9de8 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -480,6 +480,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle defer close(walCh) for wresp := range wch { we := &WatchElement{ChangeGroupsRevisions: make(changeGroupsRevisions)} + send := false if wresp.Canceled { err := wresp.Err() @@ -501,6 +502,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle switch { case strings.HasPrefix(key, etcdWalsDir+"/"): + send = true switch ev.Type { case mvccpb.PUT: var walData *WalData @@ -514,6 +516,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle } case strings.HasPrefix(key, etcdChangeGroupsDir+"/"): + send = true switch ev.Type { case mvccpb.PUT: changeGroup := path.Base(string(ev.Kv.Key)) @@ -523,12 +526,17 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle we.ChangeGroupsRevisions[changeGroup] = 0 } + case key == etcdPingKey: + send = true + default: continue } } - walCh <- we + if send { + walCh <- we + } } }()