From 48ab496beb895574e420ae3bd819956e0727127a Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Fri, 29 Mar 2019 12:00:18 +0100 Subject: [PATCH] *: add api to query last run per group --- internal/services/gateway/api/run.go | 3 +- .../services/runservice/scheduler/api/api.go | 3 +- .../runservice/scheduler/api/client.go | 13 +- .../runservice/scheduler/readdb/readdb.go | 160 +++++++++--------- 4 files changed, 93 insertions(+), 86 deletions(-) diff --git a/internal/services/gateway/api/run.go b/internal/services/gateway/api/run.go index 199b7c8..0edbed9 100644 --- a/internal/services/gateway/api/run.go +++ b/internal/services/gateway/api/run.go @@ -302,6 +302,7 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { phaseFilter := q["phase"] groups := q["group"] changeGroups := q["changegroup"] + _, lastRun := q["lastrun"] limitS := q.Get("limit") limit := DefaultRunsLimit @@ -327,7 +328,7 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { start := q.Get("start") - runsResp, resp, err := h.runserviceClient.GetRuns(ctx, phaseFilter, groups, changeGroups, start, limit, asc) + runsResp, resp, err := h.runserviceClient.GetRuns(ctx, phaseFilter, groups, lastRun, changeGroups, start, limit, asc) if err != nil { if resp != nil && resp.StatusCode == http.StatusNotFound { http.Error(w, err.Error(), http.StatusNotFound) diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index bce631c..6aff66a 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -361,6 +361,7 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { changeGroups := query["changegroup"] groups := query["group"] + _, lastRun := query["lastrun"] limitS := query.Get("limit") limit := DefaultRunsLimit @@ -406,7 +407,7 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { err = h.readDB.Do(func(tx *db.Tx) error { var err error - runs, err = h.readDB.GetRuns(tx, groups, phaseFilter, start, limit, sortOrder) + runs, err = h.readDB.GetRuns(tx, groups, lastRun, phaseFilter, start, limit, sortOrder) if err != nil { h.log.Errorf("err: %+v", err) return err diff --git a/internal/services/runservice/scheduler/api/client.go b/internal/services/runservice/scheduler/api/client.go index 0d0b995..856ee81 100644 --- a/internal/services/runservice/scheduler/api/client.go +++ b/internal/services/runservice/scheduler/api/client.go @@ -143,7 +143,7 @@ func (c *Client) GetArchive(ctx context.Context, taskID string, step int) (*http return c.getResponse(ctx, "GET", "/executor/archives", q, nil, nil) } -func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) { +func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) { q := url.Values{} for _, phase := range phaseFilter { q.Add("phase", phase) @@ -151,6 +151,9 @@ func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups, changeGroups for _, group := range groups { q.Add("group", group) } + if lastRun { + q.Add("lastrun", "") + } for _, changeGroup := range changeGroups { q.Add("changegroup", changeGroup) } @@ -170,19 +173,19 @@ func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups, changeGroups } func (c *Client) GetQueuedRuns(ctx context.Context, start string, limit int) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"queued"}, []string{"."}, nil, start, limit, true) + return c.GetRuns(ctx, []string{"queued"}, []string{}, false, nil, start, limit, true) } func (c *Client) GetGroupQueuedRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"queued"}, []string{group}, changeGroups, "", limit, false) + return c.GetRuns(ctx, []string{"queued"}, []string{group}, false, changeGroups, "", limit, false) } func (c *Client) GetGroupRunningRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"running"}, []string{group}, changeGroups, "", limit, false) + return c.GetRuns(ctx, []string{"running"}, []string{group}, false, changeGroups, "", limit, false) } func (c *Client) GetGroupFirstQueuedRuns(ctx context.Context, group string, changeGroups []string) (*GetRunsResponse, *http.Response, error) { - return c.GetRuns(ctx, []string{"queued"}, []string{group}, changeGroups, "", 1, true) + return c.GetRuns(ctx, []string{"queued"}, []string{group}, false, changeGroups, "", 1, true) } func (c *Client) CreateRun(ctx context.Context, req *RunCreateRequest) (*http.Response, error) { diff --git a/internal/services/runservice/scheduler/readdb/readdb.go b/internal/services/runservice/scheduler/readdb/readdb.go index eb75edf..6c58894 100644 --- a/internal/services/runservice/scheduler/readdb/readdb.go +++ b/internal/services/runservice/scheduler/readdb/readdb.go @@ -349,14 +349,14 @@ func (r *ReadDB) Run(ctx context.Context) { func (r *ReadDB) HandleEvents(ctx context.Context) error { var revision int64 - var lastRuns []*types.Run + var lastRuns []*RunData err := r.rdb.Do(func(tx *db.Tx) error { var err error revision, err = r.getRevision(tx) if err != nil { return err } - lastRuns, err = r.GetActiveRuns(tx, nil, nil, "", 1, types.SortOrderDesc) + lastRuns, err = r.GetActiveRuns(tx, nil, true, nil, "", 1, types.SortOrderDesc) return err }) if err != nil { @@ -370,7 +370,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { var lastRun *types.Run if len(lastRuns) > 0 { - lastRun = lastRuns[0] + lastRun = lastRuns[0].Run } if lastRun != nil { if runSequence == nil { @@ -685,8 +685,8 @@ func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*types return &types.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: changeGroupsRevisions}, nil } -func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) { - return r.getRunsFilteredActive(tx, groups, phaseFilter, startRunID, limit, sortOrder) +func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { + return r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) } func (r *ReadDB) PrefetchRuns(tx *db.Tx, groups []string, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) error { @@ -712,7 +712,7 @@ func (r *ReadDB) PrefetchRuns(tx *db.Tx, groups []string, phaseFilter []types.Ru return nil } -func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) { +func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) { useLTS := false for _, phase := range phaseFilter { if phase == types.RunPhaseFinished { @@ -723,27 +723,44 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, phaseFilter []types.RunPhas useLTS = true } - runs, err := r.getRunsFilteredActive(tx, groups, phaseFilter, startRunID, limit, sortOrder) + runDataRDB, err := r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) if err != nil { return nil, err } - if !useLTS { - return runs, err - } - - // skip if the phase requested is not finished - runsltsIDs, err := r.getRunsFilteredLTS(tx, groups, startRunID, limit, sortOrder) - if err != nil { - return nil, err - } - - runsMap := map[string]*types.Run{} - for _, r := range runs { + lastRunsMap := map[string]*RunData{} + runsMap := map[string]*RunData{} + for _, r := range runDataRDB { runsMap[r.ID] = r + lastRunsMap[r.GroupPath] = r } - for _, runID := range runsltsIDs { - if _, ok := runsMap[runID]; !ok { - runsMap[runID] = nil + + if useLTS { + // skip if the phase requested is not finished + runDataLTS, err := r.GetRunsFilteredLTS(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) + if err != nil { + return nil, err + } + + for _, rd := range runDataLTS { + if lastRun { + if lr, ok := lastRunsMap[rd.GroupPath]; ok { + switch sortOrder { + case types.SortOrderAsc: + if rd.ID < lr.ID { + lastRunsMap[rd.GroupPath] = rd + } + case types.SortOrderDesc: + if rd.ID > lr.ID { + lastRunsMap[rd.GroupPath] = rd + } + } + } else { + lastRunsMap[rd.GroupPath] = rd + runsMap[rd.ID] = rd + } + } else { + runsMap[rd.ID] = rd + } } } @@ -767,14 +784,14 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, phaseFilter []types.RunPhas } count++ - run := runsMap[runID] - if run != nil { - aruns = append(aruns, run) + rd := runsMap[runID] + if rd.Run != nil { + aruns = append(aruns, rd.Run) continue } // get run from lts - run, err = store.LTSGetRun(r.wal, runID) + run, err := store.LTSGetRun(r.wal, runID) if err != nil { return nil, errors.WithStack(err) } @@ -785,13 +802,13 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, phaseFilter []types.RunPhas return aruns, nil } -func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []string, startRunID string, limit int, sortOrder types.SortOrder, lts bool) sq.SelectBuilder { +func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, lts bool) sq.SelectBuilder { runt := "run" - runlabelt := "rungroup" + rungroupt := "rungroup" fields := []string{"data"} if lts { runt = "run_lts" - runlabelt = "rungroup_lts" + rungroupt = "rungroup_lts" fields = []string{"id"} } @@ -818,20 +835,28 @@ func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []str s = s.Limit(uint64(limit)) } + s = s.Join(fmt.Sprintf("%s as rungroup on rungroup.id = run.id", rungroupt)) if len(groups) > 0 { - s = s.Join(fmt.Sprintf("%s as rungroup on rungroup.runid = run.id", runlabelt)) cond := sq.Or{} - for _, group := range groups { - cond = append(cond, sq.Eq{"rungroup.grouppath": group}) + for _, groupPath := range groups { + // add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02) + if !strings.HasSuffix(groupPath, "/") { + groupPath += "/" + } + + cond = append(cond, sq.Like{"run.grouppath": groupPath + "%"}) } s = s.Where(sq.Or{cond}) + if lastRun { + s = s.GroupBy("run.grouppath") + } } return s } -func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) { - s := r.getRunsFilteredQuery(phaseFilter, groups, startRunID, limit, sortOrder, false) +func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { + s := r.getRunsFilteredQuery(phaseFilter, groups, lastRun, startRunID, limit, sortOrder, false) q, args, err := s.ToSql() r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) @@ -842,8 +867,8 @@ func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, phaseFilter [ return fetchRuns(tx, q, args...) } -func (r *ReadDB) getRunsFilteredLTS(tx *db.Tx, groups []string, startRunID string, limit int, sortOrder types.SortOrder) ([]string, error) { - s := r.getRunsFilteredQuery(nil, groups, startRunID, limit, sortOrder, true) +func (r *ReadDB) GetRunsFilteredLTS(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { + s := r.getRunsFilteredQuery(phaseFilter, groups, lastRun, startRunID, limit, sortOrder, true) q, args, err := s.ToSql() r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) @@ -851,7 +876,7 @@ func (r *ReadDB) getRunsFilteredLTS(tx *db.Tx, groups []string, startRunID strin return nil, errors.Wrap(err, "failed to build query") } - return fetchRunsLTS(tx, q, args...) + return fetchRuns(tx, q, args...) } func (r *ReadDB) GetRun(runID string) (*types.Run, error) { @@ -882,10 +907,17 @@ func (r *ReadDB) getRun(tx *db.Tx, runID string) (*types.Run, error) { if len(runs) == 0 { return nil, nil } - return runs[0], nil + return runs[0].Run, nil } -func fetchRuns(tx *db.Tx, q string, args ...interface{}) ([]*types.Run, error) { +type RunData struct { + ID string + GroupPath string + Phase string + Run *types.Run +} + +func fetchRuns(tx *db.Tx, q string, args ...interface{}) ([]*RunData, error) { rows, err := tx.Query(q, args...) if err != nil { return nil, err @@ -894,38 +926,23 @@ func fetchRuns(tx *db.Tx, q string, args ...interface{}) ([]*types.Run, error) { return scanRuns(rows) } -func fetchRunsLTS(tx *db.Tx, q string, args ...interface{}) ([]string, error) { - rows, err := tx.Query(q, args...) - if err != nil { - return nil, err - } - defer rows.Close() - return scanRunsLTS(rows) -} - -func scanRun(rows *sql.Rows) (*types.Run, error) { +func scanRun(rows *sql.Rows) (*RunData, error) { + r := &RunData{} var data []byte - if err := rows.Scan(&data); err != nil { + if err := rows.Scan(&r.ID, &r.GroupPath, &r.Phase, &data); err != nil { return nil, errors.Wrap(err, "failed to scan rows") } - var run *types.Run - if err := json.Unmarshal(data, &run); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal run") + if len(data) > 0 { + if err := json.Unmarshal(data, &r.Run); err != nil { + return nil, errors.Wrap(err, "failed to unmarshal run") + } } - return run, nil + return r, nil } -func scanRunLTS(rows *sql.Rows) (string, error) { - var id string - if err := rows.Scan(&id); err != nil { - return "", errors.Wrap(err, "failed to scan rows") - } - return id, nil -} - -func scanRuns(rows *sql.Rows) ([]*types.Run, error) { - runs := []*types.Run{} +func scanRuns(rows *sql.Rows) ([]*RunData, error) { + runs := []*RunData{} for rows.Next() { r, err := scanRun(rows) if err != nil { @@ -939,21 +956,6 @@ func scanRuns(rows *sql.Rows) ([]*types.Run, error) { return runs, nil } -func scanRunsLTS(rows *sql.Rows) ([]string, error) { - ids := []string{} - for rows.Next() { - r, err := scanRunLTS(rows) - if err != nil { - return nil, err - } - ids = append(ids, r) - } - if err := rows.Err(); err != nil { - return nil, err - } - return ids, nil -} - func fetchChangeGroupsRevision(tx *db.Tx, q string, args ...interface{}) (types.ChangeGroupsRevisions, error) { rows, err := tx.Query(q, args...) if err != nil {