diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index c00dfac..3acbe80 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -50,8 +50,9 @@ var ( EtcdPingKey = path.Join(EtcdSchedulerBaseDir, "ping") - EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner") - EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater") + EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock") + EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner") + EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater") ) func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) } diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 7b772db..0052580 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -436,6 +436,22 @@ func (s *Runservice) compactChangeGroupsLoop(ctx context.Context) { } func (s *Runservice) compactChangeGroups(ctx context.Context) error { + session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := concurrency.NewMutex(session, common.EtcdCompactChangeGroupsLockKey) + + // TODO(sgotti) find a way to use a trylock so we'll just return if already + // locked. Currently multiple task updaters will enqueue and start when another + // finishes (unuseful and consume resources) + if err := m.Lock(ctx); err != nil { + return err + } + defer func() { _ = m.Unlock(ctx) }() + resp, err := s.e.Client().Get(ctx, common.EtcdChangeGroupMinRevisionKey) if err != nil { return err