runservice: add lock around compatchangegroups

just to avoid concurrency errors when multiple instances are running
This commit is contained in:
Simone Gotti 2019-07-10 10:20:35 +02:00
parent c064e490b2
commit 940264e413
2 changed files with 19 additions and 2 deletions

View File

@ -50,8 +50,9 @@ var (
EtcdPingKey = path.Join(EtcdSchedulerBaseDir, "ping")
EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner")
EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater")
EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock")
EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner")
EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater")
)
func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }

View File

@ -436,6 +436,22 @@ func (s *Runservice) compactChangeGroupsLoop(ctx context.Context) {
}
func (s *Runservice) compactChangeGroups(ctx context.Context) error {
session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
defer session.Close()
m := concurrency.NewMutex(session, common.EtcdCompactChangeGroupsLockKey)
// TODO(sgotti) find a way to use a trylock so we'll just return if already
// locked. Currently multiple task updaters will enqueue and start when another
// finishes (unuseful and consume resources)
if err := m.Lock(ctx); err != nil {
return err
}
defer func() { _ = m.Unlock(ctx) }()
resp, err := s.e.Client().Get(ctx, common.EtcdChangeGroupMinRevisionKey)
if err != nil {
return err