runservice: rework store and readdb logic

* Remove all the small index files on the lts
* Keep on s3 only a full index of all runs containing the runid, grouppath and phase
  (millions of runs take only a few hundred megabytes)
* Periodically create a new dump of the index
This commit is contained in:
Simone Gotti 2019-03-29 12:15:48 +01:00
parent 3c5eb71ba8
commit e46766829c
7 changed files with 552 additions and 314 deletions

View File

@ -390,22 +390,7 @@ func (h *RunsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var runs []*types.Run
var cgt *types.ChangeGroupsUpdateToken
if len(groups) == 0 {
groups = []string{"."}
}
err := h.readDB.Do(func(tx *db.Tx) error {
if err := h.readDB.PrefetchRuns(tx, groups, phaseFilter, start, limit, sortOrder); err != nil {
h.log.Errorf("err: %+v", err)
return err
}
return nil
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = h.readDB.Do(func(tx *db.Tx) error {
var err error
runs, err = h.readDB.GetRuns(tx, groups, lastRun, phaseFilter, start, limit, sortOrder)
if err != nil {

View File

@ -43,6 +43,8 @@ var (
EtcdExecutorsDir = "executors"
EtcdTasksDir = "tasks"
EtcdLastIndexKey = "lastindex"
)
func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }

View File

@ -19,12 +19,9 @@ var Stmts = []string{
//"create table revision (clusterid varchar, revision bigint, PRIMARY KEY(revision))",
"create table revision (revision bigint, PRIMARY KEY(revision))",
"create table run (id varchar, data bytea, phase varchar, PRIMARY KEY (id))",
"create index run_phase on run(phase)",
"create table run (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))",
// rungroup stores the groups associated to a run
"create table rungroup (runid varchar, grouppath varchar, PRIMARY KEY (runid, grouppath), FOREIGN KEY(runid) REFERENCES run(id) ON DELETE CASCADE)",
"create index rungroup_grouppath on rungroup(grouppath)",
"create table rundata (id varchar, data bytea, PRIMARY KEY (id))",
"create table runevent (sequence varchar, data bytea, PRIMARY KEY (sequence))",
@ -32,10 +29,12 @@ var Stmts = []string{
"create table changegrouprevision (id varchar, revision varchar, PRIMARY KEY (id, revision))",
// LTS
"create table run_lts (id varchar, data bytea, phase varchar, PRIMARY KEY (id))",
"create index run_lts_phase on run_lts(phase)",
"create table revision_lts (revision bigint, PRIMARY KEY(revision))",
// rungroup stores the groups associated to a run
"create table rungroup_lts (runid varchar, grouppath varchar, PRIMARY KEY (runid, grouppath), FOREIGN KEY(runid) REFERENCES run_lts(id) ON DELETE CASCADE)",
"create index rungroup_lts_grouppath on rungroup_lts(grouppath)",
// committedwalsequence stores the last committed wal sequence
"create table committedwalsequence_lts (seq varchar, PRIMARY KEY (seq))",
"create table run_lts (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))",
"create table rundata_lts (id varchar, data bytea, PRIMARY KEY (id))",
}

View File

@ -19,6 +19,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"io"
"os"
"path"
"path/filepath"
@ -46,7 +47,7 @@ import (
)
const (
MaxFetchSize = 25
paginationSize = 100
)
var (
@ -56,11 +57,10 @@ var (
revisionSelect = sb.Select("revision").From("revision")
revisionInsert = sb.Insert("revision").Columns("revision")
runSelect = sb.Select("data").From("run")
runInsert = sb.Insert("run").Columns("id", "data", "phase")
runSelect = sb.Select("id", "grouppath", "phase").From("run")
runInsert = sb.Insert("run").Columns("id", "grouppath", "phase")
rungroupSelect = sb.Select("runid", "grouppath").From("rungroup")
rungroupInsert = sb.Insert("rungroup").Columns("runid", "grouppath")
rundataInsert = sb.Insert("rundata").Columns("id", "data")
runeventSelect = sb.Select("data").From("runevent")
runeventInsert = sb.Insert("runevent").Columns("sequence", "data")
@ -68,11 +68,18 @@ var (
changegrouprevisionSelect = sb.Select("id, revision").From("changegrouprevision")
changegrouprevisionInsert = sb.Insert("changegrouprevision").Columns("id", "revision")
runLTSSelect = sb.Select("id").From("run_lts")
runLTSInsert = sb.Insert("run_lts").Columns("id", "data", "phase")
// LTS
rungroupLTSSelect = sb.Select("runid", "grouppath").From("rungroup_lts")
rungroupLTSInsert = sb.Insert("rungroup_lts").Columns("runid", "grouppath")
revisionLTSSelect = sb.Select("revision").From("revision_lts")
revisionLTSInsert = sb.Insert("revision_lts").Columns("revision")
committedwalsequenceLTSSelect = sb.Select("seq").From("committedwalsequence_lts")
committedwalsequenceLTSInsert = sb.Insert("committedwalsequence_lts").Columns("seq")
runLTSSelect = sb.Select("id", "grouppath", "phase").From("run_lts")
runLTSInsert = sb.Insert("run_lts").Columns("id", "grouppath", "phase")
rundataLTSInsert = sb.Insert("rundata_lts").Columns("id", "data")
)
type ReadDB struct {
@ -108,26 +115,27 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.
rdb: rdb,
}
revision, err := readDB.GetRevision()
if err != nil {
return nil, err
}
if revision == 0 {
if err := readDB.Initialize(ctx); err != nil {
return nil, err
}
}
readDB.Initialized = true
return readDB, nil
}
// Initialize populates the readdb with the current etcd data and save the
// revision to then feed it with the etcd events
//
// It runs three steps in order and stops at the first one that fails,
// returning its error wrapped with a message naming the step:
//   - ResetDB
//   - SyncLTS, which returns the revision used by the next step
//   - SyncRDB at that revision
func (r *ReadDB) Initialize(ctx context.Context) error {
r.log.Infof("initialize")
if err := r.ResetDB(); err != nil {
return errors.Wrapf(err, "failed to reset db")
}
// the revision returned by the lts sync is the one the run db is synced at
revision, err := r.SyncLTS(ctx)
if err != nil {
return errors.Wrapf(err, "error syncing lts db")
}
if err := r.SyncRDB(ctx, revision); err != nil {
return errors.Wrapf(err, "error syncing run db")
}
return nil
}
func (r *ReadDB) ResetDB() error {
// TODO(sgotti) this needs to be protected by a mutex
r.rdb.Close()
// drop rdb
@ -147,30 +155,17 @@ func (r *ReadDB) Initialize(ctx context.Context) error {
r.rdb = rdb
// then sync the rdb
for {
if err := r.SyncRDB(ctx); err != nil {
r.log.Errorf("error syncing run db: %+v, retrying", err)
} else {
break
}
time.Sleep(2 * time.Second)
}
r.Initialized = true
return nil
}
func (r *ReadDB) SyncRDB(ctx context.Context) error {
func (r *ReadDB) SyncRDB(ctx context.Context, revision int64) error {
err := r.rdb.Do(func(tx *db.Tx) error {
// Do pagination to limit the number of keys per request
var revision int64
key := common.EtcdRunsDir
var continuation *etcd.ListPagedContinuation
for {
listResp, err := r.e.ListPaged(ctx, key, 0, 10, continuation)
listResp, err := r.e.ListPaged(ctx, key, revision, paginationSize, continuation)
if err != nil {
return err
}
@ -188,7 +183,6 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
if err := json.Unmarshal(kv.Value, &run); err != nil {
return err
}
run.Revision = kv.ModRevision
if err := insertRun(tx, run, kv.Value); err != nil {
return err
@ -204,7 +198,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
key = common.EtcdChangeGroupsDir
continuation = nil
for {
listResp, err := r.e.ListPaged(ctx, key, revision, 10, continuation)
listResp, err := r.e.ListPaged(ctx, key, revision, paginationSize, continuation)
if err != nil {
return err
}
@ -219,128 +213,174 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
}
}
if err := insertRevision(tx, revision); err != nil {
return err
}
if !listResp.HasMore {
break
}
}
if err := insertRevision(tx, revision); err != nil {
return err
}
return nil
})
return err
}
func (r *ReadDB) SyncLTSRuns(tx *db.Tx, groupID, startRunID string, limit int, sortOrder types.SortOrder) error {
doneCh := make(chan struct{})
defer close(doneCh)
func (r *ReadDB) SyncLTS(ctx context.Context) (int64, error) {
type indexData struct {
ID string
Phase types.RunPhase
Group string
}
//q, args, err := rungroupSelect.Where(sq.Eq{"grouppath": groupID}).Limit(1).ToSql()
//r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
//if err != nil {
// return errors.Wrap(err, "failed to build query")
//}
//hasRow := false
//err = tx.Do(func(tx *db.Tx) error {
// rows, err := tx.Query(q, args...)
// if err != nil {
// return err
// }
// defer rows.Close()
// for rows.Next() {
// hasRow = true
// break
// }
// if err := rows.Err(); err != nil {
// return err
// }
// return nil
//})
//// this means that this rungroup is in sync
//if hasRow {
// return nil
//}
insertfunc := func(runs []*types.Run) error {
for _, run := range runs {
if err := r.insertRunLTS(tx, run, []byte{}); err != nil {
insertfunc := func(objs []string) error {
err := r.rdb.Do(func(tx *db.Tx) error {
for _, obj := range objs {
f, _, err := r.wal.ReadObject(obj, nil)
if err != nil {
if err == objectstorage.ErrNotExist {
r.log.Warnf("object %s disappeared, ignoring", obj)
}
return err
}
}
return nil
}
runs := []*types.Run{}
count := 0
var start string
if startRunID != "" {
start = store.LTSIndexRunIDOrderPath(groupID, startRunID, sortOrder)
}
for object := range r.wal.List(store.LTSIndexRunIDOrderDir(groupID, sortOrder), start, true, doneCh) {
//r.log.Infof("path: %q", object.Path)
if object.Err != nil {
if object.Err == objectstorage.ErrNotExist {
dec := json.NewDecoder(f)
for {
var id *indexData
err := dec.Decode(&id)
if err == io.EOF {
// all done
break
}
return object.Err
}
runObj := common.StorageRunFile(path.Base(object.Path))
f, _, err := r.wal.ReadObject(runObj, nil)
if err != nil && err != objectstorage.ErrNotExist {
return err
}
if err != objectstorage.ErrNotExist {
var run *types.Run
e := json.NewDecoder(f)
if err := e.Decode(&run); err != nil {
if err != nil {
f.Close()
return err
}
f.Close()
runs = append(runs, run)
run := &types.Run{
ID: id.ID,
Group: id.Group,
Phase: id.Phase,
}
if err := r.insertRunLTS(tx, run, []byte{}); err != nil {
f.Close()
return err
}
}
f.Close()
}
return nil
})
return err
}
if count > 100 {
if err := insertfunc(runs); err != nil {
return err
resp, err := r.e.Get(ctx, common.EtcdLastIndexKey, 0)
if err != nil {
return 0, err
}
indexDir := string(resp.Kvs[0].Value)
indexRevision := resp.Kvs[0].ModRevision
revision := resp.Header.Revision
// TODO(sgotti) wait for wal changes to be at a revision >= revision
walChangesRevision, err := r.wal.ChangesCurrentRevision()
if err != nil {
return 0, err
}
if walChangesRevision < indexRevision {
return 0, errors.Errorf("wal changes revision %q is lower than index revision %q", walChangesRevision, revision)
}
r.log.Infof("indexDir: %s", indexDir)
objs := []string{}
count := 0
doneCh := make(chan struct{})
defer close(doneCh)
for object := range r.wal.List(path.Join(common.StorageRunsIndexesDir, indexDir)+"/", "", true, doneCh) {
if object.Err != nil {
return 0, object.Err
}
r.log.Infof("path: %s", object.Path)
objs = append(objs, object.Path)
if count > paginationSize {
if err := insertfunc(objs); err != nil {
return 0, err
}
count = 0
runs = []*types.Run{}
objs = []string{}
} else {
count++
}
if count > limit {
break
}
if err := insertfunc(objs); err != nil {
return 0, err
}
if err := insertfunc(runs); err != nil {
resp, err = r.e.Get(ctx, common.EtcdLastIndexKey, 0)
if err != nil {
return 0, err
}
curIndexDir := string(resp.Kvs[0].Value)
if curIndexDir != indexDir {
return 0, errors.Errorf("index dir has changed, used %s, current: %s", indexDir, curIndexDir)
}
return revision, nil
}
func (r *ReadDB) Run(ctx context.Context) error {
revision, err := r.GetRevision()
if err != nil {
return err
}
return nil
if revision == 0 {
for {
err := r.Initialize(ctx)
if err == nil {
break
}
r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second)
}
}
r.Initialized = true
for {
for {
initialized := r.Initialized
if initialized {
break
}
err := r.Initialize(ctx)
if err == nil {
r.Initialized = true
break
}
r.log.Errorf("initialize err: %+v", err)
time.Sleep(1 * time.Second)
}
func (r *ReadDB) Run(ctx context.Context) {
for {
r.log.Infof("starting HandleEvents")
if err := r.HandleEvents(ctx); err != nil {
r.log.Errorf("handleevents err: %+v", err)
}
if !r.Initialized {
r.Initialize(ctx)
}
select {
case <-ctx.Done():
r.log.Infof("readdb exiting")
return
default:
r.rdb.Close()
return nil
}
time.Sleep(1 * time.Second)
@ -407,7 +447,6 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error {
// a single transaction for every response (every response contains all the
// events happened in an etcd revision).
err = r.rdb.Do(func(tx *db.Tx) error {
for _, ev := range wresp.Events {
if err := r.handleEvent(tx, ev, &wresp); err != nil {
return err
@ -452,7 +491,6 @@ func (r *ReadDB) handleRunEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdcl
if err := json.Unmarshal(ev.Kv.Value, &run); err != nil {
return errors.Wrap(err, "failed to unmarshal run")
}
run.Revision = ev.Kv.ModRevision
return insertRun(tx, run, ev.Kv.Value)
@ -521,7 +559,7 @@ func (r *ReadDB) Do(f func(tx *db.Tx) error) error {
func insertRevision(tx *db.Tx, revision int64) error {
// poor man insert or update that works because transaction isolation level is serializable
if _, err := tx.Exec("delete from revision"); err != nil {
return errors.Wrap(err, "failed to delete run")
return errors.Wrap(err, "failed to delete revision")
}
// TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types...
//q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql()
@ -536,11 +574,17 @@ func insertRevision(tx *db.Tx, revision int64) error {
}
func insertRun(tx *db.Tx, run *types.Run, data []byte) error {
// add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02)
groupPath := run.Group
if !strings.HasSuffix(groupPath, "/") {
groupPath += "/"
}
// poor man insert or update that works because transaction isolation level is serializable
if _, err := tx.Exec("delete from run where id = $1", run.ID); err != nil {
return errors.Wrap(err, "failed to delete run")
}
q, args, err := runInsert.Values(run.ID, data, run.Phase).ToSql()
q, args, err := runInsert.Values(run.ID, groupPath, run.Phase).ToSql()
if err != nil {
return errors.Wrap(err, "failed to build query")
}
@ -548,40 +592,33 @@ func insertRun(tx *db.Tx, run *types.Run, data []byte) error {
return err
}
groupPaths := []string{}
p := run.Group
for {
groupPaths = append(groupPaths, p)
prevp := p
p = path.Dir(p)
if p == prevp {
break
}
}
for _, groupPath := range groupPaths {
// poor man insert or update that works because transaction isolation level is serializable
if _, err := tx.Exec("delete from rungroup where runID = $1 and grouppath = $2", run.ID, groupPath); err != nil {
return errors.Wrap(err, "failed to delete rungroup")
if _, err := tx.Exec("delete from rundata where id = $1", run.ID); err != nil {
return errors.Wrap(err, "failed to delete rundata")
}
q, args, err := rungroupInsert.Values(run.ID, groupPath).ToSql()
q, args, err = rundataInsert.Values(run.ID, data).ToSql()
if err != nil {
return errors.Wrap(err, "failed to build query")
}
if _, err = tx.Exec(q, args...); err != nil {
return err
}
}
return nil
}
func (r *ReadDB) insertRunLTS(tx *db.Tx, run *types.Run, data []byte) error {
// add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02)
groupPath := run.Group
if !strings.HasSuffix(groupPath, "/") {
groupPath += "/"
}
// poor man insert or update that works because transaction isolation level is serializable
if _, err := tx.Exec("delete from run_lts where id = $1", run.ID); err != nil {
return errors.Wrap(err, "failed to delete run lts")
}
q, args, err := runLTSInsert.Values(run.ID, data, run.Phase).ToSql()
q, args, err := runLTSInsert.Values(run.ID, groupPath, run.Phase).ToSql()
if err != nil {
return errors.Wrap(err, "failed to build query")
}
@ -589,30 +626,17 @@ func (r *ReadDB) insertRunLTS(tx *db.Tx, run *types.Run, data []byte) error {
return err
}
groupPaths := []string{}
p := run.Group
for {
groupPaths = append(groupPaths, p)
prevp := p
p = path.Dir(p)
if p == prevp {
break
}
}
for _, groupPath := range groupPaths {
// poor man insert or update that works because transaction isolation level is serializable
if _, err := tx.Exec("delete from rungroup_lts where runID = $1 and grouppath = $2", run.ID, groupPath); err != nil {
return errors.Wrap(err, "failed to delete rungroup")
if _, err := tx.Exec("delete from rundata_lts where id = $1", run.ID); err != nil {
return errors.Wrap(err, "failed to delete rundata")
}
q, args, err := rungroupLTSInsert.Values(run.ID, groupPath).ToSql()
q, args, err = rundataLTSInsert.Values(run.ID, data).ToSql()
if err != nil {
return errors.Wrap(err, "failed to build query")
}
if _, err = tx.Exec(q, args...); err != nil {
return err
}
}
return nil
}
@ -689,33 +713,10 @@ func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, lastRun bool, phaseFi
return r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder)
}
func (r *ReadDB) PrefetchRuns(tx *db.Tx, groups []string, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) error {
useLTS := false
for _, phase := range phaseFilter {
if phase == types.RunPhaseFinished {
useLTS = true
}
}
if len(phaseFilter) == 0 {
useLTS = true
}
if !useLTS {
return nil
}
for _, group := range groups {
err := r.SyncLTSRuns(tx, group, startRunID, limit, sortOrder)
if err != nil {
return errors.Wrap(err, "failed to sync runs from lts")
}
}
return nil
}
func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) {
useLTS := false
for _, phase := range phaseFilter {
if phase == types.RunPhaseFinished {
if phase == types.RunPhaseFinished || phase == types.RunPhaseCancelled {
useLTS = true
}
}
@ -804,12 +805,14 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [
func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, lts bool) sq.SelectBuilder {
runt := "run"
rungroupt := "rungroup"
fields := []string{"data"}
rundatat := "rundata"
fields := []string{"run.id", "run.grouppath", "run.phase", "rundata.data"}
if len(groups) > 0 && lastRun {
fields = []string{"max(run.id)", "run.grouppath", "run.phase", "rundata.data"}
}
if lts {
runt = "run_lts"
rungroupt = "rungroup_lts"
fields = []string{"id"}
rundatat = "rundata_lts"
}
r.log.Debugf("runt: %s", runt)
@ -835,7 +838,7 @@ func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []str
s = s.Limit(uint64(limit))
}
s = s.Join(fmt.Sprintf("%s as rungroup on rungroup.id = run.id", rungroupt))
s = s.Join(fmt.Sprintf("%s as rundata on rundata.id = run.id", rundatat))
if len(groups) > 0 {
cond := sq.Or{}
for _, groupPath := range groups {
@ -965,8 +968,8 @@ func fetchChangeGroupsRevision(tx *db.Tx, q string, args ...interface{}) (types.
return scanChangeGroupsRevision(rows)
}
func scanChangeGroupsRevision(rows *sql.Rows) (map[string]int64, error) {
changegroups := map[string]int64{}
func scanChangeGroupsRevision(rows *sql.Rows) (types.ChangeGroupsRevisions, error) {
changegroups := types.ChangeGroupsRevisions{}
for rows.Next() {
var (
id string

View File

@ -22,10 +22,13 @@ import (
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"time"
scommon "github.com/sorintlab/agola/internal/common"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/etcd"
slog "github.com/sorintlab/agola/internal/log"
"github.com/sorintlab/agola/internal/objectstorage"
@ -1073,7 +1076,26 @@ func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error {
if err != nil {
return err
}
if _, err = s.wal.WriteWal(ctx, []*wal.Action{ra}, nil); err != nil {
resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0)
// if lastIndexKey doesn't exist return an error
if err != nil {
return err
}
lastIndexRevision := resp.Kvs[0].ModRevision
lastIndexDir := string(resp.Kvs[0].Value)
indexActions, err := s.additionalActions(lastIndexDir, ra)
if err != nil {
return err
}
actions := append([]*wal.Action{ra}, indexActions...)
cmp := []etcdclientv3.Cmp{etcdclientv3.Compare(etcdclientv3.ModRevision(common.EtcdLastIndexKey), "=", lastIndexRevision)}
then := []etcdclientv3.Op{etcdclientv3.OpPut(common.EtcdLastIndexKey, lastIndexDir)}
if _, err = s.wal.WriteWalAdditionalOps(ctx, actions, nil, cmp, then); err != nil {
return err
}
@ -1085,7 +1107,13 @@ func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error {
return nil
}
func (s *Scheduler) additionalActions(action *wal.Action) ([]*wal.Action, error) {
func (s *Scheduler) additionalActions(indexDir string, action *wal.Action) ([]*wal.Action, error) {
type indexData struct {
ID string
Group string
Phase types.RunPhase
}
configType, _ := common.PathToTypeID(action.Path)
var actionType wal.ActionType
@ -1103,14 +1131,22 @@ func (s *Scheduler) additionalActions(action *wal.Action) ([]*wal.Action, error)
if err := json.Unmarshal(action.Data, &run); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal run")
}
indexes := store.LTSGenIndexes(s.lts, run)
actions := make([]*wal.Action, len(indexes))
for i, index := range indexes {
actions[i] = &wal.Action{
data := []byte{}
index := path.Join(common.StorageRunsIndexesDir, indexDir, "added", run.ID)
id := &indexData{ID: run.ID, Group: run.Group, Phase: run.Phase}
idj, err := json.Marshal(id)
if err != nil {
return nil, err
}
data = append(data, idj...)
actions := []*wal.Action{
&wal.Action{
ActionType: actionType,
Path: index,
Data: []byte{},
}
Data: data,
},
}
return actions, nil
}
@ -1118,6 +1154,174 @@ func (s *Scheduler) additionalActions(action *wal.Action) ([]*wal.Action, error)
return []*wal.Action{}, nil
}
// dumpLTSLoop calls dumpLTS in a loop, waiting 10 seconds between calls, until
// ctx is done. Errors returned by dumpLTS are logged and the loop continues.
func (s *Scheduler) dumpLTSLoop(ctx context.Context) {
	for {
		log.Debugf("lts dump loop")

		// TODO(sgotti) create new dump only after N files
		if err := s.dumpLTS(ctx); err != nil {
			log.Errorf("err: %+v", err)
		}

		// Wait for the next iteration but return as soon as ctx is done
		// instead of sleeping through the cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Second):
		}
	}
}
// dumpLTS writes a new full runs index and points the etcd last index key to
// it.
//
// The index is built from the runs returned by the readdb
// (GetRunsFilteredLTS), fetched one page at a time, and is saved as the
// concatenation of one json object (ID, Group, Phase) per run under a newly
// generated index dir, in an object called "all".
//
// The wal write carries an additional etcd compare (the last index key mod
// revision must be the one read at the start) and an additional put setting
// the last index key to the new index dir.
//
// An error is returned when the last index key cannot be read, when the readdb
// revision is lower than the etcd revision at which the key was read, or when
// any readdb/marshal/wal operation fails.
func (s *Scheduler) dumpLTS(ctx context.Context) error {
	type indexData struct {
		ID    string
		Group string
		Phase types.RunPhase
	}

	// if lastIndexKey doesn't exist an error is returned
	lastIndexResp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0)
	if err != nil {
		return err
	}
	lastIndexRevision := lastIndexResp.Kvs[0].ModRevision
	etcdRevision := lastIndexResp.Header.Revision

	// the new index dir name is the current time in nanoseconds
	newIndexDir := strconv.FormatInt(time.Now().UnixNano(), 10)

	readdbRevision, err := s.readDB.GetRevision()
	if err != nil {
		return err
	}
	if readdbRevision < etcdRevision {
		return errors.Errorf("readdb revision %d is lower than index revision %d", readdbRevision, etcdRevision)
	}

	// fetch the runs one page at a time, passing the id of the last run of
	// the previous page as the start of the next one, until an empty page is
	// returned
	allRuns := []*readdb.RunData{}
	var lastRunID string
	for done := false; !done; {
		err := s.readDB.Do(func(tx *db.Tx) error {
			page, err := s.readDB.GetRunsFilteredLTS(tx, nil, false, nil, lastRunID, 10000, types.SortOrderDesc)
			if err != nil {
				return err
			}
			if len(page) > 0 {
				lastRunID = page[len(page)-1].ID
			} else {
				done = true
			}
			allRuns = append(allRuns, page...)
			return nil
		})
		if err != nil {
			return err
		}
	}

	// the index content is the plain concatenation of the json entries
	indexContent := []byte{}
	for _, run := range allRuns {
		entry, err := json.Marshal(&indexData{ID: run.ID, Group: run.GroupPath, Phase: types.RunPhase(run.Phase)})
		if err != nil {
			return err
		}
		indexContent = append(indexContent, entry...)
	}

	putIndex := &wal.Action{
		ActionType: wal.ActionTypePut,
		Path:       path.Join(common.StorageRunsIndexesDir, newIndexDir, "all"),
		Data:       indexContent,
	}

	cmp := []etcdclientv3.Cmp{etcdclientv3.Compare(etcdclientv3.ModRevision(common.EtcdLastIndexKey), "=", lastIndexRevision)}
	then := []etcdclientv3.Op{etcdclientv3.OpPut(common.EtcdLastIndexKey, newIndexDir)}

	_, err = s.wal.WriteWalAdditionalOps(ctx, []*wal.Action{putIndex}, nil, cmp, then)
	return err
}
// dumpLTSCleanerLoop calls dumpLTSCleaner in a loop, waiting 10 seconds
// between calls, until ctx is done. Errors returned by dumpLTSCleaner are
// logged and the loop continues.
func (s *Scheduler) dumpLTSCleanerLoop(ctx context.Context) {
	for {
		log.Infof("lts dump cleaner loop")

		if err := s.dumpLTSCleaner(ctx); err != nil {
			log.Errorf("err: %+v", err)
		}

		// Wait for the next iteration but return as soon as ctx is done
		// instead of sleeping through the cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Second):
		}
	}
}
// dumpLTSCleaner writes a wal entry with a delete action for every object
// listed under the runs indexes dir whose index dir (the second element of
// the object path) differs from the one stored in the etcd last index key.
// Nothing is written when there are no such objects.
//
// An error is returned when the last index key cannot be read, when the
// listing reports an error, when an object path has fewer than two elements
// or when the wal write fails.
func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error {
	// if lastIndexKey doesn't exist an error is returned
	resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0)
	if err != nil {
		return err
	}
	lastIndexDir := string(resp.Kvs[0].Value)

	doneCh := make(chan struct{})
	defer close(doneCh)

	// build a delete action for every object that doesn't pertain to the lastIndexDir
	var actions []*wal.Action
	for object := range s.wal.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) {
		if object.Err != nil {
			return object.Err
		}

		h := util.PathList(object.Path)
		if len(h) < 2 {
			return errors.Errorf("wrong index dir path %q", object.Path)
		}
		if indexDir := h[1]; indexDir != lastIndexDir {
			actions = append(actions, &wal.Action{
				ActionType: wal.ActionTypeDelete,
				Path:       object.Path,
			})
		}
	}

	if len(actions) == 0 {
		return nil
	}

	_, err = s.wal.WriteWal(ctx, actions, nil)
	return err
}
type Scheduler struct {
c *config.RunServiceScheduler
e *etcd.Store
@ -1141,17 +1345,6 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule
return nil, err
}
// Create changegroup min revision if it doesn't exists
cmp := []etcdclientv3.Cmp{}
then := []etcdclientv3.Op{}
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0))
then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, ""))
txn := e.Client().Txn(ctx).If(cmp...).Then(then...)
if _, err := txn.Commit(); err != nil {
return nil, etcd.FromEtcdError(err)
}
s := &Scheduler{
c: c,
e: e,
@ -1161,7 +1354,6 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule
walConf := &wal.WalManagerConfig{
E: e,
Lts: lts,
AdditionalActionsFunc: s.additionalActions,
}
wal, err := wal.NewWalManager(ctx, logger, walConf)
if err != nil {
@ -1181,6 +1373,56 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule
return s, nil
}
// InitEtcd creates the etcd keys required by the scheduler when they are
// missing:
//   - the changegroup min revision key, created with an empty value only if
//     its create revision is 0
//   - the last index key, set to the last index dir listed under the runs
//     indexes dir or, when no index exists, to a newly generated index dir
//     name (the current time in nanoseconds)
//
// It returns nil without touching the last index key when the key already
// exists.
func (s *Scheduler) InitEtcd(ctx context.Context) error {
	// Create changegroup min revision if it doesn't exists
	cmp := []etcdclientv3.Cmp{}
	then := []etcdclientv3.Op{}

	cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0))
	then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, ""))
	txn := s.e.Client().Txn(ctx).If(cmp...).Then(then...)
	if _, err := txn.Commit(); err != nil {
		return etcd.FromEtcdError(err)
	}

	// Populate lastIndexDir if empty
	_, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0)
	if err != nil && err != etcd.ErrKeyNotFound {
		return err
	}
	// EtcdLastIndexKey already exist
	if err == nil {
		return nil
	}

	doneCh := make(chan struct{})
	defer close(doneCh)

	var indexDir string
	// take the last (greater) indexdir
	// NOTE(review): this relies on List returning the objects in ascending
	// path order — confirm against the object storage implementation
	for object := range s.wal.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) {
		if object.Err != nil {
			return object.Err
		}

		h := util.PathList(object.Path)
		if len(h) < 2 {
			return errors.Errorf("wrong index dir path %q", object.Path)
		}
		indexDir = h[1]
	}

	// if an indexDir doesn't exist in lts then generate a new one
	if indexDir == "" {
		indexDir = strconv.FormatInt(time.Now().UnixNano(), 10)
	}

	// The key must be written also when an existing index dir was found in the
	// lts, otherwise it would stay missing and every reader of the key would
	// keep failing.
	if _, err := s.e.AtomicPut(ctx, common.EtcdLastIndexKey, []byte(indexDir), 0, nil); err != nil {
		return err
	}

	return nil
}
func (s *Scheduler) Run(ctx context.Context) error {
errCh := make(chan error)
walReadyCh := make(chan struct{})
@ -1190,6 +1432,14 @@ func (s *Scheduler) Run(ctx context.Context) error {
// wait for wal to be ready
<-walReadyCh
for {
err := s.InitEtcd(ctx)
if err == nil {
break
}
log.Errorf("failed to initialize etcd: %+v", err)
time.Sleep(1 * time.Second)
}
go s.readDB.Run(ctx)
@ -1256,6 +1506,8 @@ func (s *Scheduler) Run(ctx context.Context) error {
go s.runTasksUpdaterLoop(ctx)
go s.fetcherLoop(ctx)
go s.finishedRunsArchiverLoop(ctx)
go s.dumpLTSLoop(ctx)
go s.dumpLTSCleanerLoop(ctx)
go s.compactChangeGroupsLoop(ctx)
go s.runScheduler(ctx, ch)

View File

@ -23,7 +23,6 @@ import (
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/sequence"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
@ -33,49 +32,56 @@ import (
etcdclientv3 "go.etcd.io/etcd/clientv3"
)
// LTSSubGroupsAndGroupTypes returns the path hierarchy of group as computed by
// util.PathHierarchy. It panics when the hierarchy doesn't have an odd number
// of elements.
func LTSSubGroupsAndGroupTypes(group string) []string {
	hierarchy := util.PathHierarchy(group)
	if len(hierarchy)%2 == 1 {
		return hierarchy
	}
	panic(fmt.Errorf("wrong group path %q", group))
}
// LTSRootGroup returns the third element (index 2) of the path hierarchy of
// group as computed by util.PathHierarchy. It panics with a "wrong group path"
// error when the hierarchy doesn't have an odd number of elements or has fewer
// than three elements.
func LTSRootGroup(group string) string {
	h := util.PathHierarchy(group)
	// An odd length alone isn't enough: a hierarchy of a single element would
	// make h[2] fail with an index out of range runtime panic.
	if len(h)%2 != 1 || len(h) < 3 {
		panic(fmt.Errorf("wrong group path %q", group))
	}

	return h[2]
}
func LTSSubGroups(group string) []string {
return util.PathHierarchy(group)
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
func LTSIndexGroupDir(group string) string {
groupPath := util.EncodeSha256Hex(group)
if group == "." || group == "/" {
groupPath = "all"
// remove group types
sg := []string{}
for i, g := range h {
if i%2 == 0 {
sg = append(sg, g)
}
return path.Join(common.StorageRunsIndexesDir, groupPath)
}
func LTSIndexRunIDOrderDir(group string, sortOrder types.SortOrder) string {
var dir string
switch sortOrder {
case types.SortOrderAsc:
dir = "runid/asc"
case types.SortOrderDesc:
dir = "runid/desc"
}
return path.Join(LTSIndexGroupDir(group), dir)
return sg
}
func LTSIndexRunIDOrderPath(group, runID string, sortOrder types.SortOrder) string {
s, err := sequence.Parse(runID)
if err != nil {
panic(err)
func LTSSubGroupTypes(group string) []string {
h := util.PathHierarchy(group)
if len(h)%2 != 1 {
panic(fmt.Errorf("wrong group path %q", group))
}
order := runID
if sortOrder == types.SortOrderDesc {
order = s.Reverse().String()
// remove group names
sg := []string{}
for i, g := range h {
if i%2 == 1 {
sg = append(sg, g)
}
return path.Join(LTSIndexRunIDOrderDir(group, sortOrder), order, runID)
}
func LTSIndexRunIDOrderPaths(group, runID string, sortOrder types.SortOrder) []string {
paths := []string{}
subGroups := LTSSubGroups(group)
for _, subGroup := range subGroups {
paths = append(paths, LTSIndexRunIDOrderPath(subGroup, runID, sortOrder))
}
return paths
return sg
}
func LTSRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string {
@ -89,12 +95,12 @@ func LTSRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string
func LTSGetRunCounter(wal *wal.WalManager, group string) (uint64, *wal.ChangeGroupsUpdateToken, error) {
// use the first group dir after the root
ph := util.PathHierarchy(group)
if len(ph) < 2 {
pl := util.PathList(group)
if len(pl) < 2 {
return 0, nil, errors.Errorf("cannot determine group counter name, wrong group path %q", group)
}
runCounterPath := common.StorageCounterFile(ph[1])
rcf, cgt, err := wal.ReadObject(runCounterPath, []string{"counter-" + ph[1]})
runCounterPath := common.StorageCounterFile(pl[1])
rcf, cgt, err := wal.ReadObject(runCounterPath, []string{"counter-" + pl[1]})
if err != nil {
return 0, cgt, err
}
@ -110,8 +116,8 @@ func LTSGetRunCounter(wal *wal.WalManager, group string) (uint64, *wal.ChangeGro
func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wal.Action, error) {
// use the first group dir after the root
ph := util.PathHierarchy(group)
if len(ph) < 2 {
pl := util.PathList(group)
if len(pl) < 2 {
return nil, errors.Errorf("cannot determine group counter name, wrong group path %q", group)
}
@ -122,7 +128,7 @@ func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wa
action := &wal.Action{
ActionType: wal.ActionTypePut,
Path: common.StorageCounterFile(ph[1]),
Path: common.StorageCounterFile(pl[1]),
Data: cj,
}
@ -239,15 +245,6 @@ func LTSSaveRunAction(r *types.Run) (*wal.Action, error) {
return action, nil
}
func LTSGenIndexes(lts *objectstorage.ObjStorage, r *types.Run) []string {
indexes := []string{}
for _, order := range []types.SortOrder{types.SortOrderAsc, types.SortOrderDesc} {
indexes = append(indexes, LTSIndexRunIDOrderPaths(r.Group, r.ID, order)...)
//indexes = append(indexes, LTSIndexRunArchiveOrderPaths(r.Group, r.LTSSequence, r.ID, order)...)
}
return indexes
}
func GetExecutor(ctx context.Context, e *etcd.Store, executorID string) (*types.Executor, error) {
resp, err := e.Get(ctx, common.EtcdExecutorKey(executorID), 0)
if err != nil {

View File

@ -66,7 +66,7 @@ func (s *Scheduler) schedule(ctx context.Context) error {
}
}
for groupID, _ := range groups {
for groupID := range groups {
if err := s.scheduleRun(ctx, groupID); err != nil {
log.Errorf("scheduler err: %v", err)
}