From bad18bf8148081cca6f7be6541397bef39f6a17f Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 2 May 2019 09:49:55 +0200 Subject: [PATCH] *: report objects size for objectstorage.WriteObject --- internal/datamanager/data.go | 6 ++-- internal/datamanager/wal.go | 6 ++-- internal/services/runservice/executor/api.go | 22 ++++++++++-- .../services/runservice/executor/executor.go | 6 +++- .../services/runservice/scheduler/api/api.go | 2 +- .../runservice/scheduler/api/client.go | 34 +++++++++++-------- .../runservice/scheduler/api/executor.go | 12 ++++++- .../runservice/scheduler/scheduler.go | 23 +++++++++++-- 8 files changed, 82 insertions(+), 29 deletions(-) diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index 10de284..b7eaca7 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -89,7 +89,7 @@ func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error { if err != nil { return err } - if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), -1, true); err != nil { + if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil { return err } @@ -203,7 +203,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty pos += len(dataEntryj) } - if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, -1, true); err != nil { + if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, int64(buf.Len()), true); err != nil { return err } @@ -211,7 +211,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty if err != nil { return err } - if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), -1, true); err != nil { + if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil { return err } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 0adfba2..bd7e0c6 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -485,7 +485,7 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti return nil, err } } - if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), -1, true); err != nil { + if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), int64(buf.Len()), true); err != nil { return nil, err } d.log.Debugf("wrote wal file: %s", walDataFilePath) @@ -629,7 +629,7 @@ func (d *DataManager) sync(ctx context.Context) error { } walFileCommittedPath := walFilePath + ".committed" - if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), -1, true); err != nil { + if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), int64(len(headerj)), true); err != nil { return err } @@ -659,7 +659,7 @@ func (d *DataManager) sync(ctx context.Context) error { walFilePath := d.storageWalStatusFile(walData.WalSequence) d.log.Debugf("checkpointing committed wal to storage") walFileCheckpointedPath := walFilePath + ".checkpointed" - if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), -1, true); err != nil { + if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), 0, true); err != nil { return err } } diff --git a/internal/services/runservice/executor/api.go b/internal/services/runservice/executor/api.go index 8162755..a79ee19 100644 --- a/internal/services/runservice/executor/api.go +++ b/internal/services/runservice/executor/api.go @@ -123,12 +123,23 @@ func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath stri } defer f.Close() - w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") br := bufio.NewReader(f) + // if not following return the Content-Length and just do io.Copy + if !follow { + fi, err := f.Stat() + if err != nil { + return err + } + w.Header().Set("Content-Length", strconv.FormatInt(fi.Size(), 10)) + + _, err = io.Copy(w, br) + return err + } + var flusher http.Flusher if fl, ok := w.(http.Flusher); ok { flusher = fl @@ -184,7 +195,6 @@ func NewArchivesHandler(e *Executor) *archivesHandler { } func (h *archivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - q := r.URL.Query() taskID := q.Get("taskid") @@ -215,7 +225,7 @@ func (h *archivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (h *archivesHandler) readArchive(taskID string, step int, w io.Writer) error { +func (h *archivesHandler) readArchive(taskID string, step int, w http.ResponseWriter) error { archivePath := h.e.archivePath(taskID, step) f, err := os.Open(archivePath) @@ -223,6 +233,12 @@ func (h *archivesHandler) readArchive(taskID string, step int, w io.Writer) erro return err } defer f.Close() + fi, err := f.Stat() + if err != nil { + return err + } + + w.Header().Set("Content-Length", strconv.FormatInt(fi.Size(), 10)) br := bufio.NewReader(f) diff --git a/internal/services/runservice/executor/executor.go b/internal/services/runservice/executor/executor.go index 358e95c..dcecc8a 100644 --- a/internal/services/runservice/executor/executor.go +++ b/internal/services/runservice/executor/executor.go @@ -554,9 +554,13 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep, if err != nil { return -1, err } + fi, err := f.Stat() + if err != nil { + return -1, err + } // send cache archive to scheduler - if resp, err := e.runserviceClient.PutCache(ctx, key, f); err != nil { + if resp, err := e.runserviceClient.PutCache(ctx, key, fi.Size(), f); err != nil { if resp != nil && resp.StatusCode == http.StatusNotModified { return exitCode, nil } diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index 5db400c..f124b65 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -23,6 +23,7 @@ import ( "net/http" "strconv" + "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" @@ -32,7 +33,6 @@ import ( "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" - "github.com/sorintlab/agola/internal/datamanager" "github.com/gorilla/mux" "github.com/pkg/errors" diff --git a/internal/services/runservice/scheduler/api/client.go b/internal/services/runservice/scheduler/api/client.go index 63a22b4..a2e8955 100644 --- a/internal/services/runservice/scheduler/api/client.go +++ b/internal/services/runservice/scheduler/api/client.go @@ -51,7 +51,7 @@ func (c *Client) SetHTTPClient(client *http.Client) { c.client = client } -func (c *Client) doRequest(ctx context.Context, method, path string, query url.Values, header http.Header, ibody io.Reader) (*http.Response, error) { +func (c *Client) doRequest(ctx context.Context, method, path string, query url.Values, contentLength int64, header http.Header, ibody io.Reader) (*http.Response, error) { u, err := url.Parse(c.url + "/api/v1alpha" + path) if err != nil { return nil, err @@ -67,11 +67,15 @@ func (c *Client) doRequest(ctx context.Context, method, path string, query url.V req.Header[k] = v } + if contentLength >= 0 { + req.ContentLength = contentLength + } + return c.client.Do(req) } -func (c *Client) getResponse(ctx context.Context, method, path string, query url.Values, header http.Header, ibody io.Reader) (*http.Response, error) { - resp, err := c.doRequest(ctx, method, path, query, header, ibody) +func (c *Client) getResponse(ctx context.Context, method, path string, query url.Values, contentLength int64, header http.Header, ibody io.Reader) (*http.Response, error) { + resp, err := c.doRequest(ctx, method, path, query, contentLength, header, ibody) if err != nil { return nil, err } @@ -94,7 +98,7 @@ func (c *Client) getResponse(ctx context.Context, method, path string, query url } func (c *Client) getParsedResponse(ctx context.Context, method, path string, query url.Values, header http.Header, ibody io.Reader, obj interface{}) (*http.Response, error) { - resp, err := c.getResponse(ctx, method, path, query, header, ibody) + resp, err := c.getResponse(ctx, method, path, query, -1, header, ibody) if err != nil { return resp, err } @@ -110,7 +114,7 @@ func (c *Client) SendExecutorStatus(ctx context.Context, executor *rstypes.Execu if err != nil { return nil, err } - return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/%s", executor.ID), nil, jsonContent, bytes.NewReader(executorj)) + return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/%s", executor.ID), nil, -1, jsonContent, bytes.NewReader(executorj)) } func (c *Client) SendExecutorTaskStatus(ctx context.Context, executorID string, et *rstypes.ExecutorTask) (*http.Response, error) { @@ -118,7 +122,7 @@ func (c *Client) SendExecutorTaskStatus(ctx context.Context, executorID string, if err != nil { return nil, err } - return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/%s/tasks/%s", executorID, et.ID), nil, jsonContent, bytes.NewReader(etj)) + return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/%s/tasks/%s", executorID, et.ID), nil, -1, jsonContent, bytes.NewReader(etj)) } func (c *Client) GetExecutorTask(ctx context.Context, executorID, etID string) (*rstypes.ExecutorTask, *http.Response, error) { @@ -138,7 +142,7 @@ func (c *Client) GetArchive(ctx context.Context, taskID string, step int) (*http q.Add("taskid", taskID) q.Add("step", strconv.Itoa(step)) - return c.getResponse(ctx, "GET", "/executor/archives", q, nil, nil) + return c.getResponse(ctx, "GET", "/executor/archives", q, -1, nil, nil) } func (c *Client) CheckCache(ctx context.Context, key string, prefix bool) (*http.Response, error) { @@ -146,7 +150,7 @@ func (c *Client) CheckCache(ctx context.Context, key string, prefix bool) (*http if prefix { q.Add("prefix", "") } - return c.getResponse(ctx, "HEAD", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, nil, nil) + return c.getResponse(ctx, "HEAD", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, -1, nil, nil) } func (c *Client) GetCache(ctx context.Context, key string, prefix bool) (*http.Response, error) { @@ -154,11 +158,11 @@ func (c *Client) GetCache(ctx context.Context, key string, prefix bool) (*http.R if prefix { q.Add("prefix", "") } - return c.getResponse(ctx, "GET", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, nil, nil) + return c.getResponse(ctx, "GET", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, -1, nil, nil) } -func (c *Client) PutCache(ctx context.Context, key string, r io.Reader) (*http.Response, error) { - return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), nil, nil, r) +func (c *Client) PutCache(ctx context.Context, key string, size int64, r io.Reader) (*http.Response, error) { + return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), nil, size, nil, r) } func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) { @@ -212,7 +216,7 @@ func (c *Client) CreateRun(ctx context.Context, req *RunCreateRequest) (*http.Re return nil, err } - return c.getResponse(ctx, "POST", "/runs", nil, jsonContent, bytes.NewReader(reqj)) + return c.getResponse(ctx, "POST", "/runs", nil, -1, jsonContent, bytes.NewReader(reqj)) } func (c *Client) RunActions(ctx context.Context, runID string, req *RunActionsRequest) (*http.Response, error) { @@ -220,7 +224,7 @@ func (c *Client) RunActions(ctx context.Context, runID string, req *RunActionsRe if err != nil { return nil, err } - return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/actions", runID), nil, jsonContent, bytes.NewReader(reqj)) + return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/actions", runID), nil, -1, jsonContent, bytes.NewReader(reqj)) } func (c *Client) StartRun(ctx context.Context, runID string, changeGroupsUpdateToken string) (*http.Response, error) { @@ -238,7 +242,7 @@ func (c *Client) RunTaskActions(ctx context.Context, runID, taskID string, req * if err != nil { return nil, err } - return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/tasks/%s/actions", runID, taskID), nil, jsonContent, bytes.NewReader(reqj)) + return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/tasks/%s/actions", runID, taskID), nil, -1, jsonContent, bytes.NewReader(reqj)) } func (c *Client) ApproveRunTask(ctx context.Context, runID, taskID string, approvalAnnotations map[string]string, changeGroupsUpdateToken string) (*http.Response, error) { @@ -273,5 +277,5 @@ func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool, q.Add("stream", "") } - return c.getResponse(ctx, "GET", "/logs", q, nil, nil) + return c.getResponse(ctx, "GET", "/logs", q, -1, nil, nil) } diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/scheduler/api/executor.go index 43d41f3..e4cb9b7 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/scheduler/api/executor.go @@ -410,8 +410,18 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + size := int64(-1) + sizeStr := r.Header.Get("Content-Length") + if sizeStr != "" { + size, err = strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + http.Error(w, "", http.StatusBadRequest) + return + } + } + cachePath := store.OSTCachePath(key) - if err := h.ost.WriteObject(cachePath, r.Body, -1, false); err != nil { + if err := h.ost.WriteObject(cachePath, r.Body, size, false); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index d8dfa43..759d22d 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -24,6 +24,7 @@ import ( "os" "path/filepath" "sort" + "strconv" "time" scommon "github.com/sorintlab/agola/internal/common" @@ -931,7 +932,16 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, return errors.Errorf("received http status: %d", r.StatusCode) } - return s.ost.WriteObject(logPath, r.Body, -1, false) + size := int64(-1) + sizeStr := r.Header.Get("Content-Length") + if sizeStr != "" { + size, err = strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + return errors.Errorf("failed to parse content length %q", sizeStr) + } + } + + return s.ost.WriteObject(logPath, r.Body, size, false) } func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { @@ -1073,7 +1083,16 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum return errors.Errorf("received http status: %d", r.StatusCode) } - return s.ost.WriteObject(path, r.Body, -1, false) + size := int64(-1) + sizeStr := r.Header.Get("Content-Length") + if sizeStr != "" { + size, err = strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + return errors.Errorf("failed to parse content length %q", sizeStr) + } + } + + return s.ost.WriteObject(path, r.Body, size, false) } func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {