*: report objects size for objectstorage.WriteObject

This commit is contained in:
Simone Gotti 2019-05-02 09:49:55 +02:00
parent 34cfdfeb3b
commit bad18bf814
8 changed files with 82 additions and 29 deletions

View File

@ -89,7 +89,7 @@ func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error {
if err != nil {
return err
}
if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), -1, true); err != nil {
if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
return err
}
@ -203,7 +203,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty
pos += len(dataEntryj)
}
if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, -1, true); err != nil {
if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, int64(buf.Len()), true); err != nil {
return err
}
@ -211,7 +211,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty
if err != nil {
return err
}
if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), -1, true); err != nil {
if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), int64(len(dataFileIndexj)), true); err != nil {
return err
}

View File

@ -485,7 +485,7 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti
return nil, err
}
}
if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), -1, true); err != nil {
if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), int64(buf.Len()), true); err != nil {
return nil, err
}
d.log.Debugf("wrote wal file: %s", walDataFilePath)
@ -629,7 +629,7 @@ func (d *DataManager) sync(ctx context.Context) error {
}
walFileCommittedPath := walFilePath + ".committed"
if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), -1, true); err != nil {
if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), int64(len(headerj)), true); err != nil {
return err
}
@ -659,7 +659,7 @@ func (d *DataManager) sync(ctx context.Context) error {
walFilePath := d.storageWalStatusFile(walData.WalSequence)
d.log.Debugf("checkpointing committed wal to storage")
walFileCheckpointedPath := walFilePath + ".checkpointed"
if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), -1, true); err != nil {
if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), 0, true); err != nil {
return err
}
}

View File

@ -123,12 +123,23 @@ func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath stri
}
defer f.Close()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
br := bufio.NewReader(f)
// if not following return the Content-Length and just do io.Copy
if !follow {
fi, err := f.Stat()
if err != nil {
return err
}
w.Header().Set("Content-Length", strconv.FormatInt(fi.Size(), 10))
_, err = io.Copy(w, br)
return err
}
var flusher http.Flusher
if fl, ok := w.(http.Flusher); ok {
flusher = fl
@ -184,7 +195,6 @@ func NewArchivesHandler(e *Executor) *archivesHandler {
}
func (h *archivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
taskID := q.Get("taskid")
@ -215,7 +225,7 @@ func (h *archivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func (h *archivesHandler) readArchive(taskID string, step int, w io.Writer) error {
func (h *archivesHandler) readArchive(taskID string, step int, w http.ResponseWriter) error {
archivePath := h.e.archivePath(taskID, step)
f, err := os.Open(archivePath)
@ -223,6 +233,12 @@ func (h *archivesHandler) readArchive(taskID string, step int, w io.Writer) erro
return err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return err
}
w.Header().Set("Content-Length", strconv.FormatInt(fi.Size(), 10))
br := bufio.NewReader(f)

View File

@ -554,9 +554,13 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep,
if err != nil {
return -1, err
}
fi, err := f.Stat()
if err != nil {
return -1, err
}
// send cache archive to scheduler
if resp, err := e.runserviceClient.PutCache(ctx, key, f); err != nil {
if resp, err := e.runserviceClient.PutCache(ctx, key, fi.Size(), f); err != nil {
if resp != nil && resp.StatusCode == http.StatusNotModified {
return exitCode, nil
}

View File

@ -23,6 +23,7 @@ import (
"net/http"
"strconv"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
@ -32,7 +33,6 @@ import (
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/gorilla/mux"
"github.com/pkg/errors"

View File

@ -51,7 +51,7 @@ func (c *Client) SetHTTPClient(client *http.Client) {
c.client = client
}
func (c *Client) doRequest(ctx context.Context, method, path string, query url.Values, header http.Header, ibody io.Reader) (*http.Response, error) {
func (c *Client) doRequest(ctx context.Context, method, path string, query url.Values, contentLength int64, header http.Header, ibody io.Reader) (*http.Response, error) {
u, err := url.Parse(c.url + "/api/v1alpha" + path)
if err != nil {
return nil, err
@ -67,11 +67,15 @@ func (c *Client) doRequest(ctx context.Context, method, path string, query url.V
req.Header[k] = v
}
if contentLength >= 0 {
req.ContentLength = contentLength
}
return c.client.Do(req)
}
func (c *Client) getResponse(ctx context.Context, method, path string, query url.Values, header http.Header, ibody io.Reader) (*http.Response, error) {
resp, err := c.doRequest(ctx, method, path, query, header, ibody)
func (c *Client) getResponse(ctx context.Context, method, path string, query url.Values, contentLength int64, header http.Header, ibody io.Reader) (*http.Response, error) {
resp, err := c.doRequest(ctx, method, path, query, contentLength, header, ibody)
if err != nil {
return nil, err
}
@ -94,7 +98,7 @@ func (c *Client) getResponse(ctx context.Context, method, path string, query url
}
func (c *Client) getParsedResponse(ctx context.Context, method, path string, query url.Values, header http.Header, ibody io.Reader, obj interface{}) (*http.Response, error) {
resp, err := c.getResponse(ctx, method, path, query, header, ibody)
resp, err := c.getResponse(ctx, method, path, query, -1, header, ibody)
if err != nil {
return resp, err
}
@ -110,7 +114,7 @@ func (c *Client) SendExecutorStatus(ctx context.Context, executor *rstypes.Execu
if err != nil {
return nil, err
}
return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/%s", executor.ID), nil, jsonContent, bytes.NewReader(executorj))
return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/%s", executor.ID), nil, -1, jsonContent, bytes.NewReader(executorj))
}
func (c *Client) SendExecutorTaskStatus(ctx context.Context, executorID string, et *rstypes.ExecutorTask) (*http.Response, error) {
@ -118,7 +122,7 @@ func (c *Client) SendExecutorTaskStatus(ctx context.Context, executorID string,
if err != nil {
return nil, err
}
return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/%s/tasks/%s", executorID, et.ID), nil, jsonContent, bytes.NewReader(etj))
return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/%s/tasks/%s", executorID, et.ID), nil, -1, jsonContent, bytes.NewReader(etj))
}
func (c *Client) GetExecutorTask(ctx context.Context, executorID, etID string) (*rstypes.ExecutorTask, *http.Response, error) {
@ -138,7 +142,7 @@ func (c *Client) GetArchive(ctx context.Context, taskID string, step int) (*http
q.Add("taskid", taskID)
q.Add("step", strconv.Itoa(step))
return c.getResponse(ctx, "GET", "/executor/archives", q, nil, nil)
return c.getResponse(ctx, "GET", "/executor/archives", q, -1, nil, nil)
}
func (c *Client) CheckCache(ctx context.Context, key string, prefix bool) (*http.Response, error) {
@ -146,7 +150,7 @@ func (c *Client) CheckCache(ctx context.Context, key string, prefix bool) (*http
if prefix {
q.Add("prefix", "")
}
return c.getResponse(ctx, "HEAD", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, nil, nil)
return c.getResponse(ctx, "HEAD", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, -1, nil, nil)
}
func (c *Client) GetCache(ctx context.Context, key string, prefix bool) (*http.Response, error) {
@ -154,11 +158,11 @@ func (c *Client) GetCache(ctx context.Context, key string, prefix bool) (*http.R
if prefix {
q.Add("prefix", "")
}
return c.getResponse(ctx, "GET", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, nil, nil)
return c.getResponse(ctx, "GET", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), q, -1, nil, nil)
}
func (c *Client) PutCache(ctx context.Context, key string, r io.Reader) (*http.Response, error) {
return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), nil, nil, r)
func (c *Client) PutCache(ctx context.Context, key string, size int64, r io.Reader) (*http.Response, error) {
return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), nil, size, nil, r)
}
func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) {
@ -212,7 +216,7 @@ func (c *Client) CreateRun(ctx context.Context, req *RunCreateRequest) (*http.Re
return nil, err
}
return c.getResponse(ctx, "POST", "/runs", nil, jsonContent, bytes.NewReader(reqj))
return c.getResponse(ctx, "POST", "/runs", nil, -1, jsonContent, bytes.NewReader(reqj))
}
func (c *Client) RunActions(ctx context.Context, runID string, req *RunActionsRequest) (*http.Response, error) {
@ -220,7 +224,7 @@ func (c *Client) RunActions(ctx context.Context, runID string, req *RunActionsRe
if err != nil {
return nil, err
}
return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/actions", runID), nil, jsonContent, bytes.NewReader(reqj))
return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/actions", runID), nil, -1, jsonContent, bytes.NewReader(reqj))
}
func (c *Client) StartRun(ctx context.Context, runID string, changeGroupsUpdateToken string) (*http.Response, error) {
@ -238,7 +242,7 @@ func (c *Client) RunTaskActions(ctx context.Context, runID, taskID string, req *
if err != nil {
return nil, err
}
return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/tasks/%s/actions", runID, taskID), nil, jsonContent, bytes.NewReader(reqj))
return c.getResponse(ctx, "PUT", fmt.Sprintf("/runs/%s/tasks/%s/actions", runID, taskID), nil, -1, jsonContent, bytes.NewReader(reqj))
}
func (c *Client) ApproveRunTask(ctx context.Context, runID, taskID string, approvalAnnotations map[string]string, changeGroupsUpdateToken string) (*http.Response, error) {
@ -273,5 +277,5 @@ func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool,
q.Add("stream", "")
}
return c.getResponse(ctx, "GET", "/logs", q, nil, nil)
return c.getResponse(ctx, "GET", "/logs", q, -1, nil, nil)
}

View File

@ -410,8 +410,18 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
size := int64(-1)
sizeStr := r.Header.Get("Content-Length")
if sizeStr != "" {
size, err = strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
http.Error(w, "", http.StatusBadRequest)
return
}
}
cachePath := store.OSTCachePath(key)
if err := h.ost.WriteObject(cachePath, r.Body, -1, false); err != nil {
if err := h.ost.WriteObject(cachePath, r.Body, size, false); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

View File

@ -24,6 +24,7 @@ import (
"os"
"path/filepath"
"sort"
"strconv"
"time"
scommon "github.com/sorintlab/agola/internal/common"
@ -931,7 +932,16 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool,
return errors.Errorf("received http status: %d", r.StatusCode)
}
return s.ost.WriteObject(logPath, r.Body, -1, false)
size := int64(-1)
sizeStr := r.Header.Get("Content-Length")
if sizeStr != "" {
size, err = strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
return errors.Errorf("failed to parse content length %q", sizeStr)
}
}
return s.ost.WriteObject(logPath, r.Body, size, false)
}
func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error {
@ -1073,7 +1083,16 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum
return errors.Errorf("received http status: %d", r.StatusCode)
}
return s.ost.WriteObject(path, r.Body, -1, false)
size := int64(-1)
sizeStr := r.Header.Get("Content-Length")
if sizeStr != "" {
size, err = strconv.ParseInt(sizeStr, 10, 64)
if err != nil {
return errors.Errorf("failed to parse content length %q", sizeStr)
}
}
return s.ost.WriteObject(path, r.Body, size, false)
}
func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {