runservice: use etcd mutex TryLock on fetching

When fetching avoid concurrent fetches from multiple runservices using an etcd
mutex TryLock.
This commit is contained in:
Simone Gotti 2019-11-12 18:18:27 +01:00
parent 9fd4b662a8
commit 07cde065c8
2 changed files with 73 additions and 42 deletions

View File

@ -44,10 +44,12 @@ var (
EtcdPingKey = path.Join(EtcdSchedulerBaseDir, "ping") EtcdPingKey = path.Join(EtcdSchedulerBaseDir, "ping")
EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock") EtcdLocksDir = path.Join(EtcdSchedulerBaseDir, "locks")
EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner")
EtcdWorkspaceCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "workspacecleaner") EtcdCompactChangeGroupsLockKey = path.Join(EtcdLocksDir, "compactchangegroups")
EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater") EtcdCacheCleanerLockKey = path.Join(EtcdLocksDir, "cachecleaner")
EtcdWorkspaceCleanerLockKey = path.Join(EtcdLocksDir, "workspacecleaner")
EtcdTaskUpdaterLockKey = path.Join(EtcdLocksDir, "taskupdater")
EtcdMaintenanceKey = "maintenance" EtcdMaintenanceKey = "maintenance"
) )
@ -55,6 +57,9 @@ var (
func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) } func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }
func EtcdExecutorKey(taskID string) string { return path.Join(EtcdExecutorsDir, taskID) } func EtcdExecutorKey(taskID string) string { return path.Join(EtcdExecutorsDir, taskID) }
func EtcdTaskKey(taskID string) string { return path.Join(EtcdTasksDir, taskID) } func EtcdTaskKey(taskID string) string { return path.Join(EtcdTasksDir, taskID) }
func EtcdTaskFetcherLockKey(taskID string) string {
return path.Join(EtcdLocksDir, "taskfetcher", taskID)
}
const ( const (
EtcdChangeGroupMinRevisionRange = 100 EtcdChangeGroupMinRevisionRange = 100

View File

@ -1011,11 +1011,12 @@ func (s *Runservice) fetchTaskLogs(ctx context.Context, runID string, rt *types.
if rt.SetupStep.LogPhase == types.RunTaskFetchPhaseNotStarted { if rt.SetupStep.LogPhase == types.RunTaskFetchPhaseNotStarted {
if err := s.fetchLog(ctx, rt, true, 0); err != nil { if err := s.fetchLog(ctx, rt, true, 0); err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} } else {
if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil { if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
} }
} }
}
// fetch steps logs // fetch steps logs
for i, rts := range rt.Steps { for i, rts := range rt.Steps {
@ -1133,7 +1134,35 @@ func (s *Runservice) fetcher(ctx context.Context) error {
log.Debugf("r: %s", util.Dump(r)) log.Debugf("r: %s", util.Dump(r))
for _, rt := range r.Tasks { for _, rt := range r.Tasks {
log.Debugf("rt: %s", util.Dump(rt)) log.Debugf("rt: %s", util.Dump(rt))
if rt.Status.IsFinished() { if err := s.taskFetcher(ctx, r, rt); err != nil {
return err
}
}
}
return nil
}
func (s *Runservice) taskFetcher(ctx context.Context, r *types.Run, rt *types.RunTask) error {
if !rt.Status.IsFinished() {
return nil
}
session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
defer session.Close()
m := etcd.NewMutex(session, common.EtcdTaskFetcherLockKey(rt.ID))
if err := m.TryLock(ctx); err != nil {
if errors.Is(err, etcd.ErrLocked) {
return nil
}
return err
}
defer func() { _ = m.Unlock(ctx) }()
// write related logs runID // write related logs runID
runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID) runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID)
exists, err := s.OSTFileExists(runIDPath) exists, err := s.OSTFileExists(runIDPath)
@ -1167,11 +1196,8 @@ func (s *Runservice) fetcher(ctx context.Context) error {
return err return err
} }
} }
}
}
}
return nil
return nil
} }
func (s *Runservice) runsSchedulerLoop(ctx context.Context) { func (s *Runservice) runsSchedulerLoop(ctx context.Context) {