configstore: sync readdb run method with the one of runservice
This commit is contained in:
parent
5643dd5dcd
commit
f1908c3a16
|
@ -21,6 +21,7 @@ import (
|
|||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"agola.io/agola/internal/datamanager"
|
||||
|
@ -60,6 +61,7 @@ type ReadDB struct {
|
|||
dm *datamanager.DataManager
|
||||
|
||||
Initialized bool
|
||||
initLock sync.Mutex
|
||||
}
|
||||
|
||||
func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) (*ReadDB, error) {
|
||||
|
@ -88,6 +90,18 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.
|
|||
return readDB, nil
|
||||
}
|
||||
|
||||
func (r *ReadDB) SetInitialized(initialized bool) {
|
||||
r.initLock.Lock()
|
||||
r.Initialized = initialized
|
||||
r.initLock.Unlock()
|
||||
}
|
||||
|
||||
func (r *ReadDB) IsInitialized() bool {
|
||||
r.initLock.Lock()
|
||||
defer r.initLock.Unlock()
|
||||
return r.Initialized
|
||||
}
|
||||
|
||||
// Initialize populates the readdb with the current etcd data and save the
|
||||
// revision to then feed it with the etcd events
|
||||
func (r *ReadDB) Initialize(ctx context.Context) error {
|
||||
|
@ -391,11 +405,10 @@ func (r *ReadDB) Run(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if revision == 0 {
|
||||
if revision == 0 || !r.Initialized {
|
||||
for {
|
||||
err := r.Initialize(ctx)
|
||||
if err == nil {
|
||||
r.Initialized = true
|
||||
break
|
||||
}
|
||||
r.log.Errorf("initialize err: %+v", err)
|
||||
|
@ -403,18 +416,48 @@ func (r *ReadDB) Run(ctx context.Context) error {
|
|||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
r.SetInitialized(true)
|
||||
|
||||
errCh := make(chan error)
|
||||
for {
|
||||
if err := r.HandleEvents(ctx); err != nil {
|
||||
r.log.Errorf("handleevents err: %+v", err)
|
||||
for {
|
||||
initialized := r.IsInitialized()
|
||||
if initialized {
|
||||
break
|
||||
}
|
||||
err := r.Initialize(ctx)
|
||||
if err == nil {
|
||||
r.SetInitialized(true)
|
||||
break
|
||||
}
|
||||
r.log.Errorf("initialize err: %+v", err)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
wg := &sync.WaitGroup{}
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
r.log.Infof("starting handleEvents")
|
||||
if err := r.handleEvents(ctx); err != nil {
|
||||
r.log.Errorf("handleEvents err: %+v", err)
|
||||
errCh <- err
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
r.log.Infof("readdb exiting")
|
||||
cancel()
|
||||
r.rdb.Close()
|
||||
return nil
|
||||
default:
|
||||
case <-errCh:
|
||||
// cancel context and wait for the all the goroutines to exit
|
||||
cancel()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
@ -424,7 +467,7 @@ func (r *ReadDB) Run(ctx context.Context) error {
|
|||
// TODO(sgotti) improve to apply when the wal have been "committedstorage" and
|
||||
// not only "committed", in this way we don't have to full resync when etcd is
|
||||
// lost/reset
|
||||
func (r *ReadDB) HandleEvents(ctx context.Context) error {
|
||||
func (r *ReadDB) handleEvents(ctx context.Context) error {
|
||||
var revision int64
|
||||
err := r.rdb.Do(func(tx *db.Tx) error {
|
||||
err := tx.QueryRow("select revision from revision order by revision desc limit 1").Scan(&revision)
|
||||
|
|
Loading…
Reference in New Issue