diff --git a/internal/services/configstore/configstore.go b/internal/services/configstore/configstore.go index fb5f989..f7e1749 100644 --- a/internal/services/configstore/configstore.go +++ b/internal/services/configstore/configstore.go @@ -95,8 +95,13 @@ func NewConfigStore(ctx context.Context, c *config.ConfigStore) (*ConfigStore, e func (s *ConfigStore) Run(ctx context.Context) error { errCh := make(chan error) + walReadyCh := make(chan struct{}) + + go func() { errCh <- s.wal.Run(ctx, walReadyCh) }() + + // wait for wal to be ready + <-walReadyCh - go func() { errCh <- s.wal.Run(ctx) }() go func() { errCh <- s.readDB.Run(ctx) }() // noop coors handler diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 2a0bfc0..ec145f8 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -1193,8 +1193,14 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule func (s *Scheduler) Run(ctx context.Context) error { errCh := make(chan error) + walReadyCh := make(chan struct{}) + + go func() { errCh <- s.wal.Run(ctx, walReadyCh) }() + + // wait for wal to be ready + <-walReadyCh + - go func() { errCh <- s.wal.Run(ctx) }() go s.readDB.Run(ctx) ch := make(chan *types.ExecutorTask) diff --git a/internal/wal/wal.go b/internal/wal/wal.go index c5fec24..5dc0652 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -1241,8 +1241,7 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf return w, nil } -func (w *WalManager) Run(ctx context.Context) error { - +func (w *WalManager) Run(ctx context.Context, readyCh chan struct{}) error { for { err := w.InitEtcd(ctx) if err == nil { @@ -1252,6 +1251,8 @@ func (w *WalManager) Run(ctx context.Context) error { time.Sleep(1 * time.Second) } + readyCh <- struct{}{} + go w.watcherLoop(ctx) go w.syncLoop(ctx) go w.checkpointLoop(ctx) diff --git a/internal/wal/wal_test.go b/internal/wal/wal_test.go index 928b910..f3fd166 100644 --- a/internal/wal/wal_test.go +++ b/internal/wal/wal_test.go @@ -89,8 +89,9 @@ func TestEtcdReset(t *testing.T) { EtcdWalsKeepNum: 10, } wal, err := NewWalManager(ctx, logger, walConfig) - go wal.Run(ctx) - time.Sleep(1 * time.Second) + walReadyCh := make(chan struct{}) + go wal.Run(ctx, walReadyCh) + <-walReadyCh actions := []*Action{ { @@ -123,8 +124,8 @@ func TestEtcdReset(t *testing.T) { cancel() ctx = context.Background() - go wal.Run(ctx) - time.Sleep(5 * time.Second) + go wal.Run(ctx, walReadyCh) + <-walReadyCh curObjects := []string{} doneCh := make(chan struct{}) @@ -178,8 +179,9 @@ func TestConcurrentUpdate(t *testing.T) { }, } - go wal.Run(ctx) - time.Sleep(1 * time.Second) + walReadyCh := make(chan struct{}) + go wal.Run(ctx, walReadyCh) + <-walReadyCh cgNames := []string{"changegroup01", "changegroup02"} cgt := wal.GetChangeGroupsUpdateToken(cgNames) @@ -253,8 +255,9 @@ func TestWalCleaner(t *testing.T) { }, } - go wal.Run(ctx) - time.Sleep(1 * time.Second) + walReadyCh := make(chan struct{}) + go wal.Run(ctx, walReadyCh) + <-walReadyCh for i := 0; i < 20; i++ { if _, err := wal.WriteWal(ctx, actions, nil); err != nil {