diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index 6aff66a..96deaaf 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -390,22 +390,7 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var runs []*types.Run var cgt *types.ChangeGroupsUpdateToken - if len(groups) == 0 { - groups = []string{"."} - } err := h.readDB.Do(func(tx *db.Tx) error { - if err := h.readDB.PrefetchRuns(tx, groups, phaseFilter, start, limit, sortOrder); err != nil { - h.log.Errorf("err: %+v", err) - return err - } - return nil - }) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - err = h.readDB.Do(func(tx *db.Tx) error { var err error runs, err = h.readDB.GetRuns(tx, groups, lastRun, phaseFilter, start, limit, sortOrder) if err != nil { diff --git a/internal/services/runservice/scheduler/common/common.go b/internal/services/runservice/scheduler/common/common.go index 874d6b0..91b7f1f 100644 --- a/internal/services/runservice/scheduler/common/common.go +++ b/internal/services/runservice/scheduler/common/common.go @@ -43,6 +43,8 @@ var ( EtcdExecutorsDir = "executors" EtcdTasksDir = "tasks" + + EtcdLastIndexKey = "lastindex" ) func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) } diff --git a/internal/services/runservice/scheduler/readdb/create.go b/internal/services/runservice/scheduler/readdb/create.go index dd059c6..6a95fdf 100644 --- a/internal/services/runservice/scheduler/readdb/create.go +++ b/internal/services/runservice/scheduler/readdb/create.go @@ -19,12 +19,9 @@ var Stmts = []string{ //"create table revision (clusterid varchar, revision bigint, PRIMARY KEY(revision))", "create table revision (revision bigint, PRIMARY KEY(revision))", - "create table run (id varchar, data bytea, phase varchar, PRIMARY KEY (id))", - "create index run_phase on run(phase)", + "create table run (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", - // rungroup stores the groups associated to a run - "create table rungroup (runid varchar, grouppath varchar, PRIMARY KEY (runid, grouppath), FOREIGN KEY(runid) REFERENCES run(id) ON DELETE CASCADE)", - "create index rungroup_grouppath on rungroup(grouppath)", + "create table rundata (id varchar, data bytea, PRIMARY KEY (id))", "create table runevent (sequence varchar, data bytea, PRIMARY KEY (sequence))", @@ -32,10 +29,12 @@ var Stmts = []string{ "create table changegrouprevision (id varchar, revision varchar, PRIMARY KEY (id, revision))", // LTS - "create table run_lts (id varchar, data bytea, phase varchar, PRIMARY KEY (id))", - "create index run_lts_phase on run_lts(phase)", + "create table revision_lts (revision bigint, PRIMARY KEY(revision))", - // rungroup stores the groups associated to a run - "create table rungroup_lts (runid varchar, grouppath varchar, PRIMARY KEY (runid, grouppath), FOREIGN KEY(runid) REFERENCES run_lts(id) ON DELETE CASCADE)", - "create index rungroup_lts_grouppath on rungroup_lts(grouppath)", + // committedwalsequence stores the last committed wal sequence + "create table committedwalsequence_lts (seq varchar, PRIMARY KEY (seq))", + + "create table run_lts (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", + + "create table rundata_lts (id varchar, data bytea, PRIMARY KEY (id))", } diff --git a/internal/services/runservice/scheduler/readdb/readdb.go b/internal/services/runservice/scheduler/readdb/readdb.go index 6c58894..2cdf965 100644 --- a/internal/services/runservice/scheduler/readdb/readdb.go +++ b/internal/services/runservice/scheduler/readdb/readdb.go @@ -19,6 +19,7 @@ import ( "database/sql" "encoding/json" "fmt" + "io" "os" "path" "path/filepath" @@ -46,7 +47,7 @@ import ( ) const ( - MaxFetchSize = 25 + paginationSize = 100 ) var ( @@ -56,11 +57,10 @@ var ( revisionSelect = sb.Select("revision").From("revision") revisionInsert = sb.Insert("revision").Columns("revision") - runSelect = sb.Select("data").From("run") - runInsert = sb.Insert("run").Columns("id", "data", "phase") + runSelect = sb.Select("id", "grouppath", "phase").From("run") + runInsert = sb.Insert("run").Columns("id", "grouppath", "phase") - rungroupSelect = sb.Select("runid", "grouppath").From("rungroup") - rungroupInsert = sb.Insert("rungroup").Columns("runid", "grouppath") + rundataInsert = sb.Insert("rundata").Columns("id", "data") runeventSelect = sb.Select("data").From("runevent") runeventInsert = sb.Insert("runevent").Columns("sequence", "data") @@ -68,11 +68,18 @@ var ( changegrouprevisionSelect = sb.Select("id, revision").From("changegrouprevision") changegrouprevisionInsert = sb.Insert("changegrouprevision").Columns("id", "revision") - runLTSSelect = sb.Select("id").From("run_lts") - runLTSInsert = sb.Insert("run_lts").Columns("id", "data", "phase") + // LTS - rungroupLTSSelect = sb.Select("runid", "grouppath").From("rungroup_lts") - rungroupLTSInsert = sb.Insert("rungroup_lts").Columns("runid", "grouppath") + revisionLTSSelect = sb.Select("revision").From("revision_lts") + revisionLTSInsert = sb.Insert("revision_lts").Columns("revision") + + committedwalsequenceLTSSelect = sb.Select("seq").From("committedwalsequence_lts") + committedwalsequenceLTSInsert = sb.Insert("committedwalsequence_lts").Columns("seq") + + runLTSSelect = sb.Select("id", "grouppath", "phase").From("run_lts") + runLTSInsert = sb.Insert("run_lts").Columns("id", "grouppath", "phase") + + rundataLTSInsert = sb.Insert("rundata_lts").Columns("id", "data") ) type ReadDB struct { @@ -108,26 +115,27 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd. rdb: rdb, } - revision, err := readDB.GetRevision() - if err != nil { - return nil, err - } - - if revision == 0 { - if err := readDB.Initialize(ctx); err != nil { - return nil, err - } - } - - readDB.Initialized = true - return readDB, nil } // Initialize populates the readdb with the current etcd data and save the // revision to then feed it with the etcd events func (r *ReadDB) Initialize(ctx context.Context) error { - r.log.Infof("initialize") + if err := r.ResetDB(); err != nil { + return errors.Wrapf(err, "failed to reset db") + } + revision, err := r.SyncLTS(ctx) + if err != nil { + return errors.Wrapf(err, "error syncing lts db") + } + if err := r.SyncRDB(ctx, revision); err != nil { + return errors.Wrapf(err, "error syncing run db") + } + return nil +} + +func (r *ReadDB) ResetDB() error { + // TODO(sgotti) this needs to be protected by a mutex r.rdb.Close() // drop rdb @@ -147,30 +155,17 @@ func (r *ReadDB) Initialize(ctx context.Context) error { r.rdb = rdb - // then sync the rdb - for { - if err := r.SyncRDB(ctx); err != nil { - r.log.Errorf("error syncing run db: %+v, retrying", err) - } else { - break - } - time.Sleep(2 * time.Second) - } - - r.Initialized = true - return nil } -func (r *ReadDB) SyncRDB(ctx context.Context) error { +func (r *ReadDB) SyncRDB(ctx context.Context, revision int64) error { err := r.rdb.Do(func(tx *db.Tx) error { // Do pagination to limit the number of keys per request - var revision int64 key := common.EtcdRunsDir var continuation *etcd.ListPagedContinuation for { - listResp, err := r.e.ListPaged(ctx, key, 0, 10, continuation) + listResp, err := r.e.ListPaged(ctx, key, revision, paginationSize, continuation) if err != nil { return err } @@ -188,7 +183,6 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { if err := json.Unmarshal(kv.Value, &run); err != nil { return err } - run.Revision = kv.ModRevision if err := insertRun(tx, run, kv.Value); err != nil { return err @@ -204,7 +198,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { key = common.EtcdChangeGroupsDir continuation = nil for { - listResp, err := r.e.ListPaged(ctx, key, revision, 10, continuation) + listResp, err := r.e.ListPaged(ctx, key, revision, paginationSize, continuation) if err != nil { return err } @@ -219,128 +213,174 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { } } - if err := insertRevision(tx, revision); err != nil { - return err - } - if !listResp.HasMore { break } } + if err := insertRevision(tx, revision); err != nil { + return err + } + return nil }) return err } -func (r *ReadDB) SyncLTSRuns(tx *db.Tx, groupID, startRunID string, limit int, sortOrder types.SortOrder) error { - doneCh := make(chan struct{}) - defer close(doneCh) - - //q, args, err := rungroupSelect.Where(sq.Eq{"grouppath": groupID}).Limit(1).ToSql() - //r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) - //if err != nil { - // return errors.Wrap(err, "failed to build query") - //} - //hasRow := false - //err = tx.Do(func(tx *db.Tx) error { - // rows, err := tx.Query(q, args...) - // if err != nil { - // return err - // } - // defer rows.Close() - - // for rows.Next() { - // hasRow = true - // break - // } - // if err := rows.Err(); err != nil { - // return err - // } - // return nil - //}) - - //// this means that this rungroup is in sync - //if hasRow { - // return nil - //} - - insertfunc := func(runs []*types.Run) error { - for _, run := range runs { - if err := r.insertRunLTS(tx, run, []byte{}); err != nil { - return err - } - } - return nil +func (r *ReadDB) SyncLTS(ctx context.Context) (int64, error) { + type indexData struct { + ID string + Phase types.RunPhase + Group string } - runs := []*types.Run{} - count := 0 - var start string - if startRunID != "" { - start = store.LTSIndexRunIDOrderPath(groupID, startRunID, sortOrder) - } - for object := range r.wal.List(store.LTSIndexRunIDOrderDir(groupID, sortOrder), start, true, doneCh) { - //r.log.Infof("path: %q", object.Path) - if object.Err != nil { - if object.Err == objectstorage.ErrNotExist { - break - } - return object.Err - } + insertfunc := func(objs []string) error { + err := r.rdb.Do(func(tx *db.Tx) error { + for _, obj := range objs { + f, _, err := r.wal.ReadObject(obj, nil) + if err != nil { + if err == objectstorage.ErrNotExist { + r.log.Warnf("object %s disappeared, ignoring", obj) + } + return err + } - runObj := common.StorageRunFile(path.Base(object.Path)) - f, _, err := r.wal.ReadObject(runObj, nil) - if err != nil && err != objectstorage.ErrNotExist { - return err - } - if err != objectstorage.ErrNotExist { - var run *types.Run - e := json.NewDecoder(f) - if err := e.Decode(&run); err != nil { + dec := json.NewDecoder(f) + for { + var id *indexData + + err := dec.Decode(&id) + if err == io.EOF { + // all done + break + } + if err != nil { + f.Close() + return err + } + + run := &types.Run{ + ID: id.ID, + Group: id.Group, + Phase: id.Phase, + } + if err := r.insertRunLTS(tx, run, []byte{}); err != nil { + f.Close() + return err + } + } f.Close() - return err } - f.Close() - - runs = append(runs, run) - } - - if count > 100 { - if err := insertfunc(runs); err != nil { - return err - } - count = 0 - runs = []*types.Run{} - } else { - count++ - } - if count > limit { - break - } - } - if err := insertfunc(runs); err != nil { + return nil + }) return err } - return nil + resp, err := r.e.Get(ctx, common.EtcdLastIndexKey, 0) + if err != nil { + return 0, err + } + indexDir := string(resp.Kvs[0].Value) + indexRevision := resp.Kvs[0].ModRevision + revision := resp.Header.Revision + + // TODO(sgotti) wait for wal changes to be at a revision >= revision + walChangesRevision, err := r.wal.ChangesCurrentRevision() + if err != nil { + return 0, err + } + if walChangesRevision < indexRevision { + return 0, errors.Errorf("wal changes revision %q is lower than index revision %q", walChangesRevision, revision) + } + + r.log.Infof("indexDir: %s", indexDir) + + objs := []string{} + count := 0 + + doneCh := make(chan struct{}) + defer close(doneCh) + for object := range r.wal.List(path.Join(common.StorageRunsIndexesDir, indexDir)+"/", "", true, doneCh) { + if object.Err != nil { + return 0, object.Err + } + r.log.Infof("path: %s", object.Path) + + objs = append(objs, object.Path) + + if count > paginationSize { + if err := insertfunc(objs); err != nil { + return 0, err + } + count = 0 + objs = []string{} + } else { + count++ + } + } + if err := insertfunc(objs); err != nil { + return 0, err + } + + resp, err = r.e.Get(ctx, common.EtcdLastIndexKey, 0) + if err != nil { + return 0, err + } + curIndexDir := string(resp.Kvs[0].Value) + + if curIndexDir != indexDir { + return 0, errors.Errorf("index dir has changed, used %s, current: %s", indexDir, curIndexDir) + } + + return revision, nil } -func (r *ReadDB) Run(ctx context.Context) { +func (r *ReadDB) Run(ctx context.Context) error { + revision, err := r.GetRevision() + if err != nil { + return err + } + + if revision == 0 { + for { + err := r.Initialize(ctx) + if err == nil { + break + } + r.log.Errorf("initialize err: %+v", err) + + time.Sleep(1 * time.Second) + } + } + r.Initialized = true + for { + for { + initialized := r.Initialized + if initialized { + break + } + err := r.Initialize(ctx) + if err == nil { + r.Initialized = true + break + } + r.log.Errorf("initialize err: %+v", err) + + time.Sleep(1 * time.Second) + } + + r.log.Infof("starting HandleEvents") if err := r.HandleEvents(ctx); err != nil { r.log.Errorf("handleevents err: %+v", err) } - if !r.Initialized { - r.Initialize(ctx) - } select { case <-ctx.Done(): r.log.Infof("readdb exiting") - return - default: + r.rdb.Close() + return nil } time.Sleep(1 * time.Second) @@ -407,7 +447,6 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { // a single transaction for every response (every response contains all the // events happened in an etcd revision). err = r.rdb.Do(func(tx *db.Tx) error { - for _, ev := range wresp.Events { if err := r.handleEvent(tx, ev, &wresp); err != nil { return err @@ -452,7 +491,6 @@ func (r *ReadDB) handleRunEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdcl if err := json.Unmarshal(ev.Kv.Value, &run); err != nil { return errors.Wrap(err, "failed to unmarshal run") } - run.Revision = ev.Kv.ModRevision return insertRun(tx, run, ev.Kv.Value) @@ -521,7 +559,7 @@ func (r *ReadDB) Do(f func(tx *db.Tx) error) error { func insertRevision(tx *db.Tx, revision int64) error { // poor man insert or update that works because transaction isolation level is serializable if _, err := tx.Exec("delete from revision"); err != nil { - return errors.Wrap(err, "failed to delete run") + return errors.Wrap(err, "failed to delete revision") } // TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types... //q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql() @@ -536,11 +574,17 @@ func insertRevision(tx *db.Tx, revision int64) error { } func insertRun(tx *db.Tx, run *types.Run, data []byte) error { + // add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02) + groupPath := run.Group + if !strings.HasSuffix(groupPath, "/") { + groupPath += "/" + } + // poor man insert or update that works because transaction isolation level is serializable if _, err := tx.Exec("delete from run where id = $1", run.ID); err != nil { return errors.Wrap(err, "failed to delete run") } - q, args, err := runInsert.Values(run.ID, data, run.Phase).ToSql() + q, args, err := runInsert.Values(run.ID, groupPath, run.Phase).ToSql() if err != nil { return errors.Wrap(err, "failed to build query") } @@ -548,40 +592,33 @@ func insertRun(tx *db.Tx, run *types.Run, data []byte) error { return err } - groupPaths := []string{} - p := run.Group - for { - groupPaths = append(groupPaths, p) - prevp := p - p = path.Dir(p) - if p == prevp { - break - } + // poor man insert or update that works because transaction isolation level is serializable + if _, err := tx.Exec("delete from rundata where id = $1", run.ID); err != nil { + return errors.Wrap(err, "failed to delete rundata") } - - for _, groupPath := range groupPaths { - // poor man insert or update that works because transaction isolation level is serializable - if _, err := tx.Exec("delete from rungroup where runID = $1 and grouppath = $2", run.ID, groupPath); err != nil { - return errors.Wrap(err, "failed to delete rungroup") - } - q, args, err := rungroupInsert.Values(run.ID, groupPath).ToSql() - if err != nil { - return errors.Wrap(err, "failed to build query") - } - if _, err = tx.Exec(q, args...); err != nil { - return err - } + q, args, err = rundataInsert.Values(run.ID, data).ToSql() + if err != nil { + return errors.Wrap(err, "failed to build query") + } + if _, err = tx.Exec(q, args...); err != nil { + return err } return nil } func (r *ReadDB) insertRunLTS(tx *db.Tx, run *types.Run, data []byte) error { + // add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02) + groupPath := run.Group + if !strings.HasSuffix(groupPath, "/") { + groupPath += "/" + } + // poor man insert or update that works because transaction isolation level is serializable if _, err := tx.Exec("delete from run_lts where id = $1", run.ID); err != nil { return errors.Wrap(err, "failed to delete run lts") } - q, args, err := runLTSInsert.Values(run.ID, data, run.Phase).ToSql() + q, args, err := runLTSInsert.Values(run.ID, groupPath, run.Phase).ToSql() if err != nil { return errors.Wrap(err, "failed to build query") } @@ -589,29 +626,16 @@ func (r *ReadDB) insertRunLTS(tx *db.Tx, run *types.Run, data []byte) error { return err } - groupPaths := []string{} - p := run.Group - for { - groupPaths = append(groupPaths, p) - prevp := p - p = path.Dir(p) - if p == prevp { - break - } + // poor man insert or update that works because transaction isolation level is serializable + if _, err := tx.Exec("delete from rundata_lts where id = $1", run.ID); err != nil { + return errors.Wrap(err, "failed to delete rundata") } - - for _, groupPath := range groupPaths { - // poor man insert or update that works because transaction isolation level is serializable - if _, err := tx.Exec("delete from rungroup_lts where runID = $1 and grouppath = $2", run.ID, groupPath); err != nil { - return errors.Wrap(err, "failed to delete rungroup") - } - q, args, err := rungroupLTSInsert.Values(run.ID, groupPath).ToSql() - if err != nil { - return errors.Wrap(err, "failed to build query") - } - if _, err = tx.Exec(q, args...); err != nil { - return err - } + q, args, err = rundataLTSInsert.Values(run.ID, data).ToSql() + if err != nil { + return errors.Wrap(err, "failed to build query") + } + if _, err = tx.Exec(q, args...); err != nil { + return err } return nil @@ -689,33 +713,10 @@ func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, lastRun bool, phaseFi return r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) } -func (r *ReadDB) PrefetchRuns(tx *db.Tx, groups []string, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) error { - useLTS := false - for _, phase := range phaseFilter { - if phase == types.RunPhaseFinished { - useLTS = true - } - } - if len(phaseFilter) == 0 { - useLTS = true - } - if !useLTS { - return nil - } - - for _, group := range groups { - err := r.SyncLTSRuns(tx, group, startRunID, limit, sortOrder) - if err != nil { - return errors.Wrap(err, "failed to sync runs from lts") - } - } - return nil -} - func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) { useLTS := false for _, phase := range phaseFilter { - if phase == types.RunPhaseFinished { + if phase == types.RunPhaseFinished || phase == types.RunPhaseCancelled { useLTS = true } } @@ -804,12 +805,14 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [ func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, lts bool) sq.SelectBuilder { runt := "run" - rungroupt := "rungroup" - fields := []string{"data"} + rundatat := "rundata" + fields := []string{"run.id", "run.grouppath", "run.phase", "rundata.data"} + if len(groups) > 0 && lastRun { + fields = []string{"max(run.id)", "run.grouppath", "run.phase", "rundata.data"} + } if lts { runt = "run_lts" - rungroupt = "rungroup_lts" - fields = []string{"id"} + rundatat = "rundata_lts" } r.log.Debugf("runt: %s", runt) @@ -835,7 +838,7 @@ func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []str s = s.Limit(uint64(limit)) } - s = s.Join(fmt.Sprintf("%s as rungroup on rungroup.id = run.id", rungroupt)) + s = s.Join(fmt.Sprintf("%s as rundata on rundata.id = run.id", rundatat)) if len(groups) > 0 { cond := sq.Or{} for _, groupPath := range groups { @@ -965,8 +968,8 @@ func fetchChangeGroupsRevision(tx *db.Tx, q string, args ...interface{}) (types. return scanChangeGroupsRevision(rows) } -func scanChangeGroupsRevision(rows *sql.Rows) (map[string]int64, error) { - changegroups := map[string]int64{} +func scanChangeGroupsRevision(rows *sql.Rows) (types.ChangeGroupsRevisions, error) { + changegroups := types.ChangeGroupsRevisions{} for rows.Next() { var ( id string diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 0d4acf3..4907f9b 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -22,10 +22,13 @@ import ( "fmt" "net/http" "os" + "path" "path/filepath" + "strconv" "time" scommon "github.com/sorintlab/agola/internal/common" + "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/objectstorage" @@ -1073,7 +1076,26 @@ func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error { if err != nil { return err } - if _, err = s.wal.WriteWal(ctx, []*wal.Action{ra}, nil); err != nil { + + resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0) + // if lastIndexKey doesn't exist return an error + if err != nil { + return err + } + lastIndexRevision := resp.Kvs[0].ModRevision + lastIndexDir := string(resp.Kvs[0].Value) + + indexActions, err := s.additionalActions(lastIndexDir, ra) + if err != nil { + return err + } + + actions := append([]*wal.Action{ra}, indexActions...) + + cmp := []etcdclientv3.Cmp{etcdclientv3.Compare(etcdclientv3.ModRevision(common.EtcdLastIndexKey), "=", lastIndexRevision)} + then := []etcdclientv3.Op{etcdclientv3.OpPut(common.EtcdLastIndexKey, lastIndexDir)} + + if _, err = s.wal.WriteWalAdditionalOps(ctx, actions, nil, cmp, then); err != nil { return err } @@ -1085,7 +1107,13 @@ func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error { return nil } -func (s *Scheduler) additionalActions(action *wal.Action) ([]*wal.Action, error) { +func (s *Scheduler) additionalActions(indexDir string, action *wal.Action) ([]*wal.Action, error) { + type indexData struct { + ID string + Group string + Phase types.RunPhase + } + configType, _ := common.PathToTypeID(action.Path) var actionType wal.ActionType @@ -1103,14 +1131,22 @@ func (s *Scheduler) additionalActions(action *wal.Action) ([]*wal.Action, error) if err := json.Unmarshal(action.Data, &run); err != nil { return nil, errors.Wrap(err, "failed to unmarshal run") } - indexes := store.LTSGenIndexes(s.lts, run) - actions := make([]*wal.Action, len(indexes)) - for i, index := range indexes { - actions[i] = &wal.Action{ + + data := []byte{} + index := path.Join(common.StorageRunsIndexesDir, indexDir, "added", run.ID) + id := &indexData{ID: run.ID, Group: run.Group, Phase: run.Phase} + idj, err := json.Marshal(id) + if err != nil { + return nil, err + } + data = append(data, idj...) + + actions := []*wal.Action{ + &wal.Action{ ActionType: actionType, Path: index, - Data: []byte{}, - } + Data: data, + }, } return actions, nil } @@ -1118,6 +1154,174 @@ func (s *Scheduler) additionalActions(action *wal.Action) ([]*wal.Action, error) return []*wal.Action{}, nil } +func (s *Scheduler) dumpLTSLoop(ctx context.Context) { + for { + log.Debugf("lts dump loop") + + // TODO(sgotti) create new dump only after N files + if err := s.dumpLTS(ctx); err != nil { + log.Errorf("err: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(10 * time.Second) + } +} + +func (s *Scheduler) dumpLTS(ctx context.Context) error { + type indexData struct { + ID string + Group string + Phase types.RunPhase + } + + resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0) + // if lastIndexKey doesn't exist return an error + if err != nil { + return err + } + lastIndexRevision := resp.Kvs[0].ModRevision + revision := resp.Header.Revision + + indexDir := strconv.FormatInt(time.Now().UnixNano(), 10) + + readdbRevision, err := s.readDB.GetRevision() + if err != nil { + return err + } + if readdbRevision < revision { + return errors.Errorf("readdb revision %d is lower than index revision %d", readdbRevision, revision) + } + + runs := []*readdb.RunData{} + var lastRunID string + stop := false + for { + err := s.readDB.Do(func(tx *db.Tx) error { + var err error + lruns, err := s.readDB.GetRunsFilteredLTS(tx, nil, false, nil, lastRunID, 10000, types.SortOrderDesc) + if err != nil { + return err + } + if len(lruns) == 0 { + stop = true + } else { + lastRunID = lruns[len(lruns)-1].ID + } + runs = append(runs, lruns...) + return nil + }) + if err != nil { + return err + } + if stop { + break + } + } + + data := []byte{} + for _, run := range runs { + id := &indexData{ID: run.ID, Group: run.GroupPath, Phase: types.RunPhase(run.Phase)} + idj, err := json.Marshal(id) + if err != nil { + return err + } + data = append(data, idj...) + } + + index := path.Join(common.StorageRunsIndexesDir, indexDir, "all") + + cmp := []etcdclientv3.Cmp{etcdclientv3.Compare(etcdclientv3.ModRevision(common.EtcdLastIndexKey), "=", lastIndexRevision)} + then := []etcdclientv3.Op{etcdclientv3.OpPut(common.EtcdLastIndexKey, indexDir)} + + actions := []*wal.Action{ + &wal.Action{ + ActionType: wal.ActionTypePut, + Path: index, + Data: data, + }, + } + + if _, err = s.wal.WriteWalAdditionalOps(ctx, actions, nil, cmp, then); err != nil { + return err + } + + return nil +} + +func (s *Scheduler) dumpLTSCleanerLoop(ctx context.Context) { + for { + log.Infof("lts dump cleaner loop") + + if err := s.dumpLTSCleaner(ctx); err != nil { + log.Errorf("err: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(10 * time.Second) + } +} + +func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error { + type indexData struct { + ID string + Group string + Phase types.RunPhase + } + + resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0) + // if lastIndexKey doesn't exist return an error + if err != nil { + return err + } + lastIndexDir := string(resp.Kvs[0].Value) + + // collect all object that don't pertain to the lastIndexDir + objects := []string{} + doneCh := make(chan struct{}) + defer close(doneCh) + for object := range s.wal.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) { + if object.Err != nil { + return object.Err + } + + h := util.PathList(object.Path) + if len(h) < 2 { + return errors.Errorf("wrong index dir path %q", object.Path) + } + indexDir := h[1] + if indexDir != lastIndexDir { + objects = append(objects, object.Path) + } + } + + actions := make([]*wal.Action, len(objects)) + for i, object := range objects { + actions[i] = &wal.Action{ + ActionType: wal.ActionTypeDelete, + Path: object, + } + } + + if len(actions) > 0 { + if _, err = s.wal.WriteWal(ctx, actions, nil); err != nil { + return err + } + } + + return nil +} + type Scheduler struct { c *config.RunServiceScheduler e *etcd.Store @@ -1141,17 +1345,6 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule return nil, err } - // Create changegroup min revision if it doesn't exists - cmp := []etcdclientv3.Cmp{} - then := []etcdclientv3.Op{} - - cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0)) - then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, "")) - txn := e.Client().Txn(ctx).If(cmp...).Then(then...) - if _, err := txn.Commit(); err != nil { - return nil, etcd.FromEtcdError(err) - } - s := &Scheduler{ c: c, e: e, @@ -1159,9 +1352,8 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule } walConf := &wal.WalManagerConfig{ - E: e, - Lts: lts, - AdditionalActionsFunc: s.additionalActions, + E: e, + Lts: lts, } wal, err := wal.NewWalManager(ctx, logger, walConf) if err != nil { @@ -1181,6 +1373,56 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule return s, nil } +func (s *Scheduler) InitEtcd(ctx context.Context) error { + // Create changegroup min revision if it doesn't exists + cmp := []etcdclientv3.Cmp{} + then := []etcdclientv3.Op{} + + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0)) + then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, "")) + txn := s.e.Client().Txn(ctx).If(cmp...).Then(then...) + if _, err := txn.Commit(); err != nil { + return etcd.FromEtcdError(err) + } + + // Populate lastIndexDir if empty + doneCh := make(chan struct{}) + defer close(doneCh) + + _, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0) + if err != nil && err != etcd.ErrKeyNotFound { + return err + } + // EtcdLastIndexKey already exist + if err == nil { + return nil + } + + var indexDir string + // take the last (greater) indexdir + for object := range s.wal.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) { + if object.Err != nil { + return object.Err + } + + h := util.PathList(object.Path) + if len(h) < 2 { + return errors.Errorf("wrong index dir path %q", object.Path) + } + indexDir = h[1] + } + + // if an indexDir doesn't exist in lts then initialize + if indexDir == "" { + indexDir = strconv.FormatInt(time.Now().UnixNano(), 10) + if _, err := s.e.AtomicPut(ctx, common.EtcdLastIndexKey, []byte(indexDir), 0, nil); err != nil { + return err + } + } + + return nil +} + func (s *Scheduler) Run(ctx context.Context) error { errCh := make(chan error) walReadyCh := make(chan struct{}) @@ -1190,6 +1432,14 @@ func (s *Scheduler) Run(ctx context.Context) error { // wait for wal to be ready <-walReadyCh + for { + err := s.InitEtcd(ctx) + if err == nil { + break + } + log.Errorf("failed to initialize etcd: %+v", err) + time.Sleep(1 * time.Second) + } go s.readDB.Run(ctx) @@ -1256,6 +1506,8 @@ func (s *Scheduler) Run(ctx context.Context) error { go s.runTasksUpdaterLoop(ctx) go s.fetcherLoop(ctx) go s.finishedRunsArchiverLoop(ctx) + go s.dumpLTSLoop(ctx) + go s.dumpLTSCleanerLoop(ctx) go s.compactChangeGroupsLoop(ctx) go s.runScheduler(ctx, ch) diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/scheduler/store/store.go index 438d5a7..46f93ec 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/scheduler/store/store.go @@ -23,7 +23,6 @@ import ( "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" - "github.com/sorintlab/agola/internal/sequence" "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" @@ -33,49 +32,56 @@ import ( etcdclientv3 "go.etcd.io/etcd/clientv3" ) +func LTSSubGroupsAndGroupTypes(group string) []string { + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) + } + + return h +} + +func LTSRootGroup(group string) string { + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) + } + + return h[2] +} + func LTSSubGroups(group string) []string { - return util.PathHierarchy(group) + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) + } + + // remove group types + sg := []string{} + for i, g := range h { + if i%2 == 0 { + sg = append(sg, g) + } + } + + return sg } -func LTSIndexGroupDir(group string) string { - groupPath := util.EncodeSha256Hex(group) - if group == "." || group == "/" { - groupPath = "all" - } - return path.Join(common.StorageRunsIndexesDir, groupPath) -} - -func LTSIndexRunIDOrderDir(group string, sortOrder types.SortOrder) string { - var dir string - switch sortOrder { - case types.SortOrderAsc: - dir = "runid/asc" - case types.SortOrderDesc: - dir = "runid/desc" - } - return path.Join(LTSIndexGroupDir(group), dir) -} - -func LTSIndexRunIDOrderPath(group, runID string, sortOrder types.SortOrder) string { - s, err := sequence.Parse(runID) - if err != nil { - panic(err) +func LTSSubGroupTypes(group string) []string { + h := util.PathHierarchy(group) + if len(h)%2 != 1 { + panic(fmt.Errorf("wrong group path %q", group)) } - order := runID - if sortOrder == types.SortOrderDesc { - order = s.Reverse().String() + // remove group names + sg := []string{} + for i, g := range h { + if i%2 == 1 { + sg = append(sg, g) + } } - return path.Join(LTSIndexRunIDOrderDir(group, sortOrder), order, runID) -} -func LTSIndexRunIDOrderPaths(group, runID string, sortOrder types.SortOrder) []string { - paths := []string{} - subGroups := LTSSubGroups(group) - for _, subGroup := range subGroups { - paths = append(paths, LTSIndexRunIDOrderPath(subGroup, runID, sortOrder)) - } - return paths + return sg } func LTSRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string { @@ -89,12 +95,12 @@ func LTSRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string func LTSGetRunCounter(wal *wal.WalManager, group string) (uint64, *wal.ChangeGroupsUpdateToken, error) { // use the first group dir after the root - ph := util.PathHierarchy(group) - if len(ph) < 2 { + pl := util.PathList(group) + if len(pl) < 2 { return 0, nil, errors.Errorf("cannot determine group counter name, wrong group path %q", group) } - runCounterPath := common.StorageCounterFile(ph[1]) - rcf, cgt, err := wal.ReadObject(runCounterPath, []string{"counter-" + ph[1]}) + runCounterPath := common.StorageCounterFile(pl[1]) + rcf, cgt, err := wal.ReadObject(runCounterPath, []string{"counter-" + pl[1]}) if err != nil { return 0, cgt, err } @@ -110,8 +116,8 @@ func LTSGetRunCounter(wal *wal.WalManager, group string) (uint64, *wal.ChangeGro func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wal.Action, error) { // use the first group dir after the root - ph := util.PathHierarchy(group) - if len(ph) < 2 { + pl := util.PathList(group) + if len(pl) < 2 { return nil, errors.Errorf("cannot determine group counter name, wrong group path %q", group) } @@ -122,7 +128,7 @@ func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wa action := &wal.Action{ ActionType: wal.ActionTypePut, - Path: common.StorageCounterFile(ph[1]), + Path: common.StorageCounterFile(pl[1]), Data: cj, } @@ -239,15 +245,6 @@ func LTSSaveRunAction(r *types.Run) (*wal.Action, error) { return action, nil } -func LTSGenIndexes(lts *objectstorage.ObjStorage, r *types.Run) []string { - indexes := []string{} - for _, order := range []types.SortOrder{types.SortOrderAsc, types.SortOrderDesc} { - indexes = append(indexes, LTSIndexRunIDOrderPaths(r.Group, r.ID, order)...) - //indexes = append(indexes, LTSIndexRunArchiveOrderPaths(r.Group, r.LTSSequence, r.ID, order)...) - } - return indexes -} - func GetExecutor(ctx context.Context, e *etcd.Store, executorID string) (*types.Executor, error) { resp, err := e.Get(ctx, common.EtcdExecutorKey(executorID), 0) if err != nil { diff --git a/internal/services/scheduler/scheduler.go b/internal/services/scheduler/scheduler.go index 2ab261b..f0e21e0 100644 --- a/internal/services/scheduler/scheduler.go +++ b/internal/services/scheduler/scheduler.go @@ -66,7 +66,7 @@ func (s *Scheduler) schedule(ctx context.Context) error { } } - for groupID, _ := range groups { + for groupID := range groups { if err := s.scheduleRun(ctx, groupID); err != nil { log.Errorf("scheduler err: %v", err) }