runservice: add etcd pinger loop
This commit is contained in:
parent
9c7c589bba
commit
e1368d18d6
|
@ -46,6 +46,8 @@ var (
|
||||||
|
|
||||||
EtcdExecutorsDir = "executors"
|
EtcdExecutorsDir = "executors"
|
||||||
EtcdTasksDir = "tasks"
|
EtcdTasksDir = "tasks"
|
||||||
|
|
||||||
|
EtcdPingKey = "ping"
|
||||||
)
|
)
|
||||||
|
|
||||||
func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }
|
func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }
|
||||||
|
|
|
@ -1532,6 +1532,34 @@ func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.D
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// etcdPingerLoop periodically updates a key.
|
||||||
|
// This is used by watchers to inform the client of the current revision
|
||||||
|
// this is needed since if other users are updating other unwatched keys on
|
||||||
|
// etcd we won't be notified, not updating the known revisions
|
||||||
|
// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress???
|
||||||
|
func (s *Scheduler) etcdPingerLoop(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
if err := s.etcdPinger(ctx); err != nil {
|
||||||
|
log.Errorf("err: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) etcdPinger(ctx context.Context) error {
|
||||||
|
if _, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type Scheduler struct {
|
type Scheduler struct {
|
||||||
c *config.RunServiceScheduler
|
c *config.RunServiceScheduler
|
||||||
e *etcd.Store
|
e *etcd.Store
|
||||||
|
@ -1693,9 +1721,10 @@ func (s *Scheduler) Run(ctx context.Context) error {
|
||||||
go s.dumpLTSCleanerLoop(ctx)
|
go s.dumpLTSCleanerLoop(ctx)
|
||||||
go s.compactChangeGroupsLoop(ctx)
|
go s.compactChangeGroupsLoop(ctx)
|
||||||
go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
|
go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
|
||||||
|
|
||||||
go s.executorTaskUpdateHandler(ctx, ch)
|
go s.executorTaskUpdateHandler(ctx, ch)
|
||||||
|
|
||||||
|
go s.etcdPingerLoop(ctx)
|
||||||
|
|
||||||
var tlsConfig *tls.Config
|
var tlsConfig *tls.Config
|
||||||
if s.c.Web.TLS {
|
if s.c.Web.TLS {
|
||||||
var err error
|
var err error
|
||||||
|
|
Loading…
Reference in New Issue