From bec9476d6c5d233f808118a4979bf913f50cbea9 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 8 May 2019 12:11:46 +0200 Subject: [PATCH] runservice: store related runid with logs and archives Logs and archives can be shared by multiple runs. So removing a run doesn't imply that we could also remote the logs and archives since they could be "referenced" by another run. Store also the runids as specific objects along with the logs and archives so, we'll remove them only when no runids objects exist. --- internal/services/runservice/api/executor.go | 2 +- internal/services/runservice/scheduler.go | 43 ++++++++++++++------ internal/services/runservice/store/store.go | 38 ++++++++++++++--- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go index bceb6ba..5b28b6a 100644 --- a/internal/services/runservice/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -242,7 +242,7 @@ func (h *ArchivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error { - archivePath := store.OSTRunArchivePath(rtID, step) + archivePath := store.OSTRunTaskArchivePath(rtID, step) f, err := h.ost.ReadObject(archivePath) if err != nil { if err == objectstorage.ErrNotExist { diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 4d9480b..7c3a381 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "net/http" - "os" "sort" "strconv" "time" @@ -861,12 +860,12 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { return nil } -func (s *Runservice) fileExists(path string) (bool, error) { - _, err := os.Stat(path) - if err != nil && !os.IsNotExist(err) { +func (s *Runservice) OSTFileExists(path string) (bool, error) { + _, err := s.ost.Stat(path) + if err != nil && err != objectstorage.ErrNotExist { return false, err } - return !os.IsNotExist(err), nil + return err == nil, nil } func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error { @@ -895,7 +894,7 @@ func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool } else { logPath = store.OSTRunTaskStepLogPath(rt.ID, stepnum) } - ok, err := s.fileExists(logPath) + ok, err := s.OSTFileExists(logPath) if err != nil { return err } @@ -1049,8 +1048,8 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu return nil } - path := store.OSTRunArchivePath(rt.ID, stepnum) - ok, err := s.fileExists(path) + path := store.OSTRunTaskArchivePath(rt.ID, stepnum) + ok, err := s.OSTFileExists(path) if err != nil { return err } @@ -1088,6 +1087,7 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu func (s *Runservice) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) { log.Debugf("fetchTaskArchives") + for i, stepnum := range rt.WorkspaceArchives { phase := rt.WorkspaceArchivesPhase[i] if phase == types.RunTaskFetchPhaseNotStarted { @@ -1132,15 +1132,32 @@ func (s *Runservice) fetcher(ctx context.Context) error { for _, rt := range r.Tasks { log.Debugf("rt: %s", util.Dump(rt)) if rt.Status.IsFinished() { + // write related logs runID + runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID) + exists, err := s.OSTFileExists(runIDPath) + if err != nil { + log.Errorf("err: %+v", err) + } else if !exists { + if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil { + log.Errorf("err: %+v", err) + } + } + + // write related archives runID + runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID) + exists, err = s.OSTFileExists(runIDPath) + if err != nil { + log.Errorf("err: %+v", err) + } else if !exists { + if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil { + log.Errorf("err: %+v", err) + } + } + s.fetchTaskLogs(ctx, r.ID, rt) s.fetchTaskArchives(ctx, r.ID, rt) } } - - // We don't update the fetch phases and atomic put the run since fetching may - // take a lot of time and the run will be already updated in the meantime - // causing the atomic put will fail - // Another loop will check if the fetched file exists and update the run } return nil diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 65b1bd3..db77d55 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -111,20 +111,48 @@ func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*da return action, nil } -func OSTRunTaskLogsDir(rtID string) string { +func OSTRunTaskLogsBaseDir(rtID string) string { return path.Join("logs", rtID) } +func OSTRunTaskLogsDataDir(rtID string) string { + return path.Join(OSTRunTaskLogsBaseDir(rtID), "data") +} + +func OSTRunTaskLogsRunsDir(rtID string) string { + return path.Join(OSTRunTaskLogsBaseDir(rtID), "runs") +} + func OSTRunTaskSetupLogPath(rtID string) string { - return path.Join(OSTRunTaskLogsDir(rtID), "setup.log") + return path.Join(OSTRunTaskLogsDataDir(rtID), "setup.log") } func OSTRunTaskStepLogPath(rtID string, step int) string { - return path.Join(OSTRunTaskLogsDir(rtID), "steps", fmt.Sprintf("%d.log", step)) + return path.Join(OSTRunTaskLogsDataDir(rtID), "steps", fmt.Sprintf("%d.log", step)) } -func OSTRunArchivePath(rtID string, step int) string { - return path.Join("workspacearchives", fmt.Sprintf("%s/%d.tar", rtID, step)) +func OSTRunTaskLogsRunPath(rtID, runID string) string { + return path.Join(OSTRunTaskLogsRunsDir(rtID), runID) +} + +func OSTRunTaskArchivesBaseDir(rtID string) string { + return path.Join("workspacearchives", rtID) +} + +func OSTRunTaskArchivesDataDir(rtID string) string { + return path.Join(OSTRunTaskArchivesBaseDir(rtID), "data") +} + +func OSTRunTaskArchivesRunsDir(rtID string) string { + return path.Join(OSTRunTaskArchivesBaseDir(rtID), "runs") +} + +func OSTRunTaskArchivePath(rtID string, step int) string { + return path.Join(OSTRunTaskArchivesDataDir(rtID), fmt.Sprintf("%d.tar", step)) +} + +func OSTRunTaskArchivesRunPath(rtID, runID string) string { + return path.Join(OSTRunTaskArchivesRunsDir(rtID), runID) } func OSTCacheDir() string {