diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go index bceb6ba..5b28b6a 100644 --- a/internal/services/runservice/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -242,7 +242,7 @@ func (h *ArchivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error { - archivePath := store.OSTRunArchivePath(rtID, step) + archivePath := store.OSTRunTaskArchivePath(rtID, step) f, err := h.ost.ReadObject(archivePath) if err != nil { if err == objectstorage.ErrNotExist { diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 4d9480b..7c3a381 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "net/http" - "os" "sort" "strconv" "time" @@ -861,12 +860,12 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { return nil } -func (s *Runservice) fileExists(path string) (bool, error) { - _, err := os.Stat(path) - if err != nil && !os.IsNotExist(err) { +func (s *Runservice) OSTFileExists(path string) (bool, error) { + _, err := s.ost.Stat(path) + if err != nil && err != objectstorage.ErrNotExist { return false, err } - return !os.IsNotExist(err), nil + return err == nil, nil } func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error { @@ -895,7 +894,7 @@ func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool } else { logPath = store.OSTRunTaskStepLogPath(rt.ID, stepnum) } - ok, err := s.fileExists(logPath) + ok, err := s.OSTFileExists(logPath) if err != nil { return err } @@ -1049,8 +1048,8 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu return nil } - path := store.OSTRunArchivePath(rt.ID, stepnum) - ok, err := s.fileExists(path) + path := store.OSTRunTaskArchivePath(rt.ID, stepnum) + ok, err := s.OSTFileExists(path) if err != nil { return err } @@ -1088,6 +1087,7 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu func (s *Runservice) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) { log.Debugf("fetchTaskArchives") + for i, stepnum := range rt.WorkspaceArchives { phase := rt.WorkspaceArchivesPhase[i] if phase == types.RunTaskFetchPhaseNotStarted { @@ -1132,15 +1132,32 @@ func (s *Runservice) fetcher(ctx context.Context) error { for _, rt := range r.Tasks { log.Debugf("rt: %s", util.Dump(rt)) if rt.Status.IsFinished() { + // write related logs runID + runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID) + exists, err := s.OSTFileExists(runIDPath) + if err != nil { + log.Errorf("err: %+v", err) + } else if !exists { + if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil { + log.Errorf("err: %+v", err) + } + } + + // write related archives runID + runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID) + exists, err = s.OSTFileExists(runIDPath) + if err != nil { + log.Errorf("err: %+v", err) + } else if !exists { + if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil { + log.Errorf("err: %+v", err) + } + } + s.fetchTaskLogs(ctx, r.ID, rt) s.fetchTaskArchives(ctx, r.ID, rt) } } - - // We don't update the fetch phases and atomic put the run since fetching may - // take a lot of time and the run will be already updated in the meantime - // causing the atomic put will fail - // Another loop will check if the fetched file exists and update the run } return nil diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 65b1bd3..db77d55 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -111,20 +111,48 @@ func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*da return action, nil } -func OSTRunTaskLogsDir(rtID string) string { +func OSTRunTaskLogsBaseDir(rtID string) string { return path.Join("logs", rtID) } +func OSTRunTaskLogsDataDir(rtID string) string { + return path.Join(OSTRunTaskLogsBaseDir(rtID), "data") +} + +func OSTRunTaskLogsRunsDir(rtID string) string { + return path.Join(OSTRunTaskLogsBaseDir(rtID), "runs") +} + func OSTRunTaskSetupLogPath(rtID string) string { - return path.Join(OSTRunTaskLogsDir(rtID), "setup.log") + return path.Join(OSTRunTaskLogsDataDir(rtID), "setup.log") } func OSTRunTaskStepLogPath(rtID string, step int) string { - return path.Join(OSTRunTaskLogsDir(rtID), "steps", fmt.Sprintf("%d.log", step)) + return path.Join(OSTRunTaskLogsDataDir(rtID), "steps", fmt.Sprintf("%d.log", step)) } -func OSTRunArchivePath(rtID string, step int) string { - return path.Join("workspacearchives", fmt.Sprintf("%s/%d.tar", rtID, step)) +func OSTRunTaskLogsRunPath(rtID, runID string) string { + return path.Join(OSTRunTaskLogsRunsDir(rtID), runID) +} + +func OSTRunTaskArchivesBaseDir(rtID string) string { + return path.Join("workspacearchives", rtID) +} + +func OSTRunTaskArchivesDataDir(rtID string) string { + return path.Join(OSTRunTaskArchivesBaseDir(rtID), "data") +} + +func OSTRunTaskArchivesRunsDir(rtID string) string { + return path.Join(OSTRunTaskArchivesBaseDir(rtID), "runs") +} + +func OSTRunTaskArchivePath(rtID string, step int) string { + return path.Join(OSTRunTaskArchivesDataDir(rtID), fmt.Sprintf("%d.tar", step)) +} + +func OSTRunTaskArchivesRunPath(rtID, runID string) string { + return path.Join(OSTRunTaskArchivesRunsDir(rtID), runID) } func OSTCacheDir() string {