diff --git a/internal/services/runservice/scheduler/readdb/readdb.go b/internal/services/runservice/scheduler/readdb/readdb.go index 2cdf965..b25ab38 100644 --- a/internal/services/runservice/scheduler/readdb/readdb.go +++ b/internal/services/runservice/scheduler/readdb/readdb.go @@ -90,7 +90,7 @@ type ReadDB struct { wal *wal.WalManager Initialized bool - initMutex sync.Mutex + m sync.Mutex } func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, wal *wal.WalManager) (*ReadDB, error) { @@ -118,6 +118,18 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd. return readDB, nil } +func (r *ReadDB) SetInitialized(initialized bool) { + r.m.Lock() + r.Initialized = initialized + r.m.Unlock() +} + +func (r *ReadDB) IsInitialized() bool { + r.m.Lock() + defer r.m.Unlock() + return r.Initialized +} + // Initialize populates the readdb with the current etcd data and save the // revision to then feed it with the etcd events func (r *ReadDB) Initialize(ctx context.Context) error { @@ -353,17 +365,17 @@ func (r *ReadDB) Run(ctx context.Context) error { time.Sleep(1 * time.Second) } } - r.Initialized = true + r.SetInitialized(true) for { for { - initialized := r.Initialized + initialized := r.IsInitialized() if initialized { break } err := r.Initialize(ctx) if err == nil { - r.Initialized = true + r.SetInitialized(true) break } r.log.Errorf("initialize err: %+v", err) @@ -414,7 +426,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { } if lastRun != nil { if runSequence == nil { - r.Initialized = false + r.SetInitialized(false) return errors.Errorf("no runsequence in etcd, reinitializing.") } @@ -425,7 +437,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { // check that the run sequence epoch isn't different than the current one (this means etcd // has been reset, or worst, restored from a backup or manually deleted) if runSequence == nil || runSequence.Epoch != lastRunSequence.Epoch { - r.Initialized = false + r.SetInitialized(false) return errors.Errorf("last run epoch %d is different than current epoch in etcd %d, reinitializing.", lastRunSequence.Epoch, runSequence.Epoch) } } @@ -439,7 +451,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { err = wresp.Err() if err == etcdclientv3rpc.ErrCompacted { r.log.Errorf("required events already compacted, reinitializing readdb") - r.Initialized = false + r.SetInitialized(false) } return errors.Wrapf(err, "watch error") } @@ -553,6 +565,9 @@ func (r *ReadDB) handleChangeGroupEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp } func (r *ReadDB) Do(f func(tx *db.Tx) error) error { + if !r.IsInitialized() { + return errors.Errorf("db not initialized") + } return r.rdb.Do(f) }