diff --git a/internal/services/executor/api.go b/internal/services/executor/api.go index a79ee19..8976976 100644 --- a/internal/services/executor/api.go +++ b/internal/services/executor/api.go @@ -126,18 +126,15 @@ func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath stri w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - br := bufio.NewReader(f) + buf := make([]byte, 4096) - // if not following return the Content-Length and just do io.Copy + // if not following return the Content-Length if !follow { fi, err := f.Stat() if err != nil { return err } w.Header().Set("Content-Length", strconv.FormatInt(fi.Size(), 10)) - - _, err = io.Copy(w, br) - return err } var flusher http.Flusher @@ -150,13 +147,13 @@ func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath stri if stop { return nil } - data, err := br.ReadBytes('\n') + n, err := f.Read(buf) if err != nil { if err != io.EOF { return err } if !flushstop && follow { - if _, err := f.Seek(-int64(len(data)), io.SeekCurrent); err != nil { + if _, err := f.Seek(-int64(n), io.SeekCurrent); err != nil { return errors.Wrapf(err, "failed to seek in log file %q", logPath) } // check if the step is finished, if so flush until EOF and stop @@ -177,7 +174,7 @@ func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath stri stop = true } } - if _, err := w.Write(data); err != nil { + if _, err := w.Write(buf[:n]); err != nil { return err } if flusher != nil { diff --git a/internal/services/gateway/action/run.go b/internal/services/gateway/action/run.go index e49a82b..4fcafec 100644 --- a/internal/services/gateway/action/run.go +++ b/internal/services/gateway/action/run.go @@ -77,7 +77,6 @@ type GetLogsRequest struct { Setup bool Step int Follow bool - Stream bool } func (h *ActionHandler) GetLogs(ctx context.Context, req *GetLogsRequest) (*http.Response, error) { @@ -93,7 +92,7 @@ func (h *ActionHandler) GetLogs(ctx context.Context, req *GetLogsRequest) (*http return nil, util.NewErrForbidden(errors.Errorf("user not authorized")) } - resp, err = h.runserviceClient.GetLogs(ctx, req.RunID, req.TaskID, req.Setup, req.Step, req.Follow, req.Stream) + resp, err = h.runserviceClient.GetLogs(ctx, req.RunID, req.TaskID, req.Setup, req.Step, req.Follow) if err != nil { return nil, ErrFromRemote(resp, err) } diff --git a/internal/services/gateway/api/run.go b/internal/services/gateway/api/run.go index 813a3d7..e5bb9fc 100644 --- a/internal/services/gateway/api/run.go +++ b/internal/services/gateway/api/run.go @@ -15,7 +15,6 @@ package api import ( - "bufio" "encoding/json" "io" "net/http" @@ -511,13 +510,6 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if _, ok := q["follow"]; ok { follow = true } - stream := false - if _, ok := q["stream"]; ok { - stream = true - } - if follow { - stream = true - } areq := &action.GetLogsRequest{ RunID: runID, @@ -525,7 +517,6 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { Setup: setup, Step: step, Follow: follow, - Stream: stream, } resp, err := h.ah.GetLogs(ctx, areq) @@ -534,31 +525,19 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - if stream { - w.Header().Set("Content-Type", "text/event-stream") - } w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") defer resp.Body.Close() - if stream { - if err := sendLogs(w, resp.Body); err != nil { - h.log.Errorf("err: %+v", err) - return - } - } else { - if _, err := io.Copy(w, resp.Body); err != nil { - h.log.Errorf("err: %+v", err) - return - } + if err := sendLogs(w, resp.Body); err != nil { + h.log.Errorf("err: %+v", err) + return } } -// sendLogs is used during streaming to flush logs lines -// TODO(sgotti) there's no need to do br.ReadBytes since the response is -// already flushed by the runservice. +// sendLogs streams received logs lines and flushes them func sendLogs(w io.Writer, r io.Reader) error { - br := bufio.NewReader(r) + buf := make([]byte, 4096) var flusher http.Flusher if fl, ok := w.(http.Flusher); ok { @@ -569,17 +548,17 @@ func sendLogs(w io.Writer, r io.Reader) error { if stop { return nil } - data, err := br.ReadBytes('\n') + n, err := r.Read(buf) if err != nil { if err != io.EOF { return err } - if len(data) == 0 { + if n == 0 { return nil } stop = true } - if _, err := w.Write(data); err != nil { + if _, err := w.Write(buf[:n]); err != nil { return err } if flusher != nil { diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 410b5b0..926cd4f 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -15,7 +15,6 @@ package api import ( - "bufio" "context" "encoding/json" "fmt" @@ -158,15 +157,8 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if _, ok := q["follow"]; ok { follow = true } - stream := false - if _, ok := q["stream"]; ok { - stream = true - } - if follow { - stream = true - } - if err, sendError := h.readTaskLogs(ctx, runID, taskID, setup, step, w, follow, stream); err != nil { + if err, sendError := h.readTaskLogs(ctx, runID, taskID, setup, step, w, follow); err != nil { h.log.Errorf("err: %+v", err) if sendError { switch err.(type) { @@ -179,7 +171,7 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow, stream bool) (error, bool) { +func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow bool) (error, bool) { r, err := store.GetRunEtcdOrOST(ctx, h.e, h.dm, runID) if err != nil { return err, true @@ -212,7 +204,7 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se return err, true } defer f.Close() - return sendLogs(w, f, stream), false + return sendLogs(w, f), false } et, err := store.GetExecutorTask(ctx, h.e, task.ID) @@ -248,18 +240,14 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se return errors.Errorf("received http status: %d", req.StatusCode), true } - return sendLogs(w, req.Body, stream), false + return sendLogs(w, req.Body), false } -func sendLogs(w http.ResponseWriter, r io.Reader, stream bool) error { - if stream { - w.Header().Set("Content-Type", "text/event-stream") - } - +func sendLogs(w http.ResponseWriter, r io.Reader) error { w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - br := bufio.NewReader(r) + buf := make([]byte, 406) var flusher http.Flusher if fl, ok := w.(http.Flusher); ok { @@ -270,24 +258,19 @@ func sendLogs(w http.ResponseWriter, r io.Reader, stream bool) error { if stop { return nil } - data, err := br.ReadBytes('\n') + n, err := r.Read(buf) + //data, err := br.ReadBytes('\n') if err != nil { if err != io.EOF { return err } - if len(data) == 0 { + if n == 0 { return nil } stop = true } - if stream { - if _, err := w.Write([]byte(fmt.Sprintf("data: %s\n", data))); err != nil { - return err - } - } else { - if _, err := w.Write(data); err != nil { - return err - } + if _, err := w.Write(buf[:n]); err != nil { + return err } if flusher != nil { flusher.Flush() diff --git a/internal/services/runservice/api/client.go b/internal/services/runservice/api/client.go index ff44ba7..04f3fd3 100644 --- a/internal/services/runservice/api/client.go +++ b/internal/services/runservice/api/client.go @@ -285,7 +285,7 @@ func (c *Client) GetRun(ctx context.Context, runID string, changeGroups []string return runResponse, resp, err } -func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool, step int, follow, stream bool) (*http.Response, error) { +func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool, step int, follow bool) (*http.Response, error) { q := url.Values{} q.Add("runid", runID) q.Add("taskid", taskID) @@ -297,9 +297,6 @@ func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool, if follow { q.Add("follow", "") } - if stream { - q.Add("stream", "") - } return c.getResponse(ctx, "GET", "/logs", q, -1, nil, nil) }