*: remove server sent events from logs handlers

Just use basic http streaming and send all the data as it's available without
splitting by new lines
This commit is contained in:
Simone Gotti 2019-05-19 14:35:04 +02:00
parent b5f2281d07
commit 0e10a406f9
5 changed files with 26 additions and 71 deletions

View File

@ -126,18 +126,15 @@ func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath stri
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
br := bufio.NewReader(f) buf := make([]byte, 4096)
// if not following return the Content-Length and just do io.Copy // if not following return the Content-Length
if !follow { if !follow {
fi, err := f.Stat() fi, err := f.Stat()
if err != nil { if err != nil {
return err return err
} }
w.Header().Set("Content-Length", strconv.FormatInt(fi.Size(), 10)) w.Header().Set("Content-Length", strconv.FormatInt(fi.Size(), 10))
_, err = io.Copy(w, br)
return err
} }
var flusher http.Flusher var flusher http.Flusher
@ -150,13 +147,13 @@ func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath stri
if stop { if stop {
return nil return nil
} }
data, err := br.ReadBytes('\n') n, err := f.Read(buf)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
return err return err
} }
if !flushstop && follow { if !flushstop && follow {
if _, err := f.Seek(-int64(len(data)), io.SeekCurrent); err != nil { if _, err := f.Seek(-int64(n), io.SeekCurrent); err != nil {
return errors.Wrapf(err, "failed to seek in log file %q", logPath) return errors.Wrapf(err, "failed to seek in log file %q", logPath)
} }
// check if the step is finished, if so flush until EOF and stop // check if the step is finished, if so flush until EOF and stop
@ -177,7 +174,7 @@ func (h *logsHandler) readLogs(taskID string, setup bool, step int, logPath stri
stop = true stop = true
} }
} }
if _, err := w.Write(data); err != nil { if _, err := w.Write(buf[:n]); err != nil {
return err return err
} }
if flusher != nil { if flusher != nil {

View File

@ -77,7 +77,6 @@ type GetLogsRequest struct {
Setup bool Setup bool
Step int Step int
Follow bool Follow bool
Stream bool
} }
func (h *ActionHandler) GetLogs(ctx context.Context, req *GetLogsRequest) (*http.Response, error) { func (h *ActionHandler) GetLogs(ctx context.Context, req *GetLogsRequest) (*http.Response, error) {
@ -93,7 +92,7 @@ func (h *ActionHandler) GetLogs(ctx context.Context, req *GetLogsRequest) (*http
return nil, util.NewErrForbidden(errors.Errorf("user not authorized")) return nil, util.NewErrForbidden(errors.Errorf("user not authorized"))
} }
resp, err = h.runserviceClient.GetLogs(ctx, req.RunID, req.TaskID, req.Setup, req.Step, req.Follow, req.Stream) resp, err = h.runserviceClient.GetLogs(ctx, req.RunID, req.TaskID, req.Setup, req.Step, req.Follow)
if err != nil { if err != nil {
return nil, ErrFromRemote(resp, err) return nil, ErrFromRemote(resp, err)
} }

View File

@ -15,7 +15,6 @@
package api package api
import ( import (
"bufio"
"encoding/json" "encoding/json"
"io" "io"
"net/http" "net/http"
@ -511,13 +510,6 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if _, ok := q["follow"]; ok { if _, ok := q["follow"]; ok {
follow = true follow = true
} }
stream := false
if _, ok := q["stream"]; ok {
stream = true
}
if follow {
stream = true
}
areq := &action.GetLogsRequest{ areq := &action.GetLogsRequest{
RunID: runID, RunID: runID,
@ -525,7 +517,6 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Setup: setup, Setup: setup,
Step: step, Step: step,
Follow: follow, Follow: follow,
Stream: stream,
} }
resp, err := h.ah.GetLogs(ctx, areq) resp, err := h.ah.GetLogs(ctx, areq)
@ -534,31 +525,19 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
if stream {
w.Header().Set("Content-Type", "text/event-stream")
}
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
defer resp.Body.Close() defer resp.Body.Close()
if stream {
if err := sendLogs(w, resp.Body); err != nil { if err := sendLogs(w, resp.Body); err != nil {
h.log.Errorf("err: %+v", err) h.log.Errorf("err: %+v", err)
return return
} }
} else {
if _, err := io.Copy(w, resp.Body); err != nil {
h.log.Errorf("err: %+v", err)
return
}
}
} }
// sendLogs is used during streaming to flush logs lines // sendLogs streams received logs lines and flushes them
// TODO(sgotti) there's no need to do br.ReadBytes since the response is
// already flushed by the runservice.
func sendLogs(w io.Writer, r io.Reader) error { func sendLogs(w io.Writer, r io.Reader) error {
br := bufio.NewReader(r) buf := make([]byte, 4096)
var flusher http.Flusher var flusher http.Flusher
if fl, ok := w.(http.Flusher); ok { if fl, ok := w.(http.Flusher); ok {
@ -569,17 +548,17 @@ func sendLogs(w io.Writer, r io.Reader) error {
if stop { if stop {
return nil return nil
} }
data, err := br.ReadBytes('\n') n, err := r.Read(buf)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
return err return err
} }
if len(data) == 0 { if n == 0 {
return nil return nil
} }
stop = true stop = true
} }
if _, err := w.Write(data); err != nil { if _, err := w.Write(buf[:n]); err != nil {
return err return err
} }
if flusher != nil { if flusher != nil {

View File

@ -15,7 +15,6 @@
package api package api
import ( import (
"bufio"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -158,15 +157,8 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if _, ok := q["follow"]; ok { if _, ok := q["follow"]; ok {
follow = true follow = true
} }
stream := false
if _, ok := q["stream"]; ok {
stream = true
}
if follow {
stream = true
}
if err, sendError := h.readTaskLogs(ctx, runID, taskID, setup, step, w, follow, stream); err != nil { if err, sendError := h.readTaskLogs(ctx, runID, taskID, setup, step, w, follow); err != nil {
h.log.Errorf("err: %+v", err) h.log.Errorf("err: %+v", err)
if sendError { if sendError {
switch err.(type) { switch err.(type) {
@ -179,7 +171,7 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow, stream bool) (error, bool) { func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow bool) (error, bool) {
r, err := store.GetRunEtcdOrOST(ctx, h.e, h.dm, runID) r, err := store.GetRunEtcdOrOST(ctx, h.e, h.dm, runID)
if err != nil { if err != nil {
return err, true return err, true
@ -212,7 +204,7 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se
return err, true return err, true
} }
defer f.Close() defer f.Close()
return sendLogs(w, f, stream), false return sendLogs(w, f), false
} }
et, err := store.GetExecutorTask(ctx, h.e, task.ID) et, err := store.GetExecutorTask(ctx, h.e, task.ID)
@ -248,18 +240,14 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se
return errors.Errorf("received http status: %d", req.StatusCode), true return errors.Errorf("received http status: %d", req.StatusCode), true
} }
return sendLogs(w, req.Body, stream), false return sendLogs(w, req.Body), false
} }
func sendLogs(w http.ResponseWriter, r io.Reader, stream bool) error { func sendLogs(w http.ResponseWriter, r io.Reader) error {
if stream {
w.Header().Set("Content-Type", "text/event-stream")
}
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
br := bufio.NewReader(r) buf := make([]byte, 406)
var flusher http.Flusher var flusher http.Flusher
if fl, ok := w.(http.Flusher); ok { if fl, ok := w.(http.Flusher); ok {
@ -270,25 +258,20 @@ func sendLogs(w http.ResponseWriter, r io.Reader, stream bool) error {
if stop { if stop {
return nil return nil
} }
data, err := br.ReadBytes('\n') n, err := r.Read(buf)
//data, err := br.ReadBytes('\n')
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
return err return err
} }
if len(data) == 0 { if n == 0 {
return nil return nil
} }
stop = true stop = true
} }
if stream { if _, err := w.Write(buf[:n]); err != nil {
if _, err := w.Write([]byte(fmt.Sprintf("data: %s\n", data))); err != nil {
return err return err
} }
} else {
if _, err := w.Write(data); err != nil {
return err
}
}
if flusher != nil { if flusher != nil {
flusher.Flush() flusher.Flush()
} }

View File

@ -285,7 +285,7 @@ func (c *Client) GetRun(ctx context.Context, runID string, changeGroups []string
return runResponse, resp, err return runResponse, resp, err
} }
func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool, step int, follow, stream bool) (*http.Response, error) { func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool, step int, follow bool) (*http.Response, error) {
q := url.Values{} q := url.Values{}
q.Add("runid", runID) q.Add("runid", runID)
q.Add("taskid", taskID) q.Add("taskid", taskID)
@ -297,9 +297,6 @@ func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool,
if follow { if follow {
q.Add("follow", "") q.Add("follow", "")
} }
if stream {
q.Add("stream", "")
}
return c.getResponse(ctx, "GET", "/logs", q, -1, nil, nil) return c.getResponse(ctx, "GET", "/logs", q, -1, nil, nil)
} }