*: implement setup step

This commit is contained in:
Simone Gotti 2019-03-13 15:48:35 +01:00
parent 41ac92086a
commit 8f4a5b29b9
12 changed files with 290 additions and 130 deletions

View File

@ -76,7 +76,16 @@ type RunTaskResponse struct {
Name string `json:"name"` Name string `json:"name"`
Status rstypes.RunTaskStatus `json:"status"` Status rstypes.RunTaskStatus `json:"status"`
Steps []*RunTaskResponseStep `json:"steps"` SetupStep *RunTaskResponseSetupStep `json:"setup_step"`
Steps []*RunTaskResponseStep `json:"steps"`
StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"`
}
type RunTaskResponseSetupStep struct {
Phase rstypes.ExecutorTaskPhase `json:"phase"`
Name string `json:"name"`
StartTime *time.Time `json:"start_time"` StartTime *time.Time `json:"start_time"`
EndTime *time.Time `json:"end_time"` EndTime *time.Time `json:"end_time"`
@ -142,13 +151,20 @@ func createRunTaskResponse(rt *rstypes.RunTask, rct *rstypes.RunConfigTask) *Run
EndTime: rt.EndTime, EndTime: rt.EndTime,
} }
t.SetupStep = &RunTaskResponseSetupStep{
Name: "Task setup",
Phase: rt.SetupStep.Phase,
StartTime: rt.SetupStep.StartTime,
EndTime: rt.SetupStep.EndTime,
}
for i := 0; i < len(t.Steps); i++ { for i := 0; i < len(t.Steps); i++ {
s := &RunTaskResponseStep{ s := &RunTaskResponseStep{
Phase: rt.Steps[i].Phase,
StartTime: rt.Steps[i].StartTime, StartTime: rt.Steps[i].StartTime,
EndTime: rt.Steps[i].EndTime, EndTime: rt.Steps[i].EndTime,
} }
rcts := rct.Steps[i] rcts := rct.Steps[i]
s.Phase = rt.Steps[i].Phase
switch rcts := rcts.(type) { switch rcts := rcts.(type) {
case *rstypes.RunStep: case *rstypes.RunStep:
s.Name = rcts.Name s.Name = rcts.Name
@ -279,12 +295,13 @@ func NewRunsHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *RunsHan
func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
query := r.URL.Query() q := r.URL.Query()
phaseFilter := query["phase"]
groups := query["group"]
changeGroups := query["changegroup"]
limitS := query.Get("limit") phaseFilter := q["phase"]
groups := q["group"]
changeGroups := q["changegroup"]
limitS := q.Get("limit")
limit := DefaultRunsLimit limit := DefaultRunsLimit
if limitS != "" { if limitS != "" {
var err error var err error
@ -302,11 +319,11 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
limit = MaxRunsLimit limit = MaxRunsLimit
} }
asc := false asc := false
if _, ok := query["asc"]; ok { if _, ok := q["asc"]; ok {
asc = true asc = true
} }
start := query.Get("start") start := q.Get("start")
runsResp, resp, err := h.runserviceClient.GetRuns(ctx, phaseFilter, groups, changeGroups, start, limit, asc) runsResp, resp, err := h.runserviceClient.GetRuns(ctx, phaseFilter, groups, changeGroups, start, limit, asc)
if err != nil { if err != nil {
@ -413,41 +430,53 @@ func NewLogsHandler(logger *zap.Logger, runserviceClient *rsapi.Client) *LogsHan
func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
// TODO(sgotti) Check authorized call from client q := r.URL.Query()
runID := r.URL.Query().Get("runID") runID := q.Get("runID")
if runID == "" { if runID == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
taskID := r.URL.Query().Get("taskID") taskID := q.Get("taskID")
if taskID == "" { if taskID == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
s := r.URL.Query().Get("step")
if s == "" { _, setup := q["setup"]
stepStr := q.Get("step")
if !setup && stepStr == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
step, err := strconv.Atoi(s) if setup && stepStr != "" {
if err != nil {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
var step int
if stepStr != "" {
var err error
step, err = strconv.Atoi(stepStr)
if err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
}
follow := false follow := false
if _, ok := r.URL.Query()["follow"]; ok { if _, ok := q["follow"]; ok {
follow = true follow = true
} }
stream := false stream := false
if _, ok := r.URL.Query()["stream"]; ok { if _, ok := q["stream"]; ok {
stream = true stream = true
} }
if follow { if follow {
stream = true stream = true
} }
resp, err := h.runserviceClient.GetLogs(ctx, runID, taskID, step, follow, stream) resp, err := h.runserviceClient.GetLogs(ctx, runID, taskID, setup, step, follow, stream)
if err != nil { if err != nil {
if resp != nil && resp.StatusCode == http.StatusNotFound { if resp != nil && resp.StatusCode == http.StatusNotFound {
http.Error(w, err.Error(), http.StatusNotFound) http.Error(w, err.Error(), http.StatusNotFound)

View File

@ -61,44 +61,57 @@ func NewLogsHandler(logger *zap.Logger, e *Executor) *logsHandler {
} }
func (h *logsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *logsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO(sgotti) Check authorized call from scheduler q := r.URL.Query()
w.Header().Set("Content-Type", "text/event-stream") taskID := q.Get("taskid")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
taskID := r.URL.Query().Get("taskid")
if taskID == "" { if taskID == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
s := r.URL.Query().Get("step")
if s == "" { _, setup := q["setup"]
stepStr := q.Get("step")
if !setup && stepStr == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
step, err := strconv.Atoi(s) if setup && stepStr != "" {
if err != nil {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
var step int
if stepStr != "" {
var err error
step, err = strconv.Atoi(stepStr)
if err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
}
follow := false follow := false
_, ok := r.URL.Query()["follow"] _, ok := q["follow"]
if ok { if ok {
follow = true follow = true
} }
if err := h.readTaskLogs(taskID, step, w, follow); err != nil { if err := h.readTaskLogs(taskID, setup, step, w, follow); err != nil {
h.log.Errorf("err: %+v", err) h.log.Errorf("err: %+v", err)
} }
} }
func (h *logsHandler) readTaskLogs(taskID string, step int, w http.ResponseWriter, follow bool) error { func (h *logsHandler) readTaskLogs(taskID string, setup bool, step int, w http.ResponseWriter, follow bool) error {
logPath := h.e.logPath(taskID, step) var logPath string
return h.readLogs(taskID, step, logPath, w, follow) if setup {
logPath = h.e.setupLogPath(taskID)
} else {
logPath = h.e.stepLogPath(taskID, step)
}
return h.readLogs(taskID, setup, step, logPath, w, follow)
} }
func (h *logsHandler) readLogs(taskID string, step int, logPath string, w http.ResponseWriter, follow bool) error { func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath string, w http.ResponseWriter, follow bool) error {
f, err := os.Open(logPath) f, err := os.Open(logPath)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -135,7 +148,7 @@ func (h *logsHandler) readLogs(taskID string, step int, logPath string, w http.R
if _, err := f.Seek(-int64(len(data)), io.SeekCurrent); err != nil { if _, err := f.Seek(-int64(len(data)), io.SeekCurrent); err != nil {
return errors.Wrapf(err, "failed to seek in log file %q", logPath) return errors.Wrapf(err, "failed to seek in log file %q", logPath)
} }
// check if the step is finished, is so flush until EOF and stop // check if the step is finished, if so flush until EOF and stop
rt, ok := h.e.runningTasks.get(taskID) rt, ok := h.e.runningTasks.get(taskID)
if !ok { if !ok {
flushstop = true flushstop = true
@ -171,14 +184,15 @@ func NewArchivesHandler(e *Executor) *archivesHandler {
} }
func (h *archivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *archivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// TODO(sgotti) Check authorized call from scheduler
taskID := r.URL.Query().Get("taskid") q := r.URL.Query()
taskID := q.Get("taskid")
if taskID == "" { if taskID == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
s := r.URL.Query().Get("step") s := q.Get("step")
if s == "" { if s == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return

View File

@ -109,7 +109,7 @@ func (d *DockerDriver) CopyToolbox(ctx context.Context, toolboxPath string) erro
return nil return nil
} }
func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig) (Pod, error) { func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error) {
if len(podConfig.Containers) == 0 { if len(podConfig.Containers) == 0 {
return nil, errors.Errorf("empty container config") return nil, errors.Errorf("empty container config")
} }
@ -122,7 +122,7 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig) (Pod, e
if err != nil { if err != nil {
return nil, err return nil, err
} }
io.Copy(os.Stdout, reader) io.Copy(out, reader)
podID := uuid.NewV4().String() podID := uuid.NewV4().String()

View File

@ -99,7 +99,7 @@ func TestPod(t *testing.T) {
}, },
}, },
InitVolumeDir: "/tmp/agola", InitVolumeDir: "/tmp/agola",
}) }, ioutil.Discard)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -116,7 +116,7 @@ func TestPod(t *testing.T) {
}, },
}, },
InitVolumeDir: "/tmp/agola", InitVolumeDir: "/tmp/agola",
}) }, ioutil.Discard)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -155,7 +155,7 @@ func TestPod(t *testing.T) {
}, },
}, },
InitVolumeDir: "/tmp/agola", InitVolumeDir: "/tmp/agola",
}) }, ioutil.Discard)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -206,7 +206,7 @@ func TestPod(t *testing.T) {
}, },
}, },
InitVolumeDir: "/tmp/agola", InitVolumeDir: "/tmp/agola",
}) }, ioutil.Discard)
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }

View File

@ -38,7 +38,7 @@ const (
// * Kubernetes pods // * Kubernetes pods
// * A Virtual Machine on which we execute multiple processes // * A Virtual Machine on which we execute multiple processes
type Driver interface { type Driver interface {
NewPod(ctx context.Context, podConfig *PodConfig) (Pod, error) NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error)
GetPodsByLabels(ctx context.Context, labels map[string]string, all bool) ([]Pod, error) GetPodsByLabels(ctx context.Context, labels map[string]string, all bool) ([]Pod, error)
GetPodByID(ctx context.Context, containerID string) (Pod, error) GetPodByID(ctx context.Context, containerID string) (Pod, error)
} }

View File

@ -340,27 +340,22 @@ func (e *Executor) taskPath(taskID string) string {
return filepath.Join(e.tasksDir(), taskID) return filepath.Join(e.tasksDir(), taskID)
} }
func (e *Executor) logPath(taskID string, stepID int) string { func (e *Executor) taskLogsPath(taskID string) string {
return filepath.Join(e.taskPath(taskID), "logs", fmt.Sprintf("%d.log", stepID)) return filepath.Join(e.tasksDir(), taskID, "logs")
}
func (e *Executor) setupLogPath(taskID string) string {
return filepath.Join(e.taskLogsPath(taskID), "setup.log")
}
func (e *Executor) stepLogPath(taskID string, stepID int) string {
return filepath.Join(e.taskLogsPath(taskID), "steps", fmt.Sprintf("%d.log", stepID))
} }
func (e *Executor) archivePath(taskID string, stepID int) string { func (e *Executor) archivePath(taskID string, stepID int) string {
return filepath.Join(e.taskPath(taskID), "archives", fmt.Sprintf("%d.tar", stepID)) return filepath.Join(e.taskPath(taskID), "archives", fmt.Sprintf("%d.tar", stepID))
} }
func mkdirAllAndReplace(path string, perm os.FileMode) error {
// if the dir already exists rename it.
_, err := os.Stat(path)
if err != nil && !os.IsNotExist(err) {
return err
}
if os.IsNotExist(err) {
return os.MkdirAll(path, perm)
}
// TODO(sgotti) UnixNano should be enough but doesn't totally avoids name collisions.
return os.Rename(path, fmt.Sprintf("%s.%d", path, time.Now().UnixNano()))
}
func (e *Executor) sendExecutorStatus(ctx context.Context) error { func (e *Executor) sendExecutorStatus(ctx context.Context) error {
executor := &types.Executor{ executor := &types.Executor{
ID: e.id, ID: e.id,
@ -429,51 +424,35 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) {
defer e.runningTasks.delete(et.ID) defer e.runningTasks.delete(et.ID)
rt.et.Status.Phase = types.ExecutorTaskPhaseRunning et.Status.Phase = types.ExecutorTaskPhaseRunning
rt.et.Status.StartTime = util.TimePtr(time.Now()) et.Status.StartTime = util.TimePtr(time.Now())
et.Status.SetupStep.Phase = types.ExecutorTaskPhaseRunning
et.Status.SetupStep.StartTime = util.TimePtr(time.Now())
if err := e.sendExecutorTaskStatus(ctx, et); err != nil { if err := e.sendExecutorTaskStatus(ctx, et); err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
}
if err := e.setupTask(ctx, rt); err != nil {
rt.et.Status.Phase = types.ExecutorTaskPhaseFailed
et.Status.SetupStep.EndTime = util.TimePtr(time.Now())
et.Status.SetupStep.Phase = types.ExecutorTaskPhaseFailed
et.Status.SetupStep.EndTime = util.TimePtr(time.Now())
if err := e.sendExecutorTaskStatus(ctx, et); err != nil {
log.Errorf("err: %+v", err)
}
rt.Unlock() rt.Unlock()
return return
} }
cmd := []string{toolboxContainerPath, "sleeper"} et.Status.SetupStep.Phase = types.ExecutorTaskPhaseSuccess
if et.Containers[0].Entrypoint != "" { et.Status.SetupStep.EndTime = util.TimePtr(time.Now())
cmd = strings.Split(et.Containers[0].Entrypoint, " ") if err := e.sendExecutorTaskStatus(ctx, et); err != nil {
log.Infof("cmd: %v", cmd)
}
log.Debugf("starting pod")
podConfig := &driver.PodConfig{
Labels: createTaskLabels(et.ID),
InitVolumeDir: toolboxContainerDir,
Containers: []*driver.ContainerConfig{
{
Image: et.Containers[0].Image,
Cmd: cmd,
Env: et.Containers[0].Environment,
WorkingDir: et.WorkingDir,
User: et.Containers[0].User,
Privileged: et.Containers[0].Privileged,
},
},
}
pod, err := e.driver.NewPod(ctx, podConfig)
if err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
rt.Unlock()
return
} }
rt.pod = pod
// ignore pod stop errors
defer pod.Stop(ctx)
log.Debugf("started pod")
rt.Unlock() rt.Unlock()
_, err = e.executeTaskInternal(ctx, et, pod) _, err := e.executeTaskInternal(ctx, et, rt.pod)
rt.Lock() rt.Lock()
if err != nil { if err != nil {
@ -491,13 +470,63 @@ func (e *Executor) executeTask(ctx context.Context, et *types.ExecutorTask) {
rt.Unlock() rt.Unlock()
} }
func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
et := rt.et
if err := os.RemoveAll(e.taskPath(et.ID)); err != nil {
return err
}
if err := os.MkdirAll(e.taskPath(et.ID), 0770); err != nil {
return err
}
cmd := []string{toolboxContainerPath, "sleeper"}
if et.Containers[0].Entrypoint != "" {
cmd = strings.Split(et.Containers[0].Entrypoint, " ")
log.Infof("cmd: %v", cmd)
}
log.Debugf("starting pod")
podConfig := &driver.PodConfig{
Labels: createTaskLabels(et.ID),
InitVolumeDir: toolboxContainerDir,
Containers: []*driver.ContainerConfig{
{
Image: et.Containers[0].Image,
Cmd: cmd,
Env: et.Containers[0].Environment,
WorkingDir: et.WorkingDir,
User: et.Containers[0].User,
Privileged: et.Containers[0].Privileged,
},
},
}
setupLogPath := e.setupLogPath(et.ID)
if err := os.MkdirAll(filepath.Dir(setupLogPath), 0770); err != nil {
return err
}
outf, err := os.Create(setupLogPath)
if err != nil {
return err
}
defer outf.Close()
outf.WriteString("Starting pod.\n")
pod, err := e.driver.NewPod(ctx, podConfig, outf)
if err != nil {
outf.WriteString("Pod failed to start.\n")
return err
}
outf.WriteString("Pod started.\n")
rt.pod = pod
return nil
}
func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTask, pod driver.Pod) (int, error) { func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTask, pod driver.Pod) (int, error) {
log.Debugf("task: %s", et.TaskName) log.Debugf("task: %s", et.TaskName)
if err := mkdirAllAndReplace(e.taskPath(et.ID), 0770); err != nil {
return 0, err
}
for i, step := range et.Steps { for i, step := range et.Steps {
//log.Debugf("step: %v", util.Dump(step)) //log.Debugf("step: %v", util.Dump(step))
@ -522,18 +551,18 @@ func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTa
case *types.RunStep: case *types.RunStep:
log.Debugf("run step: %s", util.Dump(s)) log.Debugf("run step: %s", util.Dump(s))
stepName = s.Name stepName = s.Name
exitCode, err = e.doRunStep(ctx, s, et, pod, e.logPath(et.ID, i)) exitCode, err = e.doRunStep(ctx, s, et, pod, e.stepLogPath(et.ID, i))
case *types.SaveToWorkspaceStep: case *types.SaveToWorkspaceStep:
log.Debugf("save to workspace step: %s", util.Dump(s)) log.Debugf("save to workspace step: %s", util.Dump(s))
stepName = s.Name stepName = s.Name
archivePath := e.archivePath(et.ID, i) archivePath := e.archivePath(et.ID, i)
exitCode, err = e.doSaveToWorkspaceStep(ctx, s, et, pod, e.logPath(et.ID, i), archivePath) exitCode, err = e.doSaveToWorkspaceStep(ctx, s, et, pod, e.stepLogPath(et.ID, i), archivePath)
case *types.RestoreWorkspaceStep: case *types.RestoreWorkspaceStep:
log.Debugf("restore workspace step: %s", util.Dump(s)) log.Debugf("restore workspace step: %s", util.Dump(s))
stepName = s.Name stepName = s.Name
exitCode, err = e.doRestoreWorkspaceStep(ctx, s, et, pod, e.logPath(et.ID, i)) exitCode, err = e.doRestoreWorkspaceStep(ctx, s, et, pod, e.stepLogPath(et.ID, i))
default: default:
return i, errors.Errorf("unknown step type: %s", util.Dump(s)) return i, errors.Errorf("unknown step type: %s", util.Dump(s))

View File

@ -58,40 +58,53 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() ctx := r.Context()
// TODO(sgotti) Check authorized call from client // TODO(sgotti) Check authorized call from client
q := r.URL.Query()
runID := r.URL.Query().Get("runid") runID := q.Get("runid")
if runID == "" { if runID == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
taskID := r.URL.Query().Get("taskid") taskID := q.Get("taskid")
if taskID == "" { if taskID == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
s := r.URL.Query().Get("step")
if s == "" { _, setup := q["setup"]
stepStr := q.Get("step")
if !setup && stepStr == "" {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
step, err := strconv.Atoi(s) if setup && stepStr != "" {
if err != nil {
http.Error(w, "", http.StatusBadRequest) http.Error(w, "", http.StatusBadRequest)
return return
} }
var step int
if stepStr != "" {
var err error
step, err = strconv.Atoi(stepStr)
if err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
}
follow := false follow := false
if _, ok := r.URL.Query()["follow"]; ok { if _, ok := q["follow"]; ok {
follow = true follow = true
} }
stream := false stream := false
if _, ok := r.URL.Query()["stream"]; ok { if _, ok := q["stream"]; ok {
stream = true stream = true
} }
if follow { if follow {
stream = true stream = true
} }
if err, sendError := h.readTaskLogs(ctx, runID, taskID, step, w, follow, stream); err != nil { if err, sendError := h.readTaskLogs(ctx, runID, taskID, setup, step, w, follow, stream); err != nil {
h.log.Errorf("err: %+v", err) h.log.Errorf("err: %+v", err)
if sendError { if sendError {
switch err.(type) { switch err.(type) {
@ -104,7 +117,7 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, step int, w http.ResponseWriter, follow, stream bool) (error, bool) { func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow, stream bool) (error, bool) {
r, err := store.GetRunEtcdOrLTS(ctx, h.e, h.wal, runID) r, err := store.GetRunEtcdOrLTS(ctx, h.e, h.wal, runID)
if err != nil { if err != nil {
return err, true return err, true
@ -123,7 +136,12 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, st
// if the log has been already fetched use it, otherwise fetch it from the executor // if the log has been already fetched use it, otherwise fetch it from the executor
if task.Steps[step].LogPhase == types.RunTaskFetchPhaseFinished { if task.Steps[step].LogPhase == types.RunTaskFetchPhaseFinished {
logPath := store.LTSRunLogPath(task.ID, step) var logPath string
if setup {
logPath = store.LTSRunTaskSetupLogPath(task.ID)
} else {
logPath = store.LTSRunTaskStepLogPath(task.ID, step)
}
f, err := h.lts.ReadObject(logPath) f, err := h.lts.ReadObject(logPath)
if err != nil { if err != nil {
if err == objectstorage.ErrNotExist { if err == objectstorage.ErrNotExist {
@ -147,7 +165,12 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, st
return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Status.ExecutorID)), true return common.NewErrNotExist(errors.Errorf("executor with id %q doesn't exist", et.Status.ExecutorID)), true
} }
url := fmt.Sprintf("%s/api/v1alpha/executor/logs?taskid=%s&step=%d", executor.ListenURL, taskID, step) var url string
if setup {
url = fmt.Sprintf("%s/api/v1alpha/executor/logs?taskid=%s&setup", executor.ListenURL, taskID)
} else {
url = fmt.Sprintf("%s/api/v1alpha/executor/logs?taskid=%s&step=%d", executor.ListenURL, taskID, step)
}
if follow { if follow {
url += "&follow" url += "&follow"
} }

View File

@ -236,11 +236,15 @@ func (c *Client) GetRun(ctx context.Context, runID string) (*RunResponse, *http.
return runResponse, resp, err return runResponse, resp, err
} }
func (c *Client) GetLogs(ctx context.Context, runID, taskID string, step int, follow, stream bool) (*http.Response, error) { func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool, step int, follow, stream bool) (*http.Response, error) {
q := url.Values{} q := url.Values{}
q.Add("runid", runID) q.Add("runid", runID)
q.Add("taskid", taskID) q.Add("taskid", taskID)
q.Add("step", strconv.Itoa(step)) if setup {
q.Add("setup", "")
} else {
q.Add("step", strconv.Itoa(step))
}
if follow { if follow {
q.Add("follow", "") q.Add("follow", "")
} }

View File

@ -328,6 +328,10 @@ func (s *CommandHandler) genRunTask(ctx context.Context, rct *types.RunConfigTas
if rt.Skip { if rt.Skip {
rt.Status = types.RunTaskStatusSkipped rt.Status = types.RunTaskStatusSkipped
} }
rt.SetupStep = types.RunTaskStep{
Phase: types.ExecutorTaskPhaseNotStarted,
LogPhase: types.RunTaskFetchPhaseNotStarted,
}
for i := range rt.Steps { for i := range rt.Steps {
s := &types.RunTaskStep{ s := &types.RunTaskStep{
Phase: types.ExecutorTaskPhaseNotStarted, Phase: types.ExecutorTaskPhaseNotStarted,

View File

@ -381,11 +381,12 @@ func (s *Scheduler) advanceRun(ctx context.Context, runID string) error {
} }
// if the run is finished AND there're no executor tasks scheduled we can mark // if the run is finished AND there're no executor tasks scheduled we can mark
// all not started runtasks' fetch phases (logs and archives) as finished // all not started runtasks' fetch phases (setup step, logs and archives) as finished
if r.Phase.IsFinished() { if r.Phase.IsFinished() {
for _, rt := range r.RunTasks { for _, rt := range r.RunTasks {
log.Debugf("rt: %s", util.Dump(rt)) log.Debugf("rt: %s", util.Dump(rt))
if rt.Status == types.RunTaskStatusNotStarted { if rt.Status == types.RunTaskStatusNotStarted {
rt.SetupStep.LogPhase = types.RunTaskFetchPhaseFinished
for _, s := range rt.Steps { for _, s := range rt.Steps {
s.LogPhase = types.RunTaskFetchPhaseFinished s.LogPhase = types.RunTaskFetchPhaseFinished
} }
@ -483,6 +484,10 @@ func (s *Scheduler) updateRunStatus(ctx context.Context, et *types.ExecutorTask)
rt.Status = types.RunTaskStatusFailed rt.Status = types.RunTaskStatusFailed
} }
rt.SetupStep.Phase = et.Status.SetupStep.Phase
rt.SetupStep.StartTime = et.Status.SetupStep.StartTime
rt.SetupStep.EndTime = et.Status.SetupStep.EndTime
for i, s := range et.Status.Steps { for i, s := range et.Status.Steps {
rt.Steps[i].Phase = s.Phase rt.Steps[i].Phase = s.Phase
rt.Steps[i].StartTime = s.StartTime rt.Steps[i].StartTime = s.StartTime
@ -676,7 +681,7 @@ func (s *Scheduler) fileExists(path string) (bool, error) {
return !os.IsNotExist(err), nil return !os.IsNotExist(err), nil
} }
func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, stepnum int) error { func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error {
et, err := store.GetExecutorTask(ctx, s.e, rt.ID) et, err := store.GetExecutorTask(ctx, s.e, rt.ID)
if err != nil && err != etcd.ErrKeyNotFound { if err != nil && err != etcd.ErrKeyNotFound {
return err return err
@ -696,8 +701,13 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, stepnum int
return nil return nil
} }
path := store.LTSRunLogPath(rt.ID, stepnum) var logPath string
ok, err := s.fileExists(path) if setup {
logPath = store.LTSRunTaskSetupLogPath(rt.ID)
} else {
logPath = store.LTSRunTaskStepLogPath(rt.ID, stepnum)
}
ok, err := s.fileExists(logPath)
if err != nil { if err != nil {
return err return err
} }
@ -705,8 +715,12 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, stepnum int
return nil return nil
} }
u := fmt.Sprintf(executor.ListenURL+"/api/v1alpha/executor/logs?taskid=%s&step=%d", rt.ID, stepnum) var u string
log.Debugf("fetchLog: %s", u) if setup {
u = fmt.Sprintf(executor.ListenURL+"/api/v1alpha/executor/logs?taskid=%s&setup", rt.ID)
} else {
u = fmt.Sprintf(executor.ListenURL+"/api/v1alpha/executor/logs?taskid=%s&step=%d", rt.ID, stepnum)
}
r, err := http.Get(u) r, err := http.Get(u)
if err != nil { if err != nil {
return err return err
@ -721,10 +735,27 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, stepnum int
return errors.Errorf("received http status: %d", r.StatusCode) return errors.Errorf("received http status: %d", r.StatusCode)
} }
return s.lts.WriteObject(path, r.Body) return s.lts.WriteObject(logPath, r.Body)
} }
func (s *Scheduler) finishLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error { func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error {
r, _, err := store.GetRun(ctx, s.e, runID)
if err != nil {
return err
}
rt, ok := r.RunTasks[runTaskID]
if !ok {
return errors.Errorf("no such task with ID %s in run %s", runTaskID, runID)
}
rt.SetupStep.LogPhase = types.RunTaskFetchPhaseFinished
if _, err := store.AtomicPutRun(ctx, s.e, r, "", nil); err != nil {
return err
}
return nil
}
func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error {
r, _, err := store.GetRun(ctx, s.e, runID) r, _, err := store.GetRun(ctx, s.e, runID)
if err != nil { if err != nil {
return err return err
@ -776,14 +807,26 @@ func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID str
func (s *Scheduler) fetchTaskLogs(ctx context.Context, runID string, rt *types.RunTask) { func (s *Scheduler) fetchTaskLogs(ctx context.Context, runID string, rt *types.RunTask) {
log.Debugf("fetchTaskLogs") log.Debugf("fetchTaskLogs")
// fetch setup log
if rt.SetupStep.LogPhase == types.RunTaskFetchPhaseNotStarted {
if err := s.fetchLog(ctx, rt, true, 0); err != nil {
log.Errorf("err: %+v", err)
}
if err := s.finishSetupLogPhase(ctx, runID, rt.ID); err != nil {
log.Errorf("err: %+v", err)
}
}
// fetch steps logs
for i, rts := range rt.Steps { for i, rts := range rt.Steps {
lp := rts.LogPhase lp := rts.LogPhase
if lp == types.RunTaskFetchPhaseNotStarted { if lp == types.RunTaskFetchPhaseNotStarted {
if err := s.fetchLog(ctx, rt, i); err != nil { if err := s.fetchLog(ctx, rt, false, i); err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
continue continue
} }
if err := s.finishLogPhase(ctx, runID, rt.ID, i); err != nil { if err := s.finishStepLogPhase(ctx, runID, rt.ID, i); err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
continue continue
} }
@ -988,6 +1031,10 @@ func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error
done := true done := true
for _, rt := range r.RunTasks { for _, rt := range r.RunTasks {
// check all logs are fetched // check all logs are fetched
if rt.SetupStep.LogPhase != types.RunTaskFetchPhaseFinished {
done = false
break
}
for _, rts := range rt.Steps { for _, rts := range rt.Steps {
lp := rts.LogPhase lp := rts.LogPhase
if lp != types.RunTaskFetchPhaseFinished { if lp != types.RunTaskFetchPhaseFinished {

View File

@ -129,8 +129,16 @@ func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wa
return action, nil return action, nil
} }
func LTSRunLogPath(rtID string, step int) string { func LTSRunTaskLogsDir(rtID string) string {
return path.Join("logs", fmt.Sprintf("%s/%d.log", rtID, step)) return path.Join("logs", rtID)
}
func LTSRunTaskSetupLogPath(rtID string) string {
return path.Join(LTSRunTaskLogsDir(rtID), "setup.log")
}
func LTSRunTaskStepLogPath(rtID string, step int) string {
return path.Join(LTSRunTaskLogsDir(rtID), "steps", fmt.Sprintf("%d.log", step))
} }
func LTSRunArchivePath(rtID string, step int) string { func LTSRunArchivePath(rtID string, step int) string {

View File

@ -173,7 +173,8 @@ type RunTask struct {
// This data is opaque to the run service // This data is opaque to the run service
ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"` ApprovalAnnotations map[string]string `json:"approval_annotations,omitempty"`
Steps []*RunTaskStep `json:"steps,omitempty"` SetupStep RunTaskStep `json:"setup_step,omitempty"`
Steps []*RunTaskStep `json:"steps,omitempty"`
// steps numbers of workspace archives, // steps numbers of workspace archives,
WorkspaceArchives []int `json:"workspace_archives,omitempty"` WorkspaceArchives []int `json:"workspace_archives,omitempty"`
@ -405,7 +406,8 @@ type ExecutorTaskStatus struct {
ExecutorID string `json:"executor_id,omitempty"` ExecutorID string `json:"executor_id,omitempty"`
Phase ExecutorTaskPhase `json:"phase,omitempty"` Phase ExecutorTaskPhase `json:"phase,omitempty"`
Steps []*ExecutorTaskStepStatus `json:"steps,omitempty"` SetupStep ExecutorTaskStepStatus `json:"setup_step,omitempty"`
Steps []*ExecutorTaskStepStatus `json:"steps,omitempty"`
StartTime *time.Time `json:"start_time,omitempty"` StartTime *time.Time `json:"start_time,omitempty"`
EndTime *time.Time `json:"end_time,omitempty"` EndTime *time.Time `json:"end_time,omitempty"`