readdb: improve HandleEvents goroutine exiting
Rename errCh to doneCh (error is not needed) and always send to it when one of the HandleEvents functions exits (not only on error). This will ensure that all the goroutines will be stopped also if one of them returns without an error.
This commit is contained in:
parent
aff44f7e89
commit
d679254516
@ -445,18 +445,19 @@ func (r *ReadDB) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
errCh := make(chan error, 1)
|
doneCh := make(chan struct{}, 2)
|
||||||
hctx, cancel := context.WithCancel(ctx)
|
hctx, cancel := context.WithCancel(ctx)
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
r.log.Infof("starting handleEvents")
|
r.log.Infof("starting handleEvents")
|
||||||
if err := r.handleEvents(hctx); err != nil {
|
if err := r.handleEvents(hctx); err != nil {
|
||||||
r.log.Errorf("handleEvents err: %+v", err)
|
r.log.Errorf("handleEvents err: %+v", err)
|
||||||
errCh <- err
|
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
doneCh <- struct{}{}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -464,7 +465,7 @@ func (r *ReadDB) Run(ctx context.Context) error {
|
|||||||
r.log.Infof("readdb exiting")
|
r.log.Infof("readdb exiting")
|
||||||
cancel()
|
cancel()
|
||||||
return nil
|
return nil
|
||||||
case <-errCh:
|
case <-doneCh:
|
||||||
// cancel context and wait for the all the goroutines to exit
|
// cancel context and wait for the all the goroutines to exit
|
||||||
cancel()
|
cancel()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
@ -306,28 +306,28 @@ func (r *ReadDB) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
errCh := make(chan error, 2)
|
doneCh := make(chan struct{}, 2)
|
||||||
hctx, cancel := context.WithCancel(ctx)
|
hctx, cancel := context.WithCancel(ctx)
|
||||||
wg := &sync.WaitGroup{}
|
wg := &sync.WaitGroup{}
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(2)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
r.log.Infof("starting handleEvents")
|
r.log.Infof("starting handleEvents")
|
||||||
if err := r.handleEvents(hctx); err != nil {
|
if err := r.handleEvents(hctx); err != nil {
|
||||||
r.log.Errorf("handleEvents err: %+v", err)
|
r.log.Errorf("handleEvents err: %+v", err)
|
||||||
errCh <- err
|
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
doneCh <- struct{}{}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
go func() {
|
||||||
r.log.Infof("starting handleEventsOST")
|
r.log.Infof("starting handleEventsOST")
|
||||||
if err := r.handleEventsOST(hctx); err != nil {
|
if err := r.handleEventsOST(hctx); err != nil {
|
||||||
r.log.Errorf("handleEventsOST err: %+v", err)
|
r.log.Errorf("handleEventsOST err: %+v", err)
|
||||||
errCh <- err
|
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
|
doneCh <- struct{}{}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -335,7 +335,7 @@ func (r *ReadDB) Run(ctx context.Context) error {
|
|||||||
r.log.Infof("readdb exiting")
|
r.log.Infof("readdb exiting")
|
||||||
cancel()
|
cancel()
|
||||||
return nil
|
return nil
|
||||||
case <-errCh:
|
case <-doneCh:
|
||||||
// cancel context and wait for the all the goroutines to exit
|
// cancel context and wait for the all the goroutines to exit
|
||||||
cancel()
|
cancel()
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
Loading…
Reference in New Issue
Block a user