From 8f4a5b29b900a170851147bc27e725897b0e8040 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 13 Mar 2019 15:48:35 +0100 Subject: [PATCH] *: implement setup step --- internal/services/gateway/api/run.go | 67 ++++++--- internal/services/runservice/executor/api.go | 54 ++++--- .../runservice/executor/driver/docker.go | 4 +- .../runservice/executor/driver/docker_test.go | 8 +- .../runservice/executor/driver/driver.go | 2 +- .../services/runservice/executor/executor.go | 141 +++++++++++------- .../services/runservice/scheduler/api/api.go | 47 ++++-- .../runservice/scheduler/api/client.go | 8 +- .../runservice/scheduler/command/command.go | 4 + .../runservice/scheduler/scheduler.go | 67 +++++++-- .../runservice/scheduler/store/store.go | 12 +- internal/services/runservice/types/types.go | 6 +- 12 files changed, 290 insertions(+), 130 deletions(-) diff --git a/internal/services/gateway/api/run.go b/internal/services/gateway/api/run.go index ecfc301..c2ce71e 100644 --- a/internal/services/gateway/api/run.go +++ b/internal/services/gateway/api/run.go @@ -76,7 +76,16 @@ type RunTaskResponse struct { Name string `json:"name"` Status rstypes.RunTaskStatus `json:"status"` - Steps []*RunTaskResponseStep `json:"steps"` + SetupStep *RunTaskResponseSetupStep `json:"setup_step"` + Steps []*RunTaskResponseStep `json:"steps"` + + StartTime *time.Time `json:"start_time"` + EndTime *time.Time `json:"end_time"` +} + +type RunTaskResponseSetupStep struct { + Phase rstypes.ExecutorTaskPhase `json:"phase"` + Name string `json:"name"` StartTime *time.Time `json:"start_time"` EndTime *time.Time `json:"end_time"` @@ -142,13 +151,20 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *Run EndTime: rt.EndTime, } + t.SetupStep = &RunTaskResponseSetupStep{ + Name: "Task setup", + Phase: rt.SetupStep.Phase, + StartTime: rt.SetupStep.StartTime, + EndTime: rt.SetupStep.EndTime, + } + for i := 0; i < len(t.Steps); i++ { s := &RunTaskResponseStep{ + Phase: rt.Steps[i].Phase, StartTime: rt.Steps[i].StartTime, EndTime: rt.Steps[i].EndTime, } rcts := rct.Steps[i] - s.Phase = rt.Steps[i].Phase switch rcts := rcts.(type) { case *rstypes.RunStep: s.Name = rcts.Name @@ -279,12 +295,13 @@ func NewRunsHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *RunsHan func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - query := r.URL.Query() - phaseFilter := query["phase"] - groups := query["group"] - changeGroups := query["changegroup"] + q := r.URL.Query() - limitS := query.Get("limit") + phaseFilter := q["phase"] + groups := q["group"] + changeGroups := q["changegroup"] + + limitS := q.Get("limit") limit := DefaultRunsLimit if limitS != "" { var err error @@ -302,11 +319,11 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { limit = MaxRunsLimit } asc := false - if _, ok := query["asc"]; ok { + if _, ok := q["asc"]; ok { asc = true } - start := query.Get("start") + start := q.Get("start") runsResp, resp, err := h.runserviceClient.GetRuns(ctx, phaseFilter, groups, changeGroups, start, limit, asc) if err != nil { @@ -413,41 +430,53 @@ func NewLogsHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *LogsHan func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - // TODO(sgotti) Check authorized call from client + q := r.URL.Query() - runID := r.URL.Query().Get("runID") + runID := q.Get("runID") if runID == "" { http.Error(w, "", http.StatusBadRequest) return } - taskID := r.URL.Query().Get("taskID") + taskID := q.Get("taskID") if taskID == "" { http.Error(w, "", http.StatusBadRequest) return } - s := r.URL.Query().Get("step") - if s == "" { + + _, setup := q["setup"] + stepStr := q.Get("step") + if !setup && stepStr == "" { http.Error(w, "", http.StatusBadRequest) return } - step, err := strconv.Atoi(s) - if err != nil { + if setup && stepStr != "" { http.Error(w, "", http.StatusBadRequest) return } + + var step int + if stepStr != "" { + var err error + step, err = strconv.Atoi(stepStr) + if err != nil { + http.Error(w, "", http.StatusBadRequest) + return + } + } + follow := false - if _, ok := r.URL.Query()["follow"]; ok { + if _, ok := q["follow"]; ok { follow = true } stream := false - if _, ok := r.URL.Query()["stream"]; ok { + if _, ok := q["stream"]; ok { stream = true } if follow { stream = true } - resp, err := h.runserviceClient.GetLogs(ctx, runID, taskID, step, follow, stream) + resp, err := h.runserviceClient.GetLogs(ctx, runID, taskID, setup, step, follow, stream) if err != nil { if resp != nil && resp.StatusCode == http.StatusNotFound { http.Error(w, err.Error(), http.StatusNotFound) diff --git a/internal/services/runservice/executor/api.go b/internal/services/runservice/executor/api.go index c60a603..8162755 100644 --- a/internal/services/runservice/executor/api.go +++ b/internal/services/runservice/executor/api.go @@ -61,44 +61,57 @@ func NewLogsHandler(logger *zap.Logger, e *Executor) *logsHandler { } func (h *logsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // TODO(sgotti) Check authorized call from scheduler + q := r.URL.Query() - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - - taskID := r.URL.Query().Get("taskid") + taskID := q.Get("taskid") if taskID == "" { http.Error(w, "", http.StatusBadRequest) return } - s := r.URL.Query().Get("step") - if s == "" { + + _, setup := q["setup"] + stepStr := q.Get("step") + if !setup && stepStr == "" { http.Error(w, "", http.StatusBadRequest) return } - step, err := strconv.Atoi(s) - if err != nil { + if setup && stepStr != "" { http.Error(w, "", http.StatusBadRequest) return } + + var step int + if stepStr != "" { + var err error + step, err = strconv.Atoi(stepStr) + if err != nil { + http.Error(w, "", http.StatusBadRequest) + return + } + } + follow := false - _, ok := r.URL.Query()["follow"] + _, ok := q["follow"] if ok { follow = true } - if err := h.readTaskLogs(taskID, step, w, follow); err != nil { + if err := h.readTaskLogs(taskID, setup, step, w, follow); err != nil { h.log.Errorf("err: %+v", err) } } -func (h *logsHandler) readTaskLogs(taskID string, step int, w http.ResponseWriter, follow bool) error { - logPath := h.e.logPath(taskID, step) - return h.readLogs(taskID, step, logPath, w, follow) +func (h *logsHandler) readTaskLogs(taskID string, setup bool, step int, w http.ResponseWriter, follow bool) error { + var logPath string + if setup { + logPath = h.e.setupLogPath(taskID) + } else { + logPath = h.e.stepLogPath(taskID, step) + } + return h.readLogs(taskID, setup, step, logPath, w, follow) } -func (h *logsHandler) readLogs(taskID string, step int, logPath string, w http.ResponseWriter, follow bool) error { +func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath string, w http.ResponseWriter, follow bool) error { f, err := os.Open(logPath) if err != nil { if os.IsNotExist(err) { @@ -135,7 +148,7 @@ func (h *logsHandler) readLogs(taskID string, step int, logPath string, w http.R if _, err := f.Seek(-int64(len(data)), io.SeekCurrent); err != nil { return errors.Wrapf(err, "failed to seek in log file %q", logPath) } - // check if the step is finished, is so flush until EOF and stop + // check if the step is finished, if so flush until EOF and stop rt, ok := h.e.runningTasks.get(taskID) if !ok { flushstop = true @@ -171,14 +184,15 @@ func NewArchivesHandler(e *Executor) *archivesHandler { } func (h *archivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - // TODO(sgotti) Check authorized call from scheduler - taskID := r.URL.Query().Get("taskid") + q := r.URL.Query() + + taskID := q.Get("taskid") if taskID == "" { http.Error(w, "", http.StatusBadRequest) return } - s := r.URL.Query().Get("step") + s := q.Get("step") if s == "" { http.Error(w, "", http.StatusBadRequest) return diff --git a/internal/services/runservice/executor/driver/docker.go b/internal/services/runservice/executor/driver/docker.go index e6da9cd..bffe2d3 100644 --- a/internal/services/runservice/executor/driver/docker.go +++ b/internal/services/runservice/executor/driver/docker.go @@ -109,7 +109,7 @@ func (d *DockerDriver) CopyToolbox(ctx context.Context, toolboxPath string) erro return nil } -func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig) (Pod, error) { +func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error) { if len(podConfig.Containers) == 0 { return nil, errors.Errorf("empty container config") } @@ -122,7 +122,7 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig) (Pod, e if err != nil { return nil, err } - io.Copy(os.Stdout, reader) + io.Copy(out, reader) podID := uuid.NewV4().String() diff --git a/internal/services/runservice/executor/driver/docker_test.go b/internal/services/runservice/executor/driver/docker_test.go index 77518cb..f1402a2 100644 --- a/internal/services/runservice/executor/driver/docker_test.go +++ b/internal/services/runservice/executor/driver/docker_test.go @@ -99,7 +99,7 @@ func TestPod(t *testing.T) { }, }, InitVolumeDir: "/tmp/agola", - }) + }, ioutil.Discard) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -116,7 +116,7 @@ func TestPod(t *testing.T) { }, }, InitVolumeDir: "/tmp/agola", - }) + }, ioutil.Discard) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -155,7 +155,7 @@ func TestPod(t *testing.T) { }, }, InitVolumeDir: "/tmp/agola", - }) + }, ioutil.Discard) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -206,7 +206,7 @@ func TestPod(t *testing.T) { }, }, InitVolumeDir: "/tmp/agola", - }) + }, ioutil.Discard) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/services/runservice/executor/driver/driver.go b/internal/services/runservice/executor/driver/driver.go index 92d2223..05c8545 100644 --- a/internal/services/runservice/executor/driver/driver.go +++ b/internal/services/runservice/executor/driver/driver.go @@ -38,7 +38,7 @@ const ( // * Kubernetes pods // * A Virtual Machine on which we execute multiple processes type Driver interface { - NewPod(ctx context.Context, podConfig *PodConfig) (Pod, error) + NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error) GetPodsByLabels(ctx context.Context, labels map[string]string, all bool) ([]Pod, error) GetPodByID(ctx context.Context, containerID string) (Pod, error) } diff --git a/internal/services/runservice/executor/executor.go b/internal/services/runservice/executor/executor.go index 7a2341a..8bbdb03 100644 --- a/internal/services/runservice/executor/executor.go +++ b/internal/services/runservice/executor/executor.go @@ -340,27 +340,22 @@ func (e *Executor) taskPath(taskID string) string { return filepath.Join(e.tasksDir(), taskID) } -func (e *Executor) logPath(taskID string, stepID int) string { - return filepath.Join(e.taskPath(taskID), "logs", fmt.Sprintf("%d.log", stepID)) +func (e *Executor) taskLogsPath(taskID string) string { + return filepath.Join(e.tasksDir(), taskID, "logs") +} + +func (e *Executor) setupLogPath(taskID string) string { + return filepath.Join(e.taskLogsPath(taskID), "setup.log") +} + +func (e *Executor) stepLogPath(taskID string, stepID int) string { + return filepath.Join(e.taskLogsPath(taskID), "steps", fmt.Sprintf("%d.log", stepID)) } func (e *Executor) archivePath(taskID string, stepID int) string { return filepath.Join(e.taskPath(taskID), "archives", fmt.Sprintf("%d.tar", stepID)) } -func mkdirAllAndReplace(path string, perm os.FileMode) error { - // if the dir already exists rename it. - _, err := os.Stat(path) - if err != nil && !os.IsNotExist(err) { - return err - } - if os.IsNotExist(err) { - return os.MkdirAll(path, perm) - } - // TODO(sgotti) UnixNano should be enough but doesn't totally avoids name collisions. - return os.Rename(path, fmt.Sprintf("%s.%d", path, time.Now().UnixNano())) -} - func (e *Executor) sendExecutorStatus(ctx context.Context) error { executor := &types.Executor{ ID: e.id, @@ -429,51 +424,35 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) { defer e.runningTasks.delete(et.ID) - rt.et.Status.Phase = types.ExecutorTaskPhaseRunning - rt.et.Status.StartTime = util.TimePtr(time.Now()) - + et.Status.Phase = types.ExecutorTaskPhaseRunning + et.Status.StartTime = util.TimePtr(time.Now()) + et.Status.SetupStep.Phase = types.ExecutorTaskPhaseRunning + et.Status.SetupStep.StartTime = util.TimePtr(time.Now()) if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) + } + + if err := e.setupTask(ctx, rt); err != nil { + rt.et.Status.Phase = types.ExecutorTaskPhaseFailed + et.Status.SetupStep.EndTime = util.TimePtr(time.Now()) + et.Status.SetupStep.Phase = types.ExecutorTaskPhaseFailed + et.Status.SetupStep.EndTime = util.TimePtr(time.Now()) + if err := e.sendExecutorTaskStatus(ctx, et); err != nil { + log.Errorf("err: %+v", err) + } rt.Unlock() return } - cmd := []string{toolboxContainerPath, "sleeper"} - if et.Containers[0].Entrypoint != "" { - cmd = strings.Split(et.Containers[0].Entrypoint, " ") - log.Infof("cmd: %v", cmd) - } - - log.Debugf("starting pod") - podConfig := &driver.PodConfig{ - Labels: createTaskLabels(et.ID), - InitVolumeDir: toolboxContainerDir, - Containers: []*driver.ContainerConfig{ - { - Image: et.Containers[0].Image, - Cmd: cmd, - Env: et.Containers[0].Environment, - WorkingDir: et.WorkingDir, - User: et.Containers[0].User, - Privileged: et.Containers[0].Privileged, - }, - }, - } - pod, err := e.driver.NewPod(ctx, podConfig) - if err != nil { + et.Status.SetupStep.Phase = types.ExecutorTaskPhaseSuccess + et.Status.SetupStep.EndTime = util.TimePtr(time.Now()) + if err := e.sendExecutorTaskStatus(ctx, et); err != nil { log.Errorf("err: %+v", err) - rt.Unlock() - return } - rt.pod = pod - // ignore pod stop errors - defer pod.Stop(ctx) - - log.Debugf("started pod") rt.Unlock() - _, err = e.executeTaskInternal(ctx, et, pod) + _, err := e.executeTaskInternal(ctx, et, rt.pod) rt.Lock() if err != nil { @@ -491,13 +470,63 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) { rt.Unlock() } +func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error { + et := rt.et + if err := os.RemoveAll(e.taskPath(et.ID)); err != nil { + return err + } + if err := os.MkdirAll(e.taskPath(et.ID), 0770); err != nil { + return err + } + + cmd := []string{toolboxContainerPath, "sleeper"} + if et.Containers[0].Entrypoint != "" { + cmd = strings.Split(et.Containers[0].Entrypoint, " ") + log.Infof("cmd: %v", cmd) + } + + log.Debugf("starting pod") + + podConfig := &driver.PodConfig{ + Labels: createTaskLabels(et.ID), + InitVolumeDir: toolboxContainerDir, + Containers: []*driver.ContainerConfig{ + { + Image: et.Containers[0].Image, + Cmd: cmd, + Env: et.Containers[0].Environment, + WorkingDir: et.WorkingDir, + User: et.Containers[0].User, + Privileged: et.Containers[0].Privileged, + }, + }, + } + + setupLogPath := e.setupLogPath(et.ID) + if err := os.MkdirAll(filepath.Dir(setupLogPath), 0770); err != nil { + return err + } + outf, err := os.Create(setupLogPath) + if err != nil { + return err + } + defer outf.Close() + + outf.WriteString("Starting pod.\n") + pod, err := e.driver.NewPod(ctx, podConfig, outf) + if err != nil { + outf.WriteString("Pod failed to start.\n") + return err + } + outf.WriteString("Pod started.\n") + + rt.pod = pod + return nil +} + func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTask, pod driver.Pod) (int, error) { log.Debugf("task: %s", et.TaskName) - if err := mkdirAllAndReplace(e.taskPath(et.ID), 0770); err != nil { - return 0, err - } - for i, step := range et.Steps { //log.Debugf("step: %v", util.Dump(step)) @@ -522,18 +551,18 @@ func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTa case *types.RunStep: log.Debugf("run step: %s", util.Dump(s)) stepName = s.Name - exitCode, err = e.doRunStep(ctx, s, et, pod, e.logPath(et.ID, i)) + exitCode, err = e.doRunStep(ctx, s, et, pod, e.stepLogPath(et.ID, i)) case *types.SaveToWorkspaceStep: log.Debugf("save to workspace step: %s", util.Dump(s)) stepName = s.Name archivePath := e.archivePath(et.ID, i) - exitCode, err = e.doSaveToWorkspaceStep(ctx, s, et, pod, e.logPath(et.ID, i), archivePath) + exitCode, err = e.doSaveToWorkspaceStep(ctx, s, et, pod, e.stepLogPath(et.ID, i), archivePath) case *types.RestoreWorkspaceStep: log.Debugf("restore workspace step: %s", util.Dump(s)) stepName = s.Name - exitCode, err = e.doRestoreWorkspaceStep(ctx, s, et, pod, e.logPath(et.ID, i)) + exitCode, err = e.doRestoreWorkspaceStep(ctx, s, et, pod, e.stepLogPath(et.ID, i)) default: return i, errors.Errorf("unknown step type: %s", util.Dump(s)) diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index 908435d..bce631c 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -58,40 +58,53 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() // TODO(sgotti) Check authorized call from client + q := r.URL.Query() - runID := r.URL.Query().Get("runid") + runID := q.Get("runid") if runID == "" { http.Error(w, "", http.StatusBadRequest) return } - taskID := r.URL.Query().Get("taskid") + taskID := q.Get("taskid") if taskID == "" { http.Error(w, "", http.StatusBadRequest) return } - s := r.URL.Query().Get("step") - if s == "" { + + _, setup := q["setup"] + stepStr := q.Get("step") + if !setup && stepStr == "" { http.Error(w, "", http.StatusBadRequest) return } - step, err := strconv.Atoi(s) - if err != nil { + if setup && stepStr != "" { http.Error(w, "", http.StatusBadRequest) return } + + var step int + if stepStr != "" { + var err error + step, err = strconv.Atoi(stepStr) + if err != nil { + http.Error(w, "", http.StatusBadRequest) + return + } + } + follow := false - if _, ok := r.URL.Query()["follow"]; ok { + if _, ok := q["follow"]; ok { follow = true } stream := false - if _, ok := r.URL.Query()["stream"]; ok { + if _, ok := q["stream"]; ok { stream = true } if follow { stream = true } - if err, sendError := h.readTaskLogs(ctx, runID, taskID, step, w, follow, stream); err != nil { + if err, sendError := h.readTaskLogs(ctx, runID, taskID, setup, step, w, follow, stream); err != nil { h.log.Errorf("err: %+v", err) if sendError { switch err.(type) { @@ -104,7 +117,7 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, step int, w http.ResponseWriter, follow, stream bool) (error, bool) { +func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow, stream bool) (error, bool) { r, err := store.GetRunEtcdOrLTS(ctx, h.e, h.wal, runID) if err != nil { return err, true @@ -123,7 +136,12 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, st // if the log has been already fetched use it, otherwise fetch it from the executor if task.Steps[step].LogPhase == types.RunTaskFetchPhaseFinished { - logPath := store.LTSRunLogPath(task.ID, step) + var logPath string + if setup { + logPath = store.LTSRunTaskSetupLogPath(task.ID) + } else { + logPath = store.LTSRunTaskStepLogPath(task.ID, step) + } f, err := h.lts.ReadObject(logPath) if err != nil { if err == objectstorage.ErrNotExist { @@ -147,7 +165,12 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, st return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Status.ExecutorID)), true } - url := fmt.Sprintf("%s/api/v1alpha/executor/logs?taskid=%s&step=%d", executor.ListenURL, taskID, step) + var url string + if setup { + url = fmt.Sprintf("%s/api/v1alpha/executor/logs?taskid=%s&setup", executor.ListenURL, taskID) + } else { + url = fmt.Sprintf("%s/api/v1alpha/executor/logs?taskid=%s&step=%d", executor.ListenURL, taskID, step) + } if follow { url += "&follow" } diff --git a/internal/services/runservice/scheduler/api/client.go b/internal/services/runservice/scheduler/api/client.go index dc05e63..0d0b995 100644 --- a/internal/services/runservice/scheduler/api/client.go +++ b/internal/services/runservice/scheduler/api/client.go @@ -236,11 +236,15 @@ func (c *Client) GetRun(ctx context.Context, runID string) (*RunResponse, *http. return runResponse, resp, err } -func (c *Client) GetLogs(ctx context.Context, runID, taskID string, step int, follow, stream bool) (*http.Response, error) { +func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool, step int, follow, stream bool) (*http.Response, error) { q := url.Values{} q.Add("runid", runID) q.Add("taskid", taskID) - q.Add("step", strconv.Itoa(step)) + if setup { + q.Add("setup", "") + } else { + q.Add("step", strconv.Itoa(step)) + } if follow { q.Add("follow", "") } diff --git a/internal/services/runservice/scheduler/command/command.go b/internal/services/runservice/scheduler/command/command.go index 54bb9f8..5512612 100644 --- a/internal/services/runservice/scheduler/command/command.go +++ b/internal/services/runservice/scheduler/command/command.go @@ -328,6 +328,10 @@ func (s *CommandHandler) genRunTask(ctx context.Context, rct *types.RunConfigTas if rt.Skip { rt.Status = types.RunTaskStatusSkipped } + rt.SetupStep = types.RunTaskStep{ + Phase: types.ExecutorTaskPhaseNotStarted, + LogPhase: types.RunTaskFetchPhaseNotStarted, + } for i := range rt.Steps { s := &types.RunTaskStep{ Phase: types.ExecutorTaskPhaseNotStarted, diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 13bc148..2a0bfc0 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -381,11 +381,12 @@ func (s *Scheduler) advanceRun(ctx context.Context, runID string) error { } // if the run is finished AND there're no executor tasks scheduled we can mark - // all not started runtasks' fetch phases (logs and archives) as finished + // all not started runtasks' fetch phases (setup step, logs and archives) as finished if r.Phase.IsFinished() { for _, rt := range r.RunTasks { log.Debugf("rt: %s", util.Dump(rt)) if rt.Status == types.RunTaskStatusNotStarted { + rt.SetupStep.LogPhase = types.RunTaskFetchPhaseFinished for _, s := range rt.Steps { s.LogPhase = types.RunTaskFetchPhaseFinished } @@ -483,6 +484,10 @@ func (s *Scheduler) updateRunStatus(ctx context.Context, et *types.ExecutorTask) rt.Status = types.RunTaskStatusFailed } + rt.SetupStep.Phase = et.Status.SetupStep.Phase + rt.SetupStep.StartTime = et.Status.SetupStep.StartTime + rt.SetupStep.EndTime = et.Status.SetupStep.EndTime + for i, s := range et.Status.Steps { rt.Steps[i].Phase = s.Phase rt.Steps[i].StartTime = s.StartTime @@ -676,7 +681,7 @@ func (s *Scheduler) fileExists(path string) (bool, error) { return !os.IsNotExist(err), nil } -func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, stepnum int) error { +func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error { et, err := store.GetExecutorTask(ctx, s.e, rt.ID) if err != nil && err != etcd.ErrKeyNotFound { return err @@ -696,8 +701,13 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, stepnum int return nil } - path := store.LTSRunLogPath(rt.ID, stepnum) - ok, err := s.fileExists(path) + var logPath string + if setup { + logPath = store.LTSRunTaskSetupLogPath(rt.ID) + } else { + logPath = store.LTSRunTaskStepLogPath(rt.ID, stepnum) + } + ok, err := s.fileExists(logPath) if err != nil { return err } @@ -705,8 +715,12 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, stepnum int return nil } - u := fmt.Sprintf(executor.ListenURL+"/api/v1alpha/executor/logs?taskid=%s&step=%d", rt.ID, stepnum) - log.Debugf("fetchLog: %s", u) + var u string + if setup { + u = fmt.Sprintf(executor.ListenURL+"/api/v1alpha/executor/logs?taskid=%s&setup", rt.ID) + } else { + u = fmt.Sprintf(executor.ListenURL+"/api/v1alpha/executor/logs?taskid=%s&step=%d", rt.ID, stepnum) + } r, err := http.Get(u) if err != nil { return err @@ -721,10 +735,27 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, stepnum int return errors.Errorf("received http status: %d", r.StatusCode) } - return s.lts.WriteObject(path, r.Body) + return s.lts.WriteObject(logPath, r.Body) } -func (s *Scheduler) finishLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error { +func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { + r, _, err := store.GetRun(ctx, s.e, runID) + if err != nil { + return err + } + rt, ok := r.RunTasks[runTaskID] + if !ok { + return errors.Errorf("no such task with ID %s in run %s", runTaskID, runID) + } + + rt.SetupStep.LogPhase = types.RunTaskFetchPhaseFinished + if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil { + return err + } + return nil +} + +func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error { r, _, err := store.GetRun(ctx, s.e, runID) if err != nil { return err @@ -776,14 +807,26 @@ func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID str func (s *Scheduler) fetchTaskLogs(ctx context.Context, runID string, rt *types.RunTask) { log.Debugf("fetchTaskLogs") + + // fetch setup log + if rt.SetupStep.LogPhase == types.RunTaskFetchPhaseNotStarted { + if err := s.fetchLog(ctx, rt, true, 0); err != nil { + log.Errorf("err: %+v", err) + } + if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil { + log.Errorf("err: %+v", err) + } + } + + // fetch steps logs for i, rts := range rt.Steps { lp := rts.LogPhase if lp == types.RunTaskFetchPhaseNotStarted { - if err := s.fetchLog(ctx, rt, i); err != nil { + if err := s.fetchLog(ctx, rt, false, i); err != nil { log.Errorf("err: %+v", err) continue } - if err := s.finishLogPhase(ctx, runID, rt.ID, i); err != nil { + if err := s.finishStepLogPhase(ctx, runID, rt.ID, i); err != nil { log.Errorf("err: %+v", err) continue } @@ -988,6 +1031,10 @@ func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error done := true for _, rt := range r.RunTasks { // check all logs are fetched + if rt.SetupStep.LogPhase != types.RunTaskFetchPhaseFinished { + done = false + break + } for _, rts := range rt.Steps { lp := rts.LogPhase if lp != types.RunTaskFetchPhaseFinished { diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/scheduler/store/store.go index 2e91c71..c608510 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/scheduler/store/store.go @@ -129,8 +129,16 @@ func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wa return action, nil } -func LTSRunLogPath(rtID string, step int) string { - return path.Join("logs", fmt.Sprintf("%s/%d.log", rtID, step)) +func LTSRunTaskLogsDir(rtID string) string { + return path.Join("logs", rtID) +} + +func LTSRunTaskSetupLogPath(rtID string) string { + return path.Join(LTSRunTaskLogsDir(rtID), "setup.log") +} + +func LTSRunTaskStepLogPath(rtID string, step int) string { + return path.Join(LTSRunTaskLogsDir(rtID), "steps", fmt.Sprintf("%d.log", step)) } func LTSRunArchivePath(rtID string, step int) string { diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index 2a91745..71e5ca0 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -173,7 +173,8 @@ type RunTask struct { // This data is opaque to the run service ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"` - Steps []*RunTaskStep `json:"steps,omitempty"` + SetupStep RunTaskStep `json:"setup_step,omitempty"` + Steps []*RunTaskStep `json:"steps,omitempty"` // steps numbers of workspace archives, WorkspaceArchives []int `json:"workspace_archives,omitempty"` @@ -405,7 +406,8 @@ type ExecutorTaskStatus struct { ExecutorID string `json:"executor_id,omitempty"` Phase ExecutorTaskPhase `json:"phase,omitempty"` - Steps []*ExecutorTaskStepStatus `json:"steps,omitempty"` + SetupStep ExecutorTaskStepStatus `json:"setup_step,omitempty"` + Steps []*ExecutorTaskStepStatus `json:"steps,omitempty"` StartTime *time.Time `json:"start_time,omitempty"` EndTime *time.Time `json:"end_time,omitempty"`