From 07cde065c867a4794fa67ec8cecddff25def4428 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Tue, 12 Nov 2019 18:18:27 +0100 Subject: [PATCH] runservice: use etcd mutex TryLock on fetching When fetching avoid concurrent fetches from multiple runservices using an etcd mutex TryLock. --- internal/services/runservice/common/common.go | 13 ++- internal/services/runservice/scheduler.go | 102 +++++++++++------- 2 files changed, 73 insertions(+), 42 deletions(-) diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index e5aeb08..67f0f20 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -44,10 +44,12 @@ var ( EtcdPingKey = path.Join(EtcdSchedulerBaseDir, "ping") - EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock") - EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner") - EtcdWorkspaceCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "workspacecleaner") - EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater") + EtcdLocksDir = path.Join(EtcdSchedulerBaseDir, "locks") + + EtcdCompactChangeGroupsLockKey = path.Join(EtcdLocksDir, "compactchangegroups") + EtcdCacheCleanerLockKey = path.Join(EtcdLocksDir, "cachecleaner") + EtcdWorkspaceCleanerLockKey = path.Join(EtcdLocksDir, "workspacecleaner") + EtcdTaskUpdaterLockKey = path.Join(EtcdLocksDir, "taskupdater") EtcdMaintenanceKey = "maintenance" ) @@ -55,6 +57,9 @@ var ( func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) } func EtcdExecutorKey(taskID string) string { return path.Join(EtcdExecutorsDir, taskID) } func EtcdTaskKey(taskID string) string { return path.Join(EtcdTasksDir, taskID) } +func EtcdTaskFetcherLockKey(taskID string) string { + return path.Join(EtcdLocksDir, "taskfetcher", taskID) +} const ( EtcdChangeGroupMinRevisionRange = 100 diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 2dd678f..744bfe9 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -1011,9 +1011,10 @@ func (s *Runservice) fetchTaskLogs(ctx context.Context, runID string, rt *types. if rt.SetupStep.LogPhase == types.RunTaskFetchPhaseNotStarted { if err := s.fetchLog(ctx, rt, true, 0); err != nil { log.Errorf("err: %+v", err) - } - if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil { - log.Errorf("err: %+v", err) + } else { + if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil { + log.Errorf("err: %+v", err) + } } } @@ -1133,45 +1134,70 @@ func (s *Runservice) fetcher(ctx context.Context) error { log.Debugf("r: %s", util.Dump(r)) for _, rt := range r.Tasks { log.Debugf("rt: %s", util.Dump(rt)) - if rt.Status.IsFinished() { - // write related logs runID - runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID) - exists, err := s.OSTFileExists(runIDPath) - if err != nil { - log.Errorf("err: %+v", err) - } else if !exists { - if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil { - log.Errorf("err: %+v", err) - } - } - - // write related archives runID - runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID) - exists, err = s.OSTFileExists(runIDPath) - if err != nil { - log.Errorf("err: %+v", err) - } else if !exists { - if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil { - log.Errorf("err: %+v", err) - } - } - - s.fetchTaskLogs(ctx, r.ID, rt) - s.fetchTaskArchives(ctx, r.ID, rt) - - // if the fetching is finished we can remove the executor tasks. We cannot - // remove it before since it contains the reference to the executor where we - // should fetch the data - if rt.LogsFetchFinished() && rt.ArchivesFetchFinished() { - if err := store.DeleteExecutorTask(ctx, s.e, rt.ID); err != nil { - return err - } - } + if err := s.taskFetcher(ctx, r, rt); err != nil { + return err } } } - return nil + return nil +} + +func (s *Runservice) taskFetcher(ctx context.Context, r *types.Run, rt *types.RunTask) error { + if !rt.Status.IsFinished() { + return nil + } + session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := etcd.NewMutex(session, common.EtcdTaskFetcherLockKey(rt.ID)) + + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } + return err + } + defer func() { _ = m.Unlock(ctx) }() + + // write related logs runID + runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID) + exists, err := s.OSTFileExists(runIDPath) + if err != nil { + log.Errorf("err: %+v", err) + } else if !exists { + if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil { + log.Errorf("err: %+v", err) + } + } + + // write related archives runID + runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID) + exists, err = s.OSTFileExists(runIDPath) + if err != nil { + log.Errorf("err: %+v", err) + } else if !exists { + if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil { + log.Errorf("err: %+v", err) + } + } + + s.fetchTaskLogs(ctx, r.ID, rt) + s.fetchTaskArchives(ctx, r.ID, rt) + + // if the fetching is finished we can remove the executor tasks. We cannot + // remove it before since it contains the reference to the executor where we + // should fetch the data + if rt.LogsFetchFinished() && rt.ArchivesFetchFinished() { + if err := store.DeleteExecutorTask(ctx, s.e, rt.ID); err != nil { + return err + } + } + + return nil } func (s *Runservice) runsSchedulerLoop(ctx context.Context) {