runservice: store related runid with logs and archives

Logs and archives can be shared by multiple runs. So removing a run doesn't
imply that we could also remote the logs and archives since they could be
"referenced" by another run.

Store also the runids as specific objects along with the logs and archives so,
we'll remove them only when no runids objects exist.
This commit is contained in:
Simone Gotti 2019-05-08 12:11:46 +02:00
parent 43341f2cba
commit bec9476d6c
3 changed files with 64 additions and 19 deletions

View File

@ -242,7 +242,7 @@ func (h *ArchivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error { func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error {
archivePath := store.OSTRunArchivePath(rtID, step) archivePath := store.OSTRunTaskArchivePath(rtID, step)
f, err := h.ost.ReadObject(archivePath) f, err := h.ost.ReadObject(archivePath)
if err != nil { if err != nil {
if err == objectstorage.ErrNotExist { if err == objectstorage.ErrNotExist {

View File

@ -20,7 +20,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"os"
"sort" "sort"
"strconv" "strconv"
"time" "time"
@ -861,12 +860,12 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error {
return nil return nil
} }
func (s *Runservice) fileExists(path string) (bool, error) { func (s *Runservice) OSTFileExists(path string) (bool, error) {
_, err := os.Stat(path) _, err := s.ost.Stat(path)
if err != nil && !os.IsNotExist(err) { if err != nil && err != objectstorage.ErrNotExist {
return false, err return false, err
} }
return !os.IsNotExist(err), nil return err == nil, nil
} }
func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error { func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error {
@ -895,7 +894,7 @@ func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool
} else { } else {
logPath = store.OSTRunTaskStepLogPath(rt.ID, stepnum) logPath = store.OSTRunTaskStepLogPath(rt.ID, stepnum)
} }
ok, err := s.fileExists(logPath) ok, err := s.OSTFileExists(logPath)
if err != nil { if err != nil {
return err return err
} }
@ -1049,8 +1048,8 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu
return nil return nil
} }
path := store.OSTRunArchivePath(rt.ID, stepnum) path := store.OSTRunTaskArchivePath(rt.ID, stepnum)
ok, err := s.fileExists(path) ok, err := s.OSTFileExists(path)
if err != nil { if err != nil {
return err return err
} }
@ -1088,6 +1087,7 @@ func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnu
func (s *Runservice) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) { func (s *Runservice) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {
log.Debugf("fetchTaskArchives") log.Debugf("fetchTaskArchives")
for i, stepnum := range rt.WorkspaceArchives { for i, stepnum := range rt.WorkspaceArchives {
phase := rt.WorkspaceArchivesPhase[i] phase := rt.WorkspaceArchivesPhase[i]
if phase == types.RunTaskFetchPhaseNotStarted { if phase == types.RunTaskFetchPhaseNotStarted {
@ -1132,15 +1132,32 @@ func (s *Runservice) fetcher(ctx context.Context) error {
for _, rt := range r.Tasks { for _, rt := range r.Tasks {
log.Debugf("rt: %s", util.Dump(rt)) log.Debugf("rt: %s", util.Dump(rt))
if rt.Status.IsFinished() { if rt.Status.IsFinished() {
// write related logs runID
runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID)
exists, err := s.OSTFileExists(runIDPath)
if err != nil {
log.Errorf("err: %+v", err)
} else if !exists {
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
log.Errorf("err: %+v", err)
}
}
// write related archives runID
runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID)
exists, err = s.OSTFileExists(runIDPath)
if err != nil {
log.Errorf("err: %+v", err)
} else if !exists {
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
log.Errorf("err: %+v", err)
}
}
s.fetchTaskLogs(ctx, r.ID, rt) s.fetchTaskLogs(ctx, r.ID, rt)
s.fetchTaskArchives(ctx, r.ID, rt) s.fetchTaskArchives(ctx, r.ID, rt)
} }
} }
// We don't update the fetch phases and atomic put the run since fetching may
// take a lot of time and the run will be already updated in the meantime
// causing the atomic put will fail
// Another loop will check if the fetched file exists and update the run
} }
return nil return nil

View File

@ -111,20 +111,48 @@ func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*da
return action, nil return action, nil
} }
func OSTRunTaskLogsDir(rtID string) string { func OSTRunTaskLogsBaseDir(rtID string) string {
return path.Join("logs", rtID) return path.Join("logs", rtID)
} }
func OSTRunTaskLogsDataDir(rtID string) string {
return path.Join(OSTRunTaskLogsBaseDir(rtID), "data")
}
func OSTRunTaskLogsRunsDir(rtID string) string {
return path.Join(OSTRunTaskLogsBaseDir(rtID), "runs")
}
func OSTRunTaskSetupLogPath(rtID string) string { func OSTRunTaskSetupLogPath(rtID string) string {
return path.Join(OSTRunTaskLogsDir(rtID), "setup.log") return path.Join(OSTRunTaskLogsDataDir(rtID), "setup.log")
} }
func OSTRunTaskStepLogPath(rtID string, step int) string { func OSTRunTaskStepLogPath(rtID string, step int) string {
return path.Join(OSTRunTaskLogsDir(rtID), "steps", fmt.Sprintf("%d.log", step)) return path.Join(OSTRunTaskLogsDataDir(rtID), "steps", fmt.Sprintf("%d.log", step))
} }
func OSTRunArchivePath(rtID string, step int) string { func OSTRunTaskLogsRunPath(rtID, runID string) string {
return path.Join("workspacearchives", fmt.Sprintf("%s/%d.tar", rtID, step)) return path.Join(OSTRunTaskLogsRunsDir(rtID), runID)
}
func OSTRunTaskArchivesBaseDir(rtID string) string {
return path.Join("workspacearchives", rtID)
}
func OSTRunTaskArchivesDataDir(rtID string) string {
return path.Join(OSTRunTaskArchivesBaseDir(rtID), "data")
}
func OSTRunTaskArchivesRunsDir(rtID string) string {
return path.Join(OSTRunTaskArchivesBaseDir(rtID), "runs")
}
func OSTRunTaskArchivePath(rtID string, step int) string {
return path.Join(OSTRunTaskArchivesDataDir(rtID), fmt.Sprintf("%d.tar", step))
}
func OSTRunTaskArchivesRunPath(rtID, runID string) string {
return path.Join(OSTRunTaskArchivesRunsDir(rtID), runID)
} }
func OSTCacheDir() string { func OSTCacheDir() string {