diff --git a/internal/services/runservice/scheduler/common/common.go b/internal/services/runservice/scheduler/common/common.go index 5ff069e..01fca49 100644 --- a/internal/services/runservice/scheduler/common/common.go +++ b/internal/services/runservice/scheduler/common/common.go @@ -36,18 +36,23 @@ func (e ErrNotExist) Error() string { } var ( - EtcdRunsDir = "runs" - EtcdRunSequenceKey = "runsequence" - EtcdRunEventKey = "runevents" - EtcdRunEventSequenceKey = "runeventsequence" + EtcdSchedulerBaseDir = "scheduler" - EtcdChangeGroupsDir = "changegroups" - EtcdChangeGroupMinRevisionKey = "changegroupsminrev" + EtcdRunsDir = path.Join(EtcdSchedulerBaseDir, "runs") + EtcdRunSequenceKey = path.Join(EtcdSchedulerBaseDir, "runsequence") + EtcdRunEventKey = path.Join(EtcdSchedulerBaseDir, "runevents") + EtcdRunEventSequenceKey = path.Join(EtcdSchedulerBaseDir, "runeventsequence") - EtcdExecutorsDir = "executors" - EtcdTasksDir = "tasks" + EtcdChangeGroupsDir = path.Join(EtcdSchedulerBaseDir, "changegroups") + EtcdChangeGroupMinRevisionKey = path.Join(EtcdSchedulerBaseDir, "changegroupsminrev") - EtcdPingKey = "ping" + EtcdExecutorsDir = path.Join(EtcdSchedulerBaseDir, "executors") + EtcdTasksDir = path.Join(EtcdSchedulerBaseDir, "tasks") + + EtcdPingKey = path.Join(EtcdSchedulerBaseDir, "ping") + + EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner") + EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater") ) func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) } diff --git a/internal/services/runservice/scheduler/readdb/readdb.go b/internal/services/runservice/scheduler/readdb/readdb.go index dbd8f59..87db08d 100644 --- a/internal/services/runservice/scheduler/readdb/readdb.go +++ b/internal/services/runservice/scheduler/readdb/readdb.go @@ -377,7 +377,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { wctx, cancel := context.WithCancel(ctx) defer cancel() wctx = etcdclientv3.WithRequireLeader(wctx) - wch := r.e.Watch(wctx, "", revision+1) + wch := r.e.Watch(wctx, common.EtcdSchedulerBaseDir+"/", revision+1) for wresp := range wch { if wresp.Canceled { err = wresp.Err() diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index af39ef4..7b38b5b 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -54,9 +54,6 @@ import ( const ( cacheCleanerInterval = 1 * 24 * time.Hour - - cacheCleanerLock = "locks/cachecleaner" - taskUpdaterLock = "locks/taskupdater" ) var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) @@ -835,7 +832,7 @@ func (s *Scheduler) runTasksUpdater(ctx context.Context) error { } defer session.Close() - m := concurrency.NewMutex(session, taskUpdaterLock) + m := concurrency.NewMutex(session, common.EtcdTaskUpdaterLockKey) // TODO(sgotti) find a way to use a trylock so we'll just return if already // locked. Currently multiple task updaters will enqueue and start when another @@ -1504,7 +1501,7 @@ func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.D } defer session.Close() - m := concurrency.NewMutex(session, cacheCleanerLock) + m := concurrency.NewMutex(session, common.EtcdCacheCleanerLockKey) // TODO(sgotti) find a way to use a trylock so we'll just return if already // locked. Currently multiple cachecleaners will enqueue and start when another