Merge pull request #177 from sgotti/runservice_use_trylock_on_fetching
runservice: use etcd mutex TryLock on fetching
This commit is contained in:
commit
eeacc296d0
|
@ -44,10 +44,12 @@ var (
|
||||||
|
|
||||||
EtcdPingKey = path.Join(EtcdSchedulerBaseDir, "ping")
|
EtcdPingKey = path.Join(EtcdSchedulerBaseDir, "ping")
|
||||||
|
|
||||||
EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock")
|
EtcdLocksDir = path.Join(EtcdSchedulerBaseDir, "locks")
|
||||||
EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner")
|
|
||||||
EtcdWorkspaceCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "workspacecleaner")
|
EtcdCompactChangeGroupsLockKey = path.Join(EtcdLocksDir, "compactchangegroups")
|
||||||
EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater")
|
EtcdCacheCleanerLockKey = path.Join(EtcdLocksDir, "cachecleaner")
|
||||||
|
EtcdWorkspaceCleanerLockKey = path.Join(EtcdLocksDir, "workspacecleaner")
|
||||||
|
EtcdTaskUpdaterLockKey = path.Join(EtcdLocksDir, "taskupdater")
|
||||||
|
|
||||||
EtcdMaintenanceKey = "maintenance"
|
EtcdMaintenanceKey = "maintenance"
|
||||||
)
|
)
|
||||||
|
@ -55,6 +57,9 @@ var (
|
||||||
func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }
|
func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }
|
||||||
func EtcdExecutorKey(taskID string) string { return path.Join(EtcdExecutorsDir, taskID) }
|
func EtcdExecutorKey(taskID string) string { return path.Join(EtcdExecutorsDir, taskID) }
|
||||||
func EtcdTaskKey(taskID string) string { return path.Join(EtcdTasksDir, taskID) }
|
func EtcdTaskKey(taskID string) string { return path.Join(EtcdTasksDir, taskID) }
|
||||||
|
func EtcdTaskFetcherLockKey(taskID string) string {
|
||||||
|
return path.Join(EtcdLocksDir, "taskfetcher", taskID)
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
EtcdChangeGroupMinRevisionRange = 100
|
EtcdChangeGroupMinRevisionRange = 100
|
||||||
|
|
|
@ -1011,9 +1011,10 @@ func (s *Runservice) fetchTaskLogs(ctx context.Context, runID string, rt *types.
|
||||||
if rt.SetupStep.LogPhase == types.RunTaskFetchPhaseNotStarted {
|
if rt.SetupStep.LogPhase == types.RunTaskFetchPhaseNotStarted {
|
||||||
if err := s.fetchLog(ctx, rt, true, 0); err != nil {
|
if err := s.fetchLog(ctx, rt, true, 0); err != nil {
|
||||||
log.Errorf("err: %+v", err)
|
log.Errorf("err: %+v", err)
|
||||||
}
|
} else {
|
||||||
if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil {
|
if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil {
|
||||||
log.Errorf("err: %+v", err)
|
log.Errorf("err: %+v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1133,45 +1134,70 @@ func (s *Runservice) fetcher(ctx context.Context) error {
|
||||||
log.Debugf("r: %s", util.Dump(r))
|
log.Debugf("r: %s", util.Dump(r))
|
||||||
for _, rt := range r.Tasks {
|
for _, rt := range r.Tasks {
|
||||||
log.Debugf("rt: %s", util.Dump(rt))
|
log.Debugf("rt: %s", util.Dump(rt))
|
||||||
if rt.Status.IsFinished() {
|
if err := s.taskFetcher(ctx, r, rt); err != nil {
|
||||||
// write related logs runID
|
return err
|
||||||
runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID)
|
|
||||||
exists, err := s.OSTFileExists(runIDPath)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("err: %+v", err)
|
|
||||||
} else if !exists {
|
|
||||||
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
|
|
||||||
log.Errorf("err: %+v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// write related archives runID
|
|
||||||
runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID)
|
|
||||||
exists, err = s.OSTFileExists(runIDPath)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("err: %+v", err)
|
|
||||||
} else if !exists {
|
|
||||||
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
|
|
||||||
log.Errorf("err: %+v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
s.fetchTaskLogs(ctx, r.ID, rt)
|
|
||||||
s.fetchTaskArchives(ctx, r.ID, rt)
|
|
||||||
|
|
||||||
// if the fetching is finished we can remove the executor tasks. We cannot
|
|
||||||
// remove it before since it contains the reference to the executor where we
|
|
||||||
// should fetch the data
|
|
||||||
if rt.LogsFetchFinished() && rt.ArchivesFetchFinished() {
|
|
||||||
if err := store.DeleteExecutorTask(ctx, s.e, rt.ID); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Runservice) taskFetcher(ctx context.Context, r *types.Run, rt *types.RunTask) error {
|
||||||
|
if !rt.Status.IsFinished() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer session.Close()
|
||||||
|
|
||||||
|
m := etcd.NewMutex(session, common.EtcdTaskFetcherLockKey(rt.ID))
|
||||||
|
|
||||||
|
if err := m.TryLock(ctx); err != nil {
|
||||||
|
if errors.Is(err, etcd.ErrLocked) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() { _ = m.Unlock(ctx) }()
|
||||||
|
|
||||||
|
// write related logs runID
|
||||||
|
runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID)
|
||||||
|
exists, err := s.OSTFileExists(runIDPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("err: %+v", err)
|
||||||
|
} else if !exists {
|
||||||
|
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
|
||||||
|
log.Errorf("err: %+v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// write related archives runID
|
||||||
|
runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID)
|
||||||
|
exists, err = s.OSTFileExists(runIDPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("err: %+v", err)
|
||||||
|
} else if !exists {
|
||||||
|
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
|
||||||
|
log.Errorf("err: %+v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
s.fetchTaskLogs(ctx, r.ID, rt)
|
||||||
|
s.fetchTaskArchives(ctx, r.ID, rt)
|
||||||
|
|
||||||
|
// if the fetching is finished we can remove the executor tasks. We cannot
|
||||||
|
// remove it before since it contains the reference to the executor where we
|
||||||
|
// should fetch the data
|
||||||
|
if rt.LogsFetchFinished() && rt.ArchivesFetchFinished() {
|
||||||
|
if err := store.DeleteExecutorTask(ctx, s.e, rt.ID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Runservice) runsSchedulerLoop(ctx context.Context) {
|
func (s *Runservice) runsSchedulerLoop(ctx context.Context) {
|
||||||
|
|
Loading…
Reference in New Issue