From 27f84738d64fb2ac29178f228fe35cd8342d6707 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Tue, 30 Apr 2019 14:00:34 +0200 Subject: [PATCH] runservice: simplify workspace restore --- .../services/runservice/executor/executor.go | 32 +++++++---------- .../runservice/scheduler/scheduler.go | 35 ++++++++++++------- internal/services/runservice/types/types.go | 15 +++----- 3 files changed, 41 insertions(+), 41 deletions(-) diff --git a/internal/services/runservice/executor/executor.go b/internal/services/runservice/executor/executor.go index b9077ff..358e95c 100644 --- a/internal/services/runservice/executor/executor.go +++ b/internal/services/runservice/executor/executor.go @@ -419,26 +419,20 @@ func (e *Executor) doRestoreWorkspaceStep(ctx context.Context, s *types.RestoreW } defer logf.Close() - // TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer. - for level, wl := range t.Workspace { - log.Debugf("unarchiving archives at level %d", level) - for _, archives := range wl { - for _, archive := range archives { - log.Debugf("unarchiving workspace at level %d, taskID: %s, step: %d", level, archive.TaskID, archive.Step) - resp, err := e.runserviceClient.GetArchive(ctx, archive.TaskID, archive.Step) - if err != nil { - // TODO(sgotti) retry before giving up - fmt.Fprintf(logf, "error reading workspace archive: %v\n", err) - return -1, err - } - archivef := resp.Body - if err := e.unarchive(ctx, t, archivef, pod, logf, s.DestDir, false, false); err != nil { - archivef.Close() - return -1, err - } - archivef.Close() - } + for _, op := range t.WorkspaceOperations { + log.Debugf("unarchiving workspace for taskID: %s, step: %d", level, op.TaskID, op.Step) + resp, err := e.runserviceClient.GetArchive(ctx, op.TaskID, op.Step) + if err != nil { + // TODO(sgotti) retry before giving up + fmt.Fprintf(logf, "error reading workspace archive: %v\n", err) + return -1, err } + archivef := resp.Body + if err := e.unarchive(ctx, t, archivef, pod, logf, s.DestDir, false, false); err != nil { + archivef.Close() + return -1, err + } + archivef.Close() } return 0, nil diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 969b488..c46f220 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -23,6 +23,7 @@ import ( "net/http" "os" "path/filepath" + "sort" "time" scommon "github.com/sorintlab/agola/internal/common" @@ -305,6 +306,17 @@ func (s *Scheduler) chooseExecutor(ctx context.Context, rct *types.RunConfigTask return nil, nil } +type parentsByLevelName []*types.RunConfigTask + +func (p parentsByLevelName) Len() int { return len(p) } +func (p parentsByLevelName) Less(i, j int) bool { + if p[i].Level != p[j].Level { + return p[i].Level < p[j].Level + } + return p[i].Name < p[j].Name +} +func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { rct := rc.Tasks[rt.ID] @@ -345,25 +357,24 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types } } - // calculate workspace layers - ws := make(types.Workspace, rct.Level+1) + // calculate workspace operations + // TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer. + // this simplifies the workspaces extractions since they could be extracted in any order. We make them ordered just for reproducibility + wsops := []types.WorkspaceOperation{} rctAllParents := runconfig.GetAllParents(rc.Tasks, rct) - log.Debugf("rctAllParents: %s", util.Dump(rctAllParents)) + + // sort parents by level and name just for reproducibility + sort.Sort(parentsByLevelName(rctAllParents)) + for _, rctParent := range rctAllParents { log.Debugf("rctParent: %s", util.Dump(rctParent)) - log.Debugf("ws: %s", util.Dump(ws)) - archives := []types.WorkspaceArchive{} for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives { - archives = append(archives, types.WorkspaceArchive{TaskID: rctParent.ID, Step: archiveStep}) - } - log.Debugf("archives: %v", util.Dump(archives)) - if len(archives) > 0 { - ws[rctParent.Level] = append(ws[rctParent.Level], archives) + wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep} + wsops = append(wsops, wsop) } } - log.Debugf("ws: %s", util.Dump(ws)) - et.Workspace = ws + et.WorkspaceOperations = wsops return et } diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index b491285..e3ae7c3 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -518,7 +518,7 @@ type ExecutorTask struct { SetupError string `fail_reason:"setup_error,omitempty"` FailError string `fail_reason:"fail_error,omitempty"` - Workspace Workspace `json:"workspace,omitempty"` + WorkspaceOperations []WorkspaceOperation `json:"workspace_operations,omitempty"` // Cache prefix to use when asking for a cache key. To isolate caches between // groups (projects) @@ -556,15 +556,10 @@ type Container struct { Entrypoint string `json:"entrypoint"` } -type Workspace []WorkspaceLevel - -type WorkspaceLevel []WorkspaceArchives - -type WorkspaceArchives []WorkspaceArchive - -type WorkspaceArchive struct { - TaskID string `json:"task_id,omitempty"` - Step int `json:"step,omitempty"` +type WorkspaceOperation struct { + TaskID string `json:"task_id,omitempty"` + Step int `json:"step,omitempty"` + Overwrite bool `json:"overwrite,omitempty"` } func (et *ExecutorTask) UnmarshalJSON(b []byte) error {