runservice: simplify workspace restore

This commit is contained in:
Simone Gotti 2019-04-30 14:00:34 +02:00
parent 1820e7c477
commit 27f84738d6
3 changed files with 41 additions and 41 deletions

View File

@ -419,26 +419,20 @@ func (e *Executor) doRestoreWorkspaceStep(ctx context.Context, s *types.RestoreW
} }
defer logf.Close() defer logf.Close()
// TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer. for _, op := range t.WorkspaceOperations {
for level, wl := range t.Workspace { log.Debugf("unarchiving workspace for taskID: %s, step: %d", level, op.TaskID, op.Step)
log.Debugf("unarchiving archives at level %d", level) resp, err := e.runserviceClient.GetArchive(ctx, op.TaskID, op.Step)
for _, archives := range wl { if err != nil {
for _, archive := range archives { // TODO(sgotti) retry before giving up
log.Debugf("unarchiving workspace at level %d, taskID: %s, step: %d", level, archive.TaskID, archive.Step) fmt.Fprintf(logf, "error reading workspace archive: %v\n", err)
resp, err := e.runserviceClient.GetArchive(ctx, archive.TaskID, archive.Step) return -1, err
if err != nil {
// TODO(sgotti) retry before giving up
fmt.Fprintf(logf, "error reading workspace archive: %v\n", err)
return -1, err
}
archivef := resp.Body
if err := e.unarchive(ctx, t, archivef, pod, logf, s.DestDir, false, false); err != nil {
archivef.Close()
return -1, err
}
archivef.Close()
}
} }
archivef := resp.Body
if err := e.unarchive(ctx, t, archivef, pod, logf, s.DestDir, false, false); err != nil {
archivef.Close()
return -1, err
}
archivef.Close()
} }
return 0, nil return 0, nil

View File

@ -23,6 +23,7 @@ import (
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"time" "time"
scommon "github.com/sorintlab/agola/internal/common" scommon "github.com/sorintlab/agola/internal/common"
@ -305,6 +306,17 @@ func (s *Scheduler) chooseExecutor(ctx context.Context, rct *types.RunConfigTask
return nil, nil return nil, nil
} }
type parentsByLevelName []*types.RunConfigTask
func (p parentsByLevelName) Len() int { return len(p) }
func (p parentsByLevelName) Less(i, j int) bool {
if p[i].Level != p[j].Level {
return p[i].Level < p[j].Level
}
return p[i].Name < p[j].Name
}
func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask {
rct := rc.Tasks[rt.ID] rct := rc.Tasks[rt.ID]
@ -345,25 +357,24 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types
} }
} }
// calculate workspace layers // calculate workspace operations
ws := make(types.Workspace, rct.Level+1) // TODO(sgotti) right now we don't support duplicated files. So it's not currently possibile to overwrite a file in a upper layer.
// this simplifies the workspaces extractions since they could be extracted in any order. We make them ordered just for reproducibility
wsops := []types.WorkspaceOperation{}
rctAllParents := runconfig.GetAllParents(rc.Tasks, rct) rctAllParents := runconfig.GetAllParents(rc.Tasks, rct)
log.Debugf("rctAllParents: %s", util.Dump(rctAllParents))
// sort parents by level and name just for reproducibility
sort.Sort(parentsByLevelName(rctAllParents))
for _, rctParent := range rctAllParents { for _, rctParent := range rctAllParents {
log.Debugf("rctParent: %s", util.Dump(rctParent)) log.Debugf("rctParent: %s", util.Dump(rctParent))
log.Debugf("ws: %s", util.Dump(ws))
archives := []types.WorkspaceArchive{}
for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives { for _, archiveStep := range r.Tasks[rctParent.ID].WorkspaceArchives {
archives = append(archives, types.WorkspaceArchive{TaskID: rctParent.ID, Step: archiveStep}) wsop := types.WorkspaceOperation{TaskID: rctParent.ID, Step: archiveStep}
} wsops = append(wsops, wsop)
log.Debugf("archives: %v", util.Dump(archives))
if len(archives) > 0 {
ws[rctParent.Level] = append(ws[rctParent.Level], archives)
} }
} }
log.Debugf("ws: %s", util.Dump(ws))
et.Workspace = ws et.WorkspaceOperations = wsops
return et return et
} }

View File

@ -518,7 +518,7 @@ type ExecutorTask struct {
SetupError string `fail_reason:"setup_error,omitempty"` SetupError string `fail_reason:"setup_error,omitempty"`
FailError string `fail_reason:"fail_error,omitempty"` FailError string `fail_reason:"fail_error,omitempty"`
Workspace Workspace `json:"workspace,omitempty"` WorkspaceOperations []WorkspaceOperation `json:"workspace_operations,omitempty"`
// Cache prefix to use when asking for a cache key. To isolate caches between // Cache prefix to use when asking for a cache key. To isolate caches between
// groups (projects) // groups (projects)
@ -556,15 +556,10 @@ type Container struct {
Entrypoint string `json:"entrypoint"` Entrypoint string `json:"entrypoint"`
} }
type Workspace []WorkspaceLevel type WorkspaceOperation struct {
TaskID string `json:"task_id,omitempty"`
type WorkspaceLevel []WorkspaceArchives Step int `json:"step,omitempty"`
Overwrite bool `json:"overwrite,omitempty"`
type WorkspaceArchives []WorkspaceArchive
type WorkspaceArchive struct {
TaskID string `json:"task_id,omitempty"`
Step int `json:"step,omitempty"`
} }
func (et *ExecutorTask) UnmarshalJSON(b []byte) error { func (et *ExecutorTask) UnmarshalJSON(b []byte) error {