From 6f3798e8fe0900541461868d7aa881cc9ff5fd2d Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 25 Jul 2019 15:53:26 +0200 Subject: [PATCH] *: use sleep timer in loops So we'll react instantly to a context cancel instead of waiting on time.Sleep returning. --- internal/datamanager/changes.go | 5 +-- internal/datamanager/datamanager.go | 8 +++- internal/datamanager/wal.go | 25 +++++-------- .../services/configstore/readdb/readdb.go | 25 ++++++++++--- internal/services/executor/executor.go | 25 +++++-------- internal/services/notification/runevents.go | 5 +-- internal/services/runservice/readdb/readdb.go | 27 +++++++++++--- internal/services/runservice/runservice.go | 5 +-- internal/services/runservice/scheduler.go | 37 +++++++++---------- internal/services/scheduler/scheduler.go | 16 +++++++- 10 files changed, 106 insertions(+), 72 deletions(-) diff --git a/internal/datamanager/changes.go b/internal/datamanager/changes.go index 35975eb..6851129 100644 --- a/internal/datamanager/changes.go +++ b/internal/datamanager/changes.go @@ -184,14 +184,13 @@ func (d *DataManager) watcherLoop(ctx context.Context) { } } + sleepCh := time.NewTimer(1 * time.Second).C select { case <-ctx.Done(): d.log.Infof("watcher exiting") return - default: + case <-sleepCh: } - - time.Sleep(1 * time.Second) } } diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go index f856f44..5e9b2eb 100644 --- a/internal/datamanager/datamanager.go +++ b/internal/datamanager/datamanager.go @@ -208,7 +208,13 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error { break } d.log.Errorf("failed to initialize etcd: %+v", err) - time.Sleep(1 * time.Second) + + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return nil + case <-sleepCh: + } } readyCh <- struct{}{} diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index c1ca1f0..aa97f90 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -550,13 +550,12 @@ func (d *DataManager) syncLoop(ctx context.Context) { d.log.Errorf("syncer error: %+v", err) } + sleepCh := time.NewTimer(5 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(5 * time.Second) } } @@ -647,13 +646,12 @@ func (d *DataManager) checkpointLoop(ctx context.Context) { d.log.Errorf("checkpoint error: %v", err) } + sleepCh := time.NewTimer(d.checkpointInterval).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(d.checkpointInterval) } } @@ -730,13 +728,12 @@ func (d *DataManager) walCleanerLoop(ctx context.Context) { d.log.Errorf("walcleaner error: %v", err) } + sleepCh := time.NewTimer(2 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(2 * time.Second) } } @@ -805,13 +802,12 @@ func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) { d.log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(1 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(1 * time.Second) } } @@ -891,13 +887,12 @@ func (d *DataManager) etcdPingerLoop(ctx context.Context) { d.log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(1 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(1 * time.Second) } } diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 3522de0..2e43b16 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -419,7 +419,12 @@ func (r *ReadDB) Run(ctx context.Context) error { } r.log.Errorf("initialize err: %+v", err) - time.Sleep(1 * time.Second) + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return nil + case <-sleepCh: + } } } r.SetInitialized(true) @@ -437,17 +442,22 @@ func (r *ReadDB) Run(ctx context.Context) error { } r.log.Errorf("initialize err: %+v", err) - time.Sleep(1 * time.Second) + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return nil + case <-sleepCh: + } } errCh := make(chan error, 1) - ctx, cancel := context.WithCancel(ctx) + hctx, cancel := context.WithCancel(ctx) wg := &sync.WaitGroup{} wg.Add(1) go func() { r.log.Infof("starting handleEvents") - if err := r.handleEvents(ctx); err != nil { + if err := r.handleEvents(hctx); err != nil { r.log.Errorf("handleEvents err: %+v", err) errCh <- err } @@ -466,7 +476,12 @@ func (r *ReadDB) Run(ctx context.Context) error { wg.Wait() } - time.Sleep(1 * time.Second) + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return nil + case <-sleepCh: + } } } diff --git a/internal/services/executor/executor.go b/internal/services/executor/executor.go index 571f579..5191abe 100644 --- a/internal/services/executor/executor.go +++ b/internal/services/executor/executor.go @@ -999,13 +999,12 @@ func (e *Executor) podsCleanerLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(1 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(1 * time.Second) } } @@ -1056,13 +1055,12 @@ func (e *Executor) executorStatusSenderLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(2 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(2 * time.Second) } } @@ -1090,13 +1088,12 @@ func (e *Executor) executorTasksStatusSenderLoop(ctx context.Context) { rt.Unlock() } + sleepCh := time.NewTimer(2 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(2 * time.Second) } } @@ -1108,13 +1105,12 @@ func (e *Executor) tasksUpdaterLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(2 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(2 * time.Second) } } @@ -1188,13 +1184,12 @@ func (e *Executor) tasksDataCleanerLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(2 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(2 * time.Second) } } diff --git a/internal/services/notification/runevents.go b/internal/services/notification/runevents.go index 656b746..acfeb90 100644 --- a/internal/services/notification/runevents.go +++ b/internal/services/notification/runevents.go @@ -40,13 +40,12 @@ func (n *NotificationService) runEventsHandlerLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(1 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(1 * time.Second) } } diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index e82ef63..56d3337 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -269,7 +269,12 @@ func (r *ReadDB) Run(ctx context.Context) error { } r.log.Errorf("initialize err: %+v", err) - time.Sleep(1 * time.Second) + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return nil + case <-sleepCh: + } } } r.SetInitialized(true) @@ -287,17 +292,22 @@ func (r *ReadDB) Run(ctx context.Context) error { } r.log.Errorf("initialize err: %+v", err) - time.Sleep(1 * time.Second) + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return nil + case <-sleepCh: + } } errCh := make(chan error, 2) - ctx, cancel := context.WithCancel(ctx) + hctx, cancel := context.WithCancel(ctx) wg := &sync.WaitGroup{} wg.Add(1) go func() { r.log.Infof("starting handleEvents") - if err := r.handleEvents(ctx); err != nil { + if err := r.handleEvents(hctx); err != nil { r.log.Errorf("handleEvents err: %+v", err) errCh <- err } @@ -307,7 +317,7 @@ func (r *ReadDB) Run(ctx context.Context) error { wg.Add(1) go func() { r.log.Infof("starting handleEventsOST") - if err := r.handleEventsOST(ctx); err != nil { + if err := r.handleEventsOST(hctx); err != nil { r.log.Errorf("handleEventsOST err: %+v", err) errCh <- err } @@ -326,7 +336,12 @@ func (r *ReadDB) Run(ctx context.Context) error { wg.Wait() } - time.Sleep(1 * time.Second) + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return nil + case <-sleepCh: + } } } diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go index aabbf44..cb4c28d 100644 --- a/internal/services/runservice/runservice.go +++ b/internal/services/runservice/runservice.go @@ -49,13 +49,12 @@ func (s *Runservice) etcdPingerLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(1 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(1 * time.Second) } } diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 0052580..80ca0b7 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -425,13 +425,12 @@ func (s *Runservice) compactChangeGroupsLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(1 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(1 * time.Second) } } @@ -767,13 +766,12 @@ func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(1 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(1 * time.Second) } } @@ -855,7 +853,12 @@ func (s *Runservice) runTasksUpdaterLoop(ctx context.Context) { log.Errorf("err: %+v", err) } - time.Sleep(10 * time.Second) + sleepCh := time.NewTimer(10 * time.Second).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } } } @@ -1148,13 +1151,12 @@ func (s *Runservice) fetcherLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(2 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(2 * time.Second) } } @@ -1217,13 +1219,12 @@ func (s *Runservice) runsSchedulerLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(2 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(2 * time.Second) } } @@ -1260,13 +1261,12 @@ func (s *Runservice) finishedRunsArchiverLoop(ctx context.Context) { log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(2 * time.Second).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(2 * time.Second) } } @@ -1361,13 +1361,12 @@ func (s *Runservice) cacheCleanerLoop(ctx context.Context, cacheExpireInterval t log.Errorf("err: %+v", err) } + sleepCh := time.NewTimer(cacheCleanerInterval).C select { case <-ctx.Done(): return - default: + case <-sleepCh: } - - time.Sleep(cacheCleanerInterval) } } diff --git a/internal/services/scheduler/scheduler.go b/internal/services/scheduler/scheduler.go index 12710b4..d6edeac 100644 --- a/internal/services/scheduler/scheduler.go +++ b/internal/services/scheduler/scheduler.go @@ -40,7 +40,13 @@ func (s *Scheduler) scheduleLoop(ctx context.Context) { if err := s.schedule(ctx); err != nil { log.Errorf("err: %+v", err) } - time.Sleep(1 * time.Second) + + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } } } @@ -108,7 +114,13 @@ func (s *Scheduler) approveLoop(ctx context.Context) { if err := s.approve(ctx); err != nil { log.Errorf("err: %+v", err) } - time.Sleep(1 * time.Second) + + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } } }