Merge pull request #64 from sgotti/use_sleep_timers_loops

*: use sleep timer in loops
This commit is contained in:
Simone Gotti 2019-07-25 17:56:46 +02:00 committed by GitHub
commit d18c878d4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 106 additions and 72 deletions

View File

@ -184,14 +184,13 @@ func (d *DataManager) watcherLoop(ctx context.Context) {
} }
} }
sleepCh := time.NewTimer(1 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
d.log.Infof("watcher exiting") d.log.Infof("watcher exiting")
return return
default: case <-sleepCh:
} }
time.Sleep(1 * time.Second)
} }
} }

View File

@ -208,7 +208,13 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
break break
} }
d.log.Errorf("failed to initialize etcd: %+v", err) d.log.Errorf("failed to initialize etcd: %+v", err)
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
} }
readyCh <- struct{}{} readyCh <- struct{}{}

View File

@ -550,13 +550,12 @@ func (d *DataManager) syncLoop(ctx context.Context) {
d.log.Errorf("syncer error: %+v", err) d.log.Errorf("syncer error: %+v", err)
} }
sleepCh := time.NewTimer(5 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(5 * time.Second)
} }
} }
@ -647,13 +646,12 @@ func (d *DataManager) checkpointLoop(ctx context.Context) {
d.log.Errorf("checkpoint error: %v", err) d.log.Errorf("checkpoint error: %v", err)
} }
sleepCh := time.NewTimer(d.checkpointInterval).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(d.checkpointInterval)
} }
} }
@ -730,13 +728,12 @@ func (d *DataManager) walCleanerLoop(ctx context.Context) {
d.log.Errorf("walcleaner error: %v", err) d.log.Errorf("walcleaner error: %v", err)
} }
sleepCh := time.NewTimer(2 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(2 * time.Second)
} }
} }
@ -805,13 +802,12 @@ func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) {
d.log.Errorf("err: %+v", err) d.log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(1 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(1 * time.Second)
} }
} }
@ -891,13 +887,12 @@ func (d *DataManager) etcdPingerLoop(ctx context.Context) {
d.log.Errorf("err: %+v", err) d.log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(1 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(1 * time.Second)
} }
} }

View File

@ -419,7 +419,12 @@ func (r *ReadDB) Run(ctx context.Context) error {
} }
r.log.Errorf("initialize err: %+v", err) r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second) sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
} }
} }
r.SetInitialized(true) r.SetInitialized(true)
@ -437,17 +442,22 @@ func (r *ReadDB) Run(ctx context.Context) error {
} }
r.log.Errorf("initialize err: %+v", err) r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second) sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
} }
errCh := make(chan error, 1) errCh := make(chan error, 1)
ctx, cancel := context.WithCancel(ctx) hctx, cancel := context.WithCancel(ctx)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
r.log.Infof("starting handleEvents") r.log.Infof("starting handleEvents")
if err := r.handleEvents(ctx); err != nil { if err := r.handleEvents(hctx); err != nil {
r.log.Errorf("handleEvents err: %+v", err) r.log.Errorf("handleEvents err: %+v", err)
errCh <- err errCh <- err
} }
@ -466,7 +476,12 @@ func (r *ReadDB) Run(ctx context.Context) error {
wg.Wait() wg.Wait()
} }
time.Sleep(1 * time.Second) sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
} }
} }

View File

@ -999,13 +999,12 @@ func (e *Executor) podsCleanerLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(1 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(1 * time.Second)
} }
} }
@ -1056,13 +1055,12 @@ func (e *Executor) executorStatusSenderLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(2 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(2 * time.Second)
} }
} }
@ -1090,13 +1088,12 @@ func (e *Executor) executorTasksStatusSenderLoop(ctx context.Context) {
rt.Unlock() rt.Unlock()
} }
sleepCh := time.NewTimer(2 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(2 * time.Second)
} }
} }
@ -1108,13 +1105,12 @@ func (e *Executor) tasksUpdaterLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(2 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(2 * time.Second)
} }
} }
@ -1188,13 +1184,12 @@ func (e *Executor) tasksDataCleanerLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(2 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(2 * time.Second)
} }
} }

View File

@ -40,13 +40,12 @@ func (n *NotificationService) runEventsHandlerLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(1 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(1 * time.Second)
} }
} }

View File

@ -269,7 +269,12 @@ func (r *ReadDB) Run(ctx context.Context) error {
} }
r.log.Errorf("initialize err: %+v", err) r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second) sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
} }
} }
r.SetInitialized(true) r.SetInitialized(true)
@ -287,17 +292,22 @@ func (r *ReadDB) Run(ctx context.Context) error {
} }
r.log.Errorf("initialize err: %+v", err) r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second) sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
} }
errCh := make(chan error, 2) errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(ctx) hctx, cancel := context.WithCancel(ctx)
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
r.log.Infof("starting handleEvents") r.log.Infof("starting handleEvents")
if err := r.handleEvents(ctx); err != nil { if err := r.handleEvents(hctx); err != nil {
r.log.Errorf("handleEvents err: %+v", err) r.log.Errorf("handleEvents err: %+v", err)
errCh <- err errCh <- err
} }
@ -307,7 +317,7 @@ func (r *ReadDB) Run(ctx context.Context) error {
wg.Add(1) wg.Add(1)
go func() { go func() {
r.log.Infof("starting handleEventsOST") r.log.Infof("starting handleEventsOST")
if err := r.handleEventsOST(ctx); err != nil { if err := r.handleEventsOST(hctx); err != nil {
r.log.Errorf("handleEventsOST err: %+v", err) r.log.Errorf("handleEventsOST err: %+v", err)
errCh <- err errCh <- err
} }
@ -326,7 +336,12 @@ func (r *ReadDB) Run(ctx context.Context) error {
wg.Wait() wg.Wait()
} }
time.Sleep(1 * time.Second) sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return nil
case <-sleepCh:
}
} }
} }

View File

@ -49,13 +49,12 @@ func (s *Runservice) etcdPingerLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(1 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(1 * time.Second)
} }
} }

View File

@ -425,13 +425,12 @@ func (s *Runservice) compactChangeGroupsLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(1 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(1 * time.Second)
} }
} }
@ -767,13 +766,12 @@ func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(1 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(1 * time.Second)
} }
} }
@ -855,7 +853,12 @@ func (s *Runservice) runTasksUpdaterLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
time.Sleep(10 * time.Second) sleepCh := time.NewTimer(10 * time.Second).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
} }
} }
@ -1148,13 +1151,12 @@ func (s *Runservice) fetcherLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(2 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(2 * time.Second)
} }
} }
@ -1217,13 +1219,12 @@ func (s *Runservice) runsSchedulerLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(2 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(2 * time.Second)
} }
} }
@ -1260,13 +1261,12 @@ func (s *Runservice) finishedRunsArchiverLoop(ctx context.Context) {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(2 * time.Second).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(2 * time.Second)
} }
} }
@ -1361,13 +1361,12 @@ func (s *Runservice) cacheCleanerLoop(ctx context.Context, cacheExpireInterval t
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(cacheCleanerInterval).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
default: case <-sleepCh:
} }
time.Sleep(cacheCleanerInterval)
} }
} }

View File

@ -40,7 +40,13 @@ func (s *Scheduler) scheduleLoop(ctx context.Context) {
if err := s.schedule(ctx); err != nil { if err := s.schedule(ctx); err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
} }
} }
@ -108,7 +114,13 @@ func (s *Scheduler) approveLoop(ctx context.Context) {
if err := s.approve(ctx); err != nil { if err := s.approve(ctx); err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
time.Sleep(1 * time.Second)
sleepCh := time.NewTimer(1 * time.Second).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
} }
} }