runservice: add getruns filter by result

This commit is contained in:
Simone Gotti 2019-07-05 10:32:51 +02:00
parent 9d71f32368
commit 04ef20464d
6 changed files with 44 additions and 28 deletions

View File

@ -82,6 +82,7 @@ func (h *ActionHandler) GetRun(ctx context.Context, runID string) (*rsapi.RunRes
type GetRunsRequest struct { type GetRunsRequest struct {
PhaseFilter []string PhaseFilter []string
ResultFilter []string
Group string Group string
LastRun bool LastRun bool
ChangeGroups []string ChangeGroups []string
@ -100,7 +101,7 @@ func (h *ActionHandler) GetRuns(ctx context.Context, req *GetRunsRequest) (*rsap
} }
groups := []string{req.Group} groups := []string{req.Group}
runsResp, resp, err := h.runserviceClient.GetRuns(ctx, req.PhaseFilter, groups, req.LastRun, req.ChangeGroups, req.StartRunID, req.Limit, req.Asc) runsResp, resp, err := h.runserviceClient.GetRuns(ctx, req.PhaseFilter, req.ResultFilter, groups, req.LastRun, req.ChangeGroups, req.StartRunID, req.Limit, req.Asc)
if err != nil { if err != nil {
return nil, ErrFromRemote(resp, err) return nil, ErrFromRemote(resp, err)
} }

View File

@ -445,6 +445,7 @@ func NewRunsHandler(logger *zap.Logger, readDB *readdb.ReadDB) *RunsHandler {
func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
query := r.URL.Query() query := r.URL.Query()
phaseFilter := types.RunPhaseFromStringSlice(query["phase"]) phaseFilter := types.RunPhaseFromStringSlice(query["phase"])
resultFilter := types.RunResultFromStringSlice(query["result"])
changeGroups := query["changegroup"] changeGroups := query["changegroup"]
groups := query["group"] groups := query["group"]
@ -479,7 +480,7 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err := h.readDB.Do(func(tx *db.Tx) error { err := h.readDB.Do(func(tx *db.Tx) error {
var err error var err error
runs, err = h.readDB.GetRuns(tx, groups, lastRun, phaseFilter, start, limit, sortOrder) runs, err = h.readDB.GetRuns(tx, groups, lastRun, phaseFilter, resultFilter, start, limit, sortOrder)
if err != nil { if err != nil {
h.log.Errorf("err: %+v", err) h.log.Errorf("err: %+v", err)
return err return err

View File

@ -165,11 +165,14 @@ func (c *Client) PutCache(ctx context.Context, key string, size int64, r io.Read
return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), nil, size, nil, r) return c.getResponse(ctx, "POST", fmt.Sprintf("/executor/caches/%s", url.PathEscape(key)), nil, size, nil, r)
} }
func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) { func (c *Client) GetRuns(ctx context.Context, phaseFilter, resultFilter, groups []string, lastRun bool, changeGroups []string, start string, limit int, asc bool) (*GetRunsResponse, *http.Response, error) {
q := url.Values{} q := url.Values{}
for _, phase := range phaseFilter { for _, phase := range phaseFilter {
q.Add("phase", phase) q.Add("phase", phase)
} }
for _, result := range resultFilter {
q.Add("result", result)
}
for _, group := range groups { for _, group := range groups {
q.Add("group", group) q.Add("group", group)
} }
@ -195,27 +198,27 @@ func (c *Client) GetRuns(ctx context.Context, phaseFilter, groups []string, last
} }
func (c *Client) GetQueuedRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { func (c *Client) GetQueuedRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) {
return c.GetRuns(ctx, []string{"queued"}, []string{}, false, changeGroups, start, limit, true) return c.GetRuns(ctx, []string{"queued"}, nil, []string{}, false, changeGroups, start, limit, true)
} }
func (c *Client) GetRunningRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { func (c *Client) GetRunningRuns(ctx context.Context, start string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) {
return c.GetRuns(ctx, []string{"running"}, []string{}, false, changeGroups, start, limit, true) return c.GetRuns(ctx, []string{"running"}, nil, []string{}, false, changeGroups, start, limit, true)
} }
func (c *Client) GetGroupQueuedRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { func (c *Client) GetGroupQueuedRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) {
return c.GetRuns(ctx, []string{"queued"}, []string{group}, false, changeGroups, "", limit, false) return c.GetRuns(ctx, []string{"queued"}, nil, []string{group}, false, changeGroups, "", limit, false)
} }
func (c *Client) GetGroupRunningRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) { func (c *Client) GetGroupRunningRuns(ctx context.Context, group string, limit int, changeGroups []string) (*GetRunsResponse, *http.Response, error) {
return c.GetRuns(ctx, []string{"running"}, []string{group}, false, changeGroups, "", limit, false) return c.GetRuns(ctx, []string{"running"}, nil, []string{group}, false, changeGroups, "", limit, false)
} }
func (c *Client) GetGroupFirstQueuedRuns(ctx context.Context, group string, changeGroups []string) (*GetRunsResponse, *http.Response, error) { func (c *Client) GetGroupFirstQueuedRuns(ctx context.Context, group string, changeGroups []string) (*GetRunsResponse, *http.Response, error) {
return c.GetRuns(ctx, []string{"queued"}, []string{group}, false, changeGroups, "", 1, true) return c.GetRuns(ctx, []string{"queued"}, nil, []string{group}, false, changeGroups, "", 1, true)
} }
func (c *Client) GetGroupLastRun(ctx context.Context, group string, changeGroups []string) (*GetRunsResponse, *http.Response, error) { func (c *Client) GetGroupLastRun(ctx context.Context, group string, changeGroups []string) (*GetRunsResponse, *http.Response, error) {
return c.GetRuns(ctx, nil, []string{group}, false, changeGroups, "", 1, false) return c.GetRuns(ctx, nil, nil, []string{group}, false, changeGroups, "", 1, false)
} }
func (c *Client) CreateRun(ctx context.Context, req *RunCreateRequest) (*RunResponse, *http.Response, error) { func (c *Client) CreateRun(ctx context.Context, req *RunCreateRequest) (*RunResponse, *http.Response, error) {

View File

@ -18,7 +18,7 @@ var Stmts = []string{
// last processed etcd event revision // last processed etcd event revision
"create table revision (revision bigint, PRIMARY KEY(revision))", "create table revision (revision bigint, PRIMARY KEY(revision))",
"create table run (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", "create table run (id varchar, grouppath varchar, phase varchar, result varchar, PRIMARY KEY (id, grouppath, phase))",
"create table rundata (id varchar, data bytea, PRIMARY KEY (id))", "create table rundata (id varchar, data bytea, PRIMARY KEY (id))",
@ -35,7 +35,7 @@ var Stmts = []string{
"create table changegrouprevision_ost (id varchar, revision varchar, PRIMARY KEY (id, revision))", "create table changegrouprevision_ost (id varchar, revision varchar, PRIMARY KEY (id, revision))",
"create table run_ost (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", "create table run_ost (id varchar, grouppath varchar, phase varchar, result varchar, PRIMARY KEY (id, grouppath, phase))",
"create table rundata_ost (id varchar, data bytea, PRIMARY KEY (id))", "create table rundata_ost (id varchar, data bytea, PRIMARY KEY (id))",

View File

@ -59,8 +59,8 @@ var (
revisionSelect = sb.Select("revision").From("revision") revisionSelect = sb.Select("revision").From("revision")
revisionInsert = sb.Insert("revision").Columns("revision") revisionInsert = sb.Insert("revision").Columns("revision")
//runSelect = sb.Select("id", "grouppath", "phase").From("run") //runSelect = sb.Select("id", "grouppath", "phase", "result").From("run")
runInsert = sb.Insert("run").Columns("id", "grouppath", "phase") runInsert = sb.Insert("run").Columns("id", "grouppath", "phase", "result")
rundataInsert = sb.Insert("rundata").Columns("id", "data") rundataInsert = sb.Insert("rundata").Columns("id", "data")
@ -74,8 +74,8 @@ var (
//revisionOSTSelect = sb.Select("revision").From("revision_ost") //revisionOSTSelect = sb.Select("revision").From("revision_ost")
revisionOSTInsert = sb.Insert("revision_ost").Columns("revision") revisionOSTInsert = sb.Insert("revision_ost").Columns("revision")
//runOSTSelect = sb.Select("id", "grouppath", "phase").From("run_ost") //runOSTSelect = sb.Select("id", "grouppath", "phase", "result").From("run_ost")
runOSTInsert = sb.Insert("run_ost").Columns("id", "grouppath", "phase") runOSTInsert = sb.Insert("run_ost").Columns("id", "grouppath", "phase", "result")
rundataOSTInsert = sb.Insert("rundata_ost").Columns("id", "data") rundataOSTInsert = sb.Insert("rundata_ost").Columns("id", "data")
@ -339,7 +339,7 @@ func (r *ReadDB) handleEvents(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
lastRuns, err = r.GetActiveRuns(tx, nil, true, nil, "", 1, types.SortOrderDesc) lastRuns, err = r.GetActiveRuns(tx, nil, true, nil, nil, "", 1, types.SortOrderDesc)
return err return err
}) })
if err != nil { if err != nil {
@ -995,7 +995,7 @@ func insertRun(tx *db.Tx, run *types.Run, data []byte) error {
if _, err := tx.Exec("delete from run where id = $1", run.ID); err != nil { if _, err := tx.Exec("delete from run where id = $1", run.ID); err != nil {
return errors.Errorf("failed to delete run: %w", err) return errors.Errorf("failed to delete run: %w", err)
} }
q, args, err := runInsert.Values(run.ID, groupPath, run.Phase).ToSql() q, args, err := runInsert.Values(run.ID, groupPath, run.Phase, run.Result).ToSql()
if err != nil { if err != nil {
return errors.Errorf("failed to build query: %w", err) return errors.Errorf("failed to build query: %w", err)
} }
@ -1029,7 +1029,7 @@ func (r *ReadDB) insertRunOST(tx *db.Tx, run *types.Run, data []byte) error {
if _, err := tx.Exec("delete from run_ost where id = $1", run.ID); err != nil { if _, err := tx.Exec("delete from run_ost where id = $1", run.ID); err != nil {
return errors.Errorf("failed to delete run objectstorage: %w", err) return errors.Errorf("failed to delete run objectstorage: %w", err)
} }
q, args, err := runOSTInsert.Values(run.ID, groupPath, run.Phase).ToSql() q, args, err := runOSTInsert.Values(run.ID, groupPath, run.Phase, run.Result).ToSql()
if err != nil { if err != nil {
return errors.Errorf("failed to build query: %w", err) return errors.Errorf("failed to build query: %w", err)
} }
@ -1121,11 +1121,11 @@ func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*types
return &types.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: changeGroupsRevisions}, nil return &types.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: changeGroupsRevisions}, nil
} }
func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) {
return r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) return r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, resultFilter, startRunID, limit, sortOrder)
} }
func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) { func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) {
useObjectStorage := false useObjectStorage := false
for _, phase := range phaseFilter { for _, phase := range phaseFilter {
if phase == types.RunPhaseFinished || phase == types.RunPhaseCancelled { if phase == types.RunPhaseFinished || phase == types.RunPhaseCancelled {
@ -1136,7 +1136,7 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [
useObjectStorage = true useObjectStorage = true
} }
runDataRDB, err := r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) runDataRDB, err := r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, resultFilter, startRunID, limit, sortOrder)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1149,7 +1149,7 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [
if useObjectStorage { if useObjectStorage {
// skip if the phase requested is not finished // skip if the phase requested is not finished
runDataOST, err := r.GetRunsFilteredOST(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) runDataOST, err := r.GetRunsFilteredOST(tx, groups, lastRun, phaseFilter, resultFilter, startRunID, limit, sortOrder)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1215,7 +1215,7 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [
return aruns, nil return aruns, nil
} }
func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, objectstorage bool) sq.SelectBuilder { func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, resultFilter []types.RunResult, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, objectstorage bool) sq.SelectBuilder {
runt := "run" runt := "run"
rundatat := "rundata" rundatat := "rundata"
fields := []string{"run.id", "run.grouppath", "run.phase", "rundata.data"} fields := []string{"run.id", "run.grouppath", "run.phase", "rundata.data"}
@ -1238,6 +1238,9 @@ func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []str
if len(phaseFilter) > 0 { if len(phaseFilter) > 0 {
s = s.Where(sq.Eq{"phase": phaseFilter}) s = s.Where(sq.Eq{"phase": phaseFilter})
} }
if len(resultFilter) > 0 {
s = s.Where(sq.Eq{"result": resultFilter})
}
if startRunID != "" { if startRunID != "" {
if lastRun { if lastRun {
switch sortOrder { switch sortOrder {
@ -1279,8 +1282,8 @@ func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []str
return s return s
} }
func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) {
s := r.getRunsFilteredQuery(phaseFilter, groups, lastRun, startRunID, limit, sortOrder, false) s := r.getRunsFilteredQuery(phaseFilter, resultFilter, groups, lastRun, startRunID, limit, sortOrder, false)
q, args, err := s.ToSql() q, args, err := s.ToSql()
r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
@ -1291,8 +1294,8 @@ func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, lastRun bool,
return fetchRuns(tx, q, args...) return fetchRuns(tx, q, args...)
} }
func (r *ReadDB) GetRunsFilteredOST(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { func (r *ReadDB) GetRunsFilteredOST(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, resultFilter []types.RunResult, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) {
s := r.getRunsFilteredQuery(phaseFilter, groups, lastRun, startRunID, limit, sortOrder, true) s := r.getRunsFilteredQuery(phaseFilter, resultFilter, groups, lastRun, startRunID, limit, sortOrder, true)
q, args, err := s.ToSql() q, args, err := s.ToSql()
r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) r.log.Debugf("q: %s, args: %s", q, util.Dump(args))

View File

@ -81,6 +81,14 @@ func RunPhaseFromStringSlice(slice []string) []RunPhase {
return rss return rss
} }
func RunResultFromStringSlice(slice []string) []RunResult {
rss := make([]RunResult, len(slice))
for i, s := range slice {
rss[i] = RunResult(s)
}
return rss
}
// Run is the run status of a RUN. Until the run is not finished it'll live in // Run is the run status of a RUN. Until the run is not finished it'll live in
// etcd. So we should keep it smaller to avoid using too much space // etcd. So we should keep it smaller to avoid using too much space
type Run struct { type Run struct {