From 81d557d785274042fc67559eae354a0e223dd592 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 15 May 2019 09:46:21 +0200 Subject: [PATCH] runservice: add runEvents handler --- internal/services/runservice/api/api.go | 80 +++++++++++++++++++++- internal/services/runservice/api/client.go | 7 ++ internal/services/runservice/runservice.go | 3 + 3 files changed, 89 insertions(+), 1 deletion(-) diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 951fd88..410b5b0 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -36,6 +36,9 @@ import ( "github.com/gorilla/mux" "github.com/pkg/errors" + etcdclientv3 "go.etcd.io/etcd/clientv3" + etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" ) @@ -117,7 +120,6 @@ func NewLogsHandler(logger *zap.Logger, e *etcd.Store, ost *objectstorage.ObjSto func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - // TODO(sgotti) Check authorized call from client q := r.URL.Query() runID := q.Get("runid") @@ -706,3 +708,79 @@ func (h *RunTaskActionsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request return } } + +type RunEventsHandler struct { + log *zap.SugaredLogger + e *etcd.Store + ost *objectstorage.ObjStorage + dm *datamanager.DataManager +} + +func NewRunEventsHandler(logger *zap.Logger, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *RunEventsHandler { + return &RunEventsHandler{ + log: logger.Sugar(), + e: e, + ost: ost, + dm: dm, + } +} + +// +func (h *RunEventsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + q := r.URL.Query() + + // TODO(sgotti) handle additional events filtering (by type, etc...) + startRunEventID := q.Get("startruneventid") + + if err := h.sendRunEvents(ctx, startRunEventID, w); err != nil { + h.log.Errorf("err: %+v", err) + } +} + +func (h *RunEventsHandler) sendRunEvents(ctx context.Context, startRunEventID string, w http.ResponseWriter) error { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + var flusher http.Flusher + if fl, ok := w.(http.Flusher); ok { + flusher = fl + } + + // TODO(sgotti) fetch from previous events (handle startRunEventID). + // Use the readdb instead of etcd + + wctx, cancel := context.WithCancel(ctx) + defer cancel() + wctx = etcdclientv3.WithRequireLeader(wctx) + wch := h.e.WatchKey(wctx, common.EtcdRunEventKey, 0) + for wresp := range wch { + if wresp.Canceled { + err := wresp.Err() + if err == etcdclientv3rpc.ErrCompacted { + h.log.Errorf("required events already compacted") + } + return errors.Wrapf(err, "watch error") + } + + for _, ev := range wresp.Events { + switch ev.Type { + case mvccpb.PUT: + var runEvent *types.RunEvent + if err := json.Unmarshal(ev.Kv.Value, &runEvent); err != nil { + return errors.Wrap(err, "failed to unmarshal run") + } + if _, err := w.Write([]byte(fmt.Sprintf("data: %s\n\n", ev.Kv.Value))); err != nil { + return err + } + } + } + if flusher != nil { + flusher.Flush() + } + } + + return nil +} diff --git a/internal/services/runservice/api/client.go b/internal/services/runservice/api/client.go index e2a4f16..ff44ba7 100644 --- a/internal/services/runservice/api/client.go +++ b/internal/services/runservice/api/client.go @@ -303,3 +303,10 @@ func (c *Client) GetLogs(ctx context.Context, runID, taskID string, setup bool, return c.getResponse(ctx, "GET", "/logs", q, -1, nil, nil) } + +func (c *Client) GetRunEvents(ctx context.Context, startRunEventID string) (*http.Response, error) { + q := url.Values{} + q.Add("startruneventid", startRunEventID) + + return c.getResponse(ctx, "GET", "/runs/events", q, -1, nil, nil) +} diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go index 4dbf53a..5bcbab0 100644 --- a/internal/services/runservice/runservice.go +++ b/internal/services/runservice/runservice.go @@ -189,6 +189,8 @@ func (s *Runservice) Run(ctx context.Context) error { runsHandler := api.NewRunsHandler(logger, s.readDB) runActionsHandler := api.NewRunActionsHandler(logger, s.ah) runCreateHandler := api.NewRunCreateHandler(logger, s.ah) + runEventsHandler := api.NewRunEventsHandler(logger, s.e, s.ost, s.dm) + changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB) router := mux.NewRouter() @@ -209,6 +211,7 @@ func (s *Runservice) Run(ctx context.Context) error { apirouter.Handle("/logs", logsHandler).Methods("GET") + apirouter.Handle("/runs/events", runEventsHandler).Methods("GET") apirouter.Handle("/runs/{runid}", runHandler).Methods("GET") apirouter.Handle("/runs/{runid}/actions", runActionsHandler).Methods("PUT") apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT")