*: use sleep timer in loops

So we'll react instantly to a context cancel instead of waiting on time.Sleep
returning.
This commit is contained in:
Simone Gotti 2019-07-25 15:53:26 +02:00
parent 9481b8d67d
commit 6f3798e8fe
10 changed files with 106 additions and 72 deletions

View File

@ -184,14 +184,13 @@ func (d *DataManager) watcherLoop(ctx context.Context) {
}
}
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
d.log.Infof("watcher exiting")
return
default:
case <-sleepCh:
}
time.Sleep(1 * time.Second)
}
}

View File

@ -208,7 +208,13 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
break
}
d.log.Errorf("failed to initialize etcd: %+v", err)
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
}
readyCh <- struct{}{}

View File

@ -550,13 +550,12 @@ func (d *DataManager) syncLoop(ctx context.Context) {
d.log.Errorf("syncer error: %+v", err)
}
sleepCh := time.NewTimer(5 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(5 * time.Second)
}
}
@ -647,13 +646,12 @@ func (d *DataManager) checkpointLoop(ctx context.Context) {
d.log.Errorf("checkpoint error: %v", err)
}
sleepCh := time.NewTimer(d.checkpointInterval).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(d.checkpointInterval)
}
}
@ -730,13 +728,12 @@ func (d *DataManager) walCleanerLoop(ctx context.Context) {
d.log.Errorf("walcleaner error: %v", err)
}
sleepCh := time.NewTimer(2 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(2 * time.Second)
}
}
@ -805,13 +802,12 @@ func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) {
d.log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(1 * time.Second)
}
}
@ -891,13 +887,12 @@ func (d *DataManager) etcdPingerLoop(ctx context.Context) {
d.log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(1 * time.Second)
}
}

View File

@ -419,7 +419,12 @@ func (r *ReadDB) Run(ctx context.Context) error {
}
r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
}
}
r.SetInitialized(true)
@ -437,17 +442,22 @@ func (r *ReadDB) Run(ctx context.Context) error {
}
r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
}
errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx)
hctx, cancel := context.WithCancel(ctx)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
r.log.Infof("starting handleEvents")
if err := r.handleEvents(ctx); err != nil {
if err := r.handleEvents(hctx); err != nil {
r.log.Errorf("handleEvents err: %+v", err)
errCh <- err
}
@ -466,7 +476,12 @@ func (r *ReadDB) Run(ctx context.Context) error {
wg.Wait()
}
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
}
}

View File

@ -999,13 +999,12 @@ func (e *Executor) podsCleanerLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(1 * time.Second)
}
}
@ -1056,13 +1055,12 @@ func (e *Executor) executorStatusSenderLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(2 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(2 * time.Second)
}
}
@ -1090,13 +1088,12 @@ func (e *Executor) executorTasksStatusSenderLoop(ctx context.Context) {
rt.Unlock()
}
sleepCh := time.NewTimer(2 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(2 * time.Second)
}
}
@ -1108,13 +1105,12 @@ func (e *Executor) tasksUpdaterLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(2 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(2 * time.Second)
}
}
@ -1188,13 +1184,12 @@ func (e *Executor) tasksDataCleanerLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(2 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(2 * time.Second)
}
}

View File

@ -40,13 +40,12 @@ func (n *NotificationService) runEventsHandlerLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(1 * time.Second)
}
}

View File

@ -269,7 +269,12 @@ func (r *ReadDB) Run(ctx context.Context) error {
}
r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
}
}
r.SetInitialized(true)
@ -287,17 +292,22 @@ func (r *ReadDB) Run(ctx context.Context) error {
}
r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
}
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(ctx)
hctx, cancel := context.WithCancel(ctx)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
r.log.Infof("starting handleEvents")
if err := r.handleEvents(ctx); err != nil {
if err := r.handleEvents(hctx); err != nil {
r.log.Errorf("handleEvents err: %+v", err)
errCh <- err
}
@ -307,7 +317,7 @@ func (r *ReadDB) Run(ctx context.Context) error {
wg.Add(1)
go func() {
r.log.Infof("starting handleEventsOST")
if err := r.handleEventsOST(ctx); err != nil {
if err := r.handleEventsOST(hctx); err != nil {
r.log.Errorf("handleEventsOST err: %+v", err)
errCh <- err
}
@ -326,7 +336,12 @@ func (r *ReadDB) Run(ctx context.Context) error {
wg.Wait()
}
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
}
}

View File

@ -49,13 +49,12 @@ func (s *Runservice) etcdPingerLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(1 * time.Second)
}
}

View File

@ -425,13 +425,12 @@ func (s *Runservice) compactChangeGroupsLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(1 * time.Second)
}
}
@ -767,13 +766,12 @@ func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(1 * time.Second)
}
}
@ -855,7 +853,12 @@ func (s *Runservice) runTasksUpdaterLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
time.Sleep(10 * time.Second)
sleepCh := time.NewTimer(10 * time.Second).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}
@ -1148,13 +1151,12 @@ func (s *Runservice) fetcherLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(2 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(2 * time.Second)
}
}
@ -1217,13 +1219,12 @@ func (s *Runservice) runsSchedulerLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(2 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(2 * time.Second)
}
}
@ -1260,13 +1261,12 @@ func (s *Runservice) finishedRunsArchiverLoop(ctx context.Context) {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(2 * time.Second).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(2 * time.Second)
}
}
@ -1361,13 +1361,12 @@ func (s *Runservice) cacheCleanerLoop(ctx context.Context, cacheExpireInterval t
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(cacheCleanerInterval).C
select {
case <-ctx.Done():
return
default:
case <-sleepCh:
}
time.Sleep(cacheCleanerInterval)
}
}

View File

@ -40,7 +40,13 @@ func (s *Scheduler) scheduleLoop(ctx context.Context) {
if err := s.schedule(ctx); err != nil {
log.Errorf("err: %+v", err)
}
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}
@ -108,7 +114,13 @@ func (s *Scheduler) approveLoop(ctx context.Context) {
if err := s.approve(ctx); err != nil {
log.Errorf("err: %+v", err)
}
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}