diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 6c25a5f..7705950 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -21,6 +21,7 @@ import ( "io" "os" "path/filepath" + "sync" "time" "agola.io/agola/internal/datamanager" @@ -60,6 +61,7 @@ type ReadDB struct { dm *datamanager.DataManager Initialized bool + initLock sync.Mutex } func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) (*ReadDB, error) { @@ -88,6 +90,18 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd. return readDB, nil } +func (r *ReadDB) SetInitialized(initialized bool) { + r.initLock.Lock() + r.Initialized = initialized + r.initLock.Unlock() +} + +func (r *ReadDB) IsInitialized() bool { + r.initLock.Lock() + defer r.initLock.Unlock() + return r.Initialized +} + // Initialize populates the readdb with the current etcd data and save the // revision to then feed it with the etcd events func (r *ReadDB) Initialize(ctx context.Context) error { @@ -391,11 +405,10 @@ func (r *ReadDB) Run(ctx context.Context) error { return err } - if revision == 0 { + if revision == 0 || !r.Initialized { for { err := r.Initialize(ctx) if err == nil { - r.Initialized = true break } r.log.Errorf("initialize err: %+v", err) @@ -403,18 +416,48 @@ func (r *ReadDB) Run(ctx context.Context) error { time.Sleep(1 * time.Second) } } + r.SetInitialized(true) + errCh := make(chan error) for { - if err := r.HandleEvents(ctx); err != nil { - r.log.Errorf("handleevents err: %+v", err) + for { + initialized := r.IsInitialized() + if initialized { + break + } + err := r.Initialize(ctx) + if err == nil { + r.SetInitialized(true) + break + } + r.log.Errorf("initialize err: %+v", err) + + time.Sleep(1 * time.Second) } + ctx, cancel := context.WithCancel(ctx) + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + r.log.Infof("starting handleEvents") + if err := r.handleEvents(ctx); err != nil { + r.log.Errorf("handleEvents err: %+v", err) + errCh <- err + } + wg.Done() + }() + select { case <-ctx.Done(): r.log.Infof("readdb exiting") + cancel() r.rdb.Close() return nil - default: + case <-errCh: + // cancel context and wait for the all the goroutines to exit + cancel() + wg.Wait() } time.Sleep(1 * time.Second) @@ -424,7 +467,7 @@ func (r *ReadDB) Run(ctx context.Context) error { // TODO(sgotti) improve to apply when the wal have been "committedstorage" and // not only "committed", in this way we don't have to full resync when etcd is // lost/reset -func (r *ReadDB) HandleEvents(ctx context.Context) error { +func (r *ReadDB) handleEvents(ctx context.Context) error { var revision int64 err := r.rdb.Do(func(tx *db.Tx) error { err := tx.QueryRow("select revision from revision order by revision desc limit 1").Scan(&revision)