diff --git a/internal/services/runservice/scheduler/common/common.go b/internal/services/runservice/scheduler/common/common.go index 5478020..5ff069e 100644 --- a/internal/services/runservice/scheduler/common/common.go +++ b/internal/services/runservice/scheduler/common/common.go @@ -46,6 +46,8 @@ var ( EtcdExecutorsDir = "executors" EtcdTasksDir = "tasks" + + EtcdPingKey = "ping" ) func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) } diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index f72a61e..af39ef4 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -1532,6 +1532,34 @@ func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.D return nil } +// etcdPingerLoop periodically updates a key. +// This is used by watchers to inform the client of the current revision +// this is needed since if other users are updating other unwatched keys on +// etcd we won't be notified, not updating the known revisions +// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress??? +func (s *Scheduler) etcdPingerLoop(ctx context.Context) { + for { + if err := s.etcdPinger(ctx); err != nil { + log.Errorf("err: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(1 * time.Second) + } +} + +func (s *Scheduler) etcdPinger(ctx context.Context) error { + if _, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil); err != nil { + return err + } + return nil +} + type Scheduler struct { c *config.RunServiceScheduler e *etcd.Store @@ -1693,9 +1721,10 @@ func (s *Scheduler) Run(ctx context.Context) error { go s.dumpLTSCleanerLoop(ctx) go s.compactChangeGroupsLoop(ctx) go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) - go s.executorTaskUpdateHandler(ctx, ch) + go s.etcdPingerLoop(ctx) + var tlsConfig *tls.Config if s.c.Web.TLS { var err error