diff --git a/internal/services/configstore/command/command.go b/internal/services/configstore/command/command.go index 9189287..d3c56a7 100644 --- a/internal/services/configstore/command/command.go +++ b/internal/services/configstore/command/command.go @@ -20,7 +20,6 @@ import ( "path" "github.com/sorintlab/agola/internal/db" - "github.com/sorintlab/agola/internal/services/configstore/common" "github.com/sorintlab/agola/internal/services/configstore/readdb" "github.com/sorintlab/agola/internal/services/types" "github.com/sorintlab/agola/internal/util" @@ -110,7 +109,8 @@ func (s *CommandHandler) CreateProjectGroup(ctx context.Context, projectGroup *t actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageProjectGroupFile(projectGroup.ID), + DataType: string(types.ConfigTypeProjectGroup), + ID: projectGroup.ID, Data: pcj, }, } @@ -185,7 +185,8 @@ func (s *CommandHandler) CreateProject(ctx context.Context, project *types.Proje actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageProjectFile(project.ID), + DataType: string(types.ConfigTypeProject), + ID: project.ID, Data: pcj, }, } @@ -231,7 +232,8 @@ func (s *CommandHandler) DeleteProject(ctx context.Context, projectRef string) e actions := []*wal.Action{ { ActionType: wal.ActionTypeDelete, - Path: common.StorageProjectFile(project.ID), + DataType: string(types.ConfigTypeProject), + ID: project.ID, }, } @@ -335,12 +337,14 @@ func (s *CommandHandler) CreateUser(ctx context.Context, req *CreateUserRequest) actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageUserFile(user.ID), + DataType: string(types.ConfigTypeUser), + ID: user.ID, Data: userj, }, { ActionType: wal.ActionTypePut, - Path: common.StorageProjectGroupFile(pg.ID), + DataType: string(types.ConfigTypeProjectGroup), + ID: pg.ID, Data: pgj, }, } @@ -380,7 +384,8 @@ func (s *CommandHandler) DeleteUser(ctx context.Context, userName string) error actions := []*wal.Action{ { ActionType: wal.ActionTypeDelete, - Path: common.StorageUserFile(user.ID), + DataType: string(types.ConfigTypeUser), + ID: user.ID, }, } @@ -474,7 +479,8 @@ func (s *CommandHandler) CreateUserLA(ctx context.Context, req *CreateUserLARequ actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageUserFile(user.ID), + DataType: string(types.ConfigTypeUser), + ID: user.ID, Data: userj, }, } @@ -532,7 +538,8 @@ func (s *CommandHandler) DeleteUserLA(ctx context.Context, userName, laID string actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageUserFile(user.ID), + DataType: string(types.ConfigTypeUser), + ID: user.ID, Data: userj, }, } @@ -611,7 +618,8 @@ func (s *CommandHandler) UpdateUserLA(ctx context.Context, req *UpdateUserLARequ actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageUserFile(user.ID), + DataType: string(types.ConfigTypeUser), + ID: user.ID, Data: userj, }, } @@ -666,7 +674,8 @@ func (s *CommandHandler) CreateUserToken(ctx context.Context, userName, tokenNam actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageUserFile(user.ID), + DataType: string(types.ConfigTypeUser), + ID: user.ID, Data: userj, }, } @@ -715,7 +724,8 @@ func (s *CommandHandler) CreateRemoteSource(ctx context.Context, remoteSource *t actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageRemoteSourceFile(remoteSource.ID), + DataType: string(types.ConfigTypeRemoteSource), + ID: remoteSource.ID, Data: rsj, }, } @@ -755,7 +765,8 
@@ func (s *CommandHandler) DeleteRemoteSource(ctx context.Context, remoteSourceNam actions := []*wal.Action{ { ActionType: wal.ActionTypeDelete, - Path: common.StorageRemoteSourceFile(remoteSource.ID), + DataType: string(types.ConfigTypeRemoteSource), + ID: remoteSource.ID, }, } @@ -814,12 +825,14 @@ func (s *CommandHandler) CreateOrg(ctx context.Context, org *types.Organization) actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageOrgFile(org.ID), + DataType: string(types.ConfigTypeOrg), + ID: org.ID, Data: orgj, }, { ActionType: wal.ActionTypePut, - Path: common.StorageProjectGroupFile(pg.ID), + DataType: string(types.ConfigTypeProjectGroup), + ID: pg.ID, Data: pgj, }, } @@ -861,14 +874,16 @@ func (s *CommandHandler) DeleteOrg(ctx context.Context, orgName string) error { actions := []*wal.Action{ { ActionType: wal.ActionTypeDelete, - Path: common.StorageOrgFile(org.ID), + DataType: string(types.ConfigTypeOrg), + ID: org.ID, }, } // delete all org projects for _, project := range projects { actions = append(actions, &wal.Action{ ActionType: wal.ActionTypeDelete, - Path: common.StorageProjectFile(project.ID), + DataType: string(types.ConfigTypeProject), + ID: project.ID, }) } @@ -931,7 +946,8 @@ func (s *CommandHandler) CreateSecret(ctx context.Context, secret *types.Secret) actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageSecretFile(secret.ID), + DataType: string(types.ConfigTypeSecret), + ID: secret.ID, Data: secretj, }, } @@ -977,7 +993,8 @@ func (s *CommandHandler) DeleteSecret(ctx context.Context, parentType types.Conf actions := []*wal.Action{ { ActionType: wal.ActionTypeDelete, - Path: common.StorageSecretFile(secret.ID), + DataType: string(types.ConfigTypeSecret), + ID: secret.ID, }, } @@ -1040,7 +1057,8 @@ func (s *CommandHandler) CreateVariable(ctx context.Context, variable *types.Var actions := []*wal.Action{ { ActionType: wal.ActionTypePut, - Path: common.StorageVariableFile(variable.ID), + DataType: string(types.ConfigTypeVariable), + ID: variable.ID, Data: variablej, }, } @@ -1085,7 +1103,8 @@ func (s *CommandHandler) DeleteVariable(ctx context.Context, parentType types.Co actions := []*wal.Action{ { ActionType: wal.ActionTypeDelete, - Path: common.StorageVariableFile(variable.ID), + DataType: string(types.ConfigTypeVariable), + ID: variable.ID, }, } diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 52afd48..144932d 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -178,9 +178,11 @@ func (r *ReadDB) SyncFromFiles() (string, error) { } f.Close() + configType, id := common.PathToTypeID(obj) action := &wal.Action{ ActionType: wal.ActionTypePut, - Path: obj, + DataType: string(configType), + ID: id, Data: data, } if err := r.applyAction(tx, action); err != nil { @@ -616,11 +618,9 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error { } func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { - configType, ID := common.PathToTypeID(action.Path) - switch action.ActionType { case wal.ActionTypePut: - switch configType { + switch types.ConfigType(action.DataType) { case types.ConfigTypeUser: if err := r.insertUser(tx, action.Data); err != nil { return err @@ -652,40 +652,40 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { } case wal.ActionTypeDelete: - switch configType { + switch types.ConfigType(action.DataType) { case types.ConfigTypeUser: - 
r.log.Debugf("deleting user with id: %s", ID) - if err := r.deleteUser(tx, ID); err != nil { + r.log.Debugf("deleting user with id: %s", action.ID) + if err := r.deleteUser(tx, action.ID); err != nil { return err } case types.ConfigTypeOrg: - r.log.Debugf("deleting org with id: %s", ID) - if err := r.deleteOrg(tx, ID); err != nil { + r.log.Debugf("deleting org with id: %s", action.ID) + if err := r.deleteOrg(tx, action.ID); err != nil { return err } case types.ConfigTypeProjectGroup: - r.log.Debugf("deleting project group with id: %s", ID) - if err := r.deleteProjectGroup(tx, ID); err != nil { + r.log.Debugf("deleting project group with id: %s", action.ID) + if err := r.deleteProjectGroup(tx, action.ID); err != nil { return err } case types.ConfigTypeProject: - r.log.Debugf("deleting project with id: %s", ID) - if err := r.deleteProject(tx, ID); err != nil { + r.log.Debugf("deleting project with id: %s", action.ID) + if err := r.deleteProject(tx, action.ID); err != nil { return err } case types.ConfigTypeRemoteSource: - r.log.Debugf("deleting remote source with id: %s", ID) - if err := r.deleteRemoteSource(tx, ID); err != nil { + r.log.Debugf("deleting remote source with id: %s", action.ID) + if err := r.deleteRemoteSource(tx, action.ID); err != nil { return err } case types.ConfigTypeSecret: - r.log.Debugf("deleting secret with id: %s", ID) - if err := r.deleteSecret(tx, ID); err != nil { + r.log.Debugf("deleting secret with id: %s", action.ID) + if err := r.deleteSecret(tx, action.ID); err != nil { return err } case types.ConfigTypeVariable: - r.log.Debugf("deleting variable with id: %s", ID) - if err := r.deleteVariable(tx, ID); err != nil { + r.log.Debugf("deleting variable with id: %s", action.ID) + if err := r.deleteVariable(tx, action.ID); err != nil { return err } } diff --git a/internal/services/runservice/scheduler/command/command.go b/internal/services/runservice/scheduler/command/command.go index 05f4e9d..8fb86d1 100644 --- a/internal/services/runservice/scheduler/command/command.go +++ b/internal/services/runservice/scheduler/command/command.go @@ -20,11 +20,13 @@ import ( "time" uuid "github.com/satori/go.uuid" + "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/runconfig" "github.com/sorintlab/agola/internal/sequence" "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" + "github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" @@ -35,18 +37,20 @@ import ( ) type CommandHandler struct { - log *zap.SugaredLogger - e *etcd.Store - lts *objectstorage.ObjStorage - wal *wal.WalManager + log *zap.SugaredLogger + e *etcd.Store + readDB *readdb.ReadDB + lts *objectstorage.ObjStorage + wal *wal.WalManager } -func NewCommandHandler(logger *zap.Logger, e *etcd.Store, lts *objectstorage.ObjStorage, wal *wal.WalManager) *CommandHandler { +func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, lts *objectstorage.ObjStorage, wal *wal.WalManager) *CommandHandler { return &CommandHandler{ - log: logger.Sugar(), - e: e, - lts: lts, - wal: wal, + log: logger.Sugar(), + e: e, + readDB: readDB, + lts: lts, + wal: wal, } } @@ -293,9 +297,9 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg rc := rb.Rc 
rd := rb.Rd - c, cgt, err := store.LTSGetRunCounter(s.wal, run.Group) + c, cgt, err := s.getRunCounter(run.Group) s.log.Infof("c: %d, cgt: %s", c, util.Dump(cgt)) - if err != nil && err != objectstorage.ErrNotExist { + if err != nil { return err } c++ @@ -450,3 +454,28 @@ func (s *CommandHandler) DeleteExecutor(ctx context.Context, executorID string) return nil } + +func (s *CommandHandler) getRunCounter(group string) (uint64, *wal.ChangeGroupsUpdateToken, error) { + // use the first group dir after the root + pl := util.PathList(group) + if len(pl) < 2 { + return 0, nil, errors.Errorf("cannot determine group counter name, wrong group path %q", group) + } + + var c uint64 + var cgt *wal.ChangeGroupsUpdateToken + err := s.readDB.Do(func(tx *db.Tx) error { + var err error + c, err = s.readDB.GetRunCounterLTS(tx, pl[1]) + if err != nil { + return err + } + cgt, err = s.readDB.GetChangeGroupsUpdateTokensLTS(tx, []string{"counter-" + pl[1]}) + return err + }) + if err != nil { + return 0, nil, err + } + + return c, cgt, nil +} diff --git a/internal/services/runservice/scheduler/common/common.go b/internal/services/runservice/scheduler/common/common.go index 91b7f1f..24a6003 100644 --- a/internal/services/runservice/scheduler/common/common.go +++ b/internal/services/runservice/scheduler/common/common.go @@ -17,7 +17,6 @@ package common import ( "fmt" "path" - "strings" ) type ErrNotExist struct { @@ -43,8 +42,6 @@ var ( EtcdExecutorsDir = "executors" EtcdTasksDir = "tasks" - - EtcdLastIndexKey = "lastindex" ) func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) } @@ -80,37 +77,30 @@ func StorageRunConfigFile(runID string) string { return path.Join(StorageRunsConfigDir, runID) } -func StorageCounterFile(group string) string { +func StorageRunCounterFile(group string) string { return path.Join(StorageCountersDir, group) } -type ConfigType int +type DataType string const ( - ConfigTypeRun ConfigType = iota + 1 - ConfigTypeRunData - ConfigTypeRunConfig - ConfigTypeCounter + DataTypeRun DataType = "run" + DataTypeRunData DataType = "rundata" + DataTypeRunConfig DataType = "runconfig" + DataTypeRunCounter DataType = "runcounter" ) -func PathToTypeID(p string) (ConfigType, string) { - var configType ConfigType - switch path.Dir(p) { - case StorageRunsDir: - configType = ConfigTypeRun - case StorageRunsDataDir: - configType = ConfigTypeRunData - case StorageRunsConfigDir: - configType = ConfigTypeRunConfig +func DataToPathFunc(dataType string, id string) string { + switch DataType(dataType) { + case DataTypeRun: + return StorageRunFile(id) + case DataTypeRunData: + return StorageRunDataFile(id) + case DataTypeRunConfig: + return StorageRunConfigFile(id) + case DataTypeRunCounter: + return StorageRunCounterFile(id) } - if strings.HasPrefix(p, StorageCountersDir+"/") { - configType = ConfigTypeCounter - } - - if configType == 0 { - panic(fmt.Errorf("cannot determine configtype for path: %q", p)) - } - - return configType, path.Base(p) + panic(fmt.Errorf("unknown data type %q", dataType)) } diff --git a/internal/services/runservice/scheduler/readdb/create.go b/internal/services/runservice/scheduler/readdb/create.go index 6a95fdf..26bc2fc 100644 --- a/internal/services/runservice/scheduler/readdb/create.go +++ b/internal/services/runservice/scheduler/readdb/create.go @@ -34,7 +34,11 @@ var Stmts = []string{ // committedwalsequence stores the last committed wal sequence "create table committedwalsequence_lts (seq varchar, PRIMARY KEY (seq))", + "create table changegrouprevision_lts (id 
varchar, revision varchar, PRIMARY KEY (id, revision))", + "create table run_lts (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", "create table rundata_lts (id varchar, data bytea, PRIMARY KEY (id))", + + "create table runcounter_lts (groupid varchar, counter bigint, PRIMARY KEY (groupid))", } diff --git a/internal/services/runservice/scheduler/readdb/readdb.go b/internal/services/runservice/scheduler/readdb/readdb.go index b25ab38..dbd8f59 100644 --- a/internal/services/runservice/scheduler/readdb/readdb.go +++ b/internal/services/runservice/scheduler/readdb/readdb.go @@ -54,6 +54,7 @@ var ( // Use postgresql $ placeholder. It'll be converted to ? from the provided db functions sb = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) + // readdb tables based on etcd data revisionSelect = sb.Select("revision").From("revision") revisionInsert = sb.Insert("revision").Columns("revision") @@ -68,18 +69,23 @@ var ( changegrouprevisionSelect = sb.Select("id, revision").From("changegrouprevision") changegrouprevisionInsert = sb.Insert("changegrouprevision").Columns("id", "revision") - // LTS - + // readdb tables based on lts data revisionLTSSelect = sb.Select("revision").From("revision_lts") revisionLTSInsert = sb.Insert("revision_lts").Columns("revision") - committedwalsequenceLTSSelect = sb.Select("seq").From("committedwalsequence_lts") - committedwalsequenceLTSInsert = sb.Insert("committedwalsequence_lts").Columns("seq") - runLTSSelect = sb.Select("id", "grouppath", "phase").From("run_lts") runLTSInsert = sb.Insert("run_lts").Columns("id", "grouppath", "phase") rundataLTSInsert = sb.Insert("rundata_lts").Columns("id", "data") + + committedwalsequenceLTSSelect = sb.Select("seq").From("committedwalsequence_lts") + committedwalsequenceLTSInsert = sb.Insert("committedwalsequence_lts").Columns("seq") + + changegrouprevisionLTSSelect = sb.Select("id, revision").From("changegrouprevision_lts") + changegrouprevisionLTSInsert = sb.Insert("changegrouprevision_lts").Columns("id", "revision") + + runcounterLTSSelect = sb.Select("groupid", "counter").From("runcounter_lts") + runcounterLTSInsert = sb.Insert("runcounter_lts").Columns("groupid", "counter") ) type ReadDB struct { @@ -87,13 +93,22 @@ type ReadDB struct { dataDir string e *etcd.Store rdb *db.DB + lts *objectstorage.ObjStorage wal *wal.WalManager Initialized bool - m sync.Mutex + initLock sync.Mutex + + // dbWriteLock is used to have only one concurrent write transaction or sqlite + // will return a deadlock error (since we are using the unlock/notify api) if + // two write transactions acquire a lock on each other (we cannot specificy + // that a transaction will be a write tx so it'll start as a read tx, can + // acquire a lock on another read tx, when both become write tx the deadlock + // detector will return an error) + dbWriteLock sync.Mutex } -func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, wal *wal.WalManager) (*ReadDB, error) { +func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, lts *objectstorage.ObjStorage, wal *wal.WalManager) (*ReadDB, error) { if err := os.MkdirAll(dataDir, 0770); err != nil { return nil, err } @@ -111,6 +126,7 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd. log: logger.Sugar(), e: e, dataDir: dataDir, + lts: lts, wal: wal, rdb: rdb, } @@ -119,14 +135,14 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd. 
} func (r *ReadDB) SetInitialized(initialized bool) { - r.m.Lock() + r.initLock.Lock() r.Initialized = initialized - r.m.Unlock() + r.initLock.Unlock() } func (r *ReadDB) IsInitialized() bool { - r.m.Lock() - defer r.m.Unlock() + r.initLock.Lock() + defer r.initLock.Unlock() return r.Initialized } @@ -136,11 +152,10 @@ func (r *ReadDB) Initialize(ctx context.Context) error { if err := r.ResetDB(); err != nil { return errors.Wrapf(err, "failed to reset db") } - revision, err := r.SyncLTS(ctx) - if err != nil { + if err := r.SyncLTS(ctx); err != nil { return errors.Wrapf(err, "error syncing lts db") } - if err := r.SyncRDB(ctx, revision); err != nil { + if err := r.SyncRDB(ctx); err != nil { return errors.Wrapf(err, "error syncing run db") } return nil @@ -170,9 +185,10 @@ func (r *ReadDB) ResetDB() error { return nil } -func (r *ReadDB) SyncRDB(ctx context.Context, revision int64) error { +func (r *ReadDB) SyncRDB(ctx context.Context) error { err := r.rdb.Do(func(tx *db.Tx) error { // Do pagination to limit the number of keys per request + var revision int64 key := common.EtcdRunsDir var continuation *etcd.ListPagedContinuation @@ -240,114 +256,6 @@ func (r *ReadDB) SyncRDB(ctx context.Context, revision int64) error { return err } -func (r *ReadDB) SyncLTS(ctx context.Context) (int64, error) { - type indexData struct { - ID string - Phase types.RunPhase - Group string - } - - insertfunc := func(objs []string) error { - err := r.rdb.Do(func(tx *db.Tx) error { - for _, obj := range objs { - f, _, err := r.wal.ReadObject(obj, nil) - if err != nil { - if err == objectstorage.ErrNotExist { - r.log.Warnf("object %s disappeared, ignoring", obj) - } - return err - } - - dec := json.NewDecoder(f) - for { - var id *indexData - - err := dec.Decode(&id) - if err == io.EOF { - // all done - break - } - if err != nil { - f.Close() - return err - } - - run := &types.Run{ - ID: id.ID, - Group: id.Group, - Phase: id.Phase, - } - if err := r.insertRunLTS(tx, run, []byte{}); err != nil { - f.Close() - return err - } - } - f.Close() - } - return nil - }) - return err - } - - resp, err := r.e.Get(ctx, common.EtcdLastIndexKey, 0) - if err != nil { - return 0, err - } - indexDir := string(resp.Kvs[0].Value) - indexRevision := resp.Kvs[0].ModRevision - revision := resp.Header.Revision - - // TODO(sgotti) wait for wal changes to be at a revision >= revision - walChangesRevision, err := r.wal.ChangesCurrentRevision() - if err != nil { - return 0, err - } - if walChangesRevision < indexRevision { - return 0, errors.Errorf("wal changes revision %q is lower than index revision %q", walChangesRevision, revision) - } - - r.log.Infof("indexDir: %s", indexDir) - - objs := []string{} - count := 0 - - doneCh := make(chan struct{}) - defer close(doneCh) - for object := range r.wal.List(path.Join(common.StorageRunsIndexesDir, indexDir)+"/", "", true, doneCh) { - if object.Err != nil { - return 0, object.Err - } - r.log.Infof("path: %s", object.Path) - - objs = append(objs, object.Path) - - if count > paginationSize { - if err := insertfunc(objs); err != nil { - return 0, err - } - count = 0 - objs = []string{} - } else { - count++ - } - } - if err := insertfunc(objs); err != nil { - return 0, err - } - - resp, err = r.e.Get(ctx, common.EtcdLastIndexKey, 0) - if err != nil { - return 0, err - } - curIndexDir := string(resp.Kvs[0].Value) - - if curIndexDir != indexDir { - return 0, errors.Errorf("index dir has changed, used %s, current: %s", indexDir, curIndexDir) - } - - return revision, nil -} - func (r *ReadDB) Run(ctx 
context.Context) error { revision, err := r.GetRevision() if err != nil { @@ -367,6 +275,7 @@ func (r *ReadDB) Run(ctx context.Context) error { } r.SetInitialized(true) + errCh := make(chan error) for { for { initialized := r.IsInitialized() @@ -383,16 +292,39 @@ func (r *ReadDB) Run(ctx context.Context) error { time.Sleep(1 * time.Second) } - r.log.Infof("starting HandleEvents") - if err := r.HandleEvents(ctx); err != nil { - r.log.Errorf("handleevents err: %+v", err) - } + ctx, cancel := context.WithCancel(ctx) + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + r.log.Infof("starting HandleEvents") + if err := r.HandleEvents(ctx); err != nil { + r.log.Errorf("handleevents err: %+v", err) + errCh <- err + } + wg.Done() + }() + + wg.Add(1) + go func() { + r.log.Infof("starting HandleEventsLTS") + if err := r.HandleEventsLTS(ctx); err != nil { + r.log.Errorf("handleevents lts err: %+v", err) + errCh <- err + } + wg.Done() + }() select { case <-ctx.Done(): r.log.Infof("readdb exiting") + cancel() r.rdb.Close() return nil + case <-errCh: + // cancel context and wait for the all the goroutines to exit + cancel() + wg.Wait() } time.Sleep(1 * time.Second) @@ -458,6 +390,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { // a single transaction for every response (every response contains all the // events happened in an etcd revision). + r.dbWriteLock.Lock() err = r.rdb.Do(func(tx *db.Tx) error { for _, ev := range wresp.Events { if err := r.handleEvent(tx, ev, &wresp); err != nil { @@ -470,6 +403,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { } return nil }) + r.dbWriteLock.Unlock() if err != nil { return err } @@ -514,6 +448,9 @@ func (r *ReadDB) handleRunEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdcl } // Run has been deleted from etcd, this means that it was stored in the LTS + // TODO(sgotti) this is here just to avoid a window where the run is not in + // run table and in the run_lts table but should be changed/removed when we'll + // implement run removal run, err := store.LTSGetRun(r.wal, runID) if err != nil { return err @@ -564,6 +501,495 @@ func (r *ReadDB) handleChangeGroupEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp return nil } +func (r *ReadDB) SyncLTS(ctx context.Context) error { + // get the last committed storage wal sequence saved in the rdb + curWalSeq := "" + err := r.rdb.Do(func(tx *db.Tx) error { + var err error + curWalSeq, err = r.GetCommittedWalSequenceLTS(tx) + if err != nil { + return err + } + return nil + }) + if err != nil { + return err + } + + lastCommittedStorageWal, _, err := r.wal.LastCommittedStorageWal(ctx) + if err != nil { + return err + } + + doFullSync := false + if curWalSeq == "" { + doFullSync = true + r.log.Warn("no startWalSeq in db, doing a full sync") + } else { + ok, err := r.wal.HasLtsWal(curWalSeq) + if err != nil { + return err + } + if !ok { + r.log.Warnf("no wal with seq %q in lts, doing a full sync", curWalSeq) + doFullSync = true + } + + // if the epoch of the wals has changed this means etcd has been reset. 
If so + // we should do a full resync since we are saving in the rdb also data that + // was not yet committed to lts so we should have the rdb ahead of the current + // lts data + // TODO(sgotti) improve this to avoid doing a full resync + curWalSequence, err := sequence.Parse(curWalSeq) + if err != nil { + return err + } + curWalEpoch := curWalSequence.Epoch + + lastCommittedStorageWalSequence, err := sequence.Parse(lastCommittedStorageWal) + if err != nil { + return err + } + if curWalEpoch != lastCommittedStorageWalSequence.Epoch { + r.log.Warnf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, doing a full sync", curWalEpoch, lastCommittedStorageWalSequence.Epoch) + doFullSync = true + } + } + + if doFullSync { + r.log.Infof("doing a full sync from lts files") + if err := r.ResetDB(); err != nil { + return err + } + + var err error + curWalSeq, err = r.SyncFromDump() + if err != nil { + return err + } + } + + r.log.Infof("startWalSeq: %s", curWalSeq) + + // Sync from wals + // sync from lts until the current known lastCommittedStorageWal in etcd + // since wals are first committed to lts and then in etcd we would like to + // avoid to store in rdb something that is not yet marked as committedstorage + // in etcd + curWalSeq, err = r.SyncFromWals(curWalSeq, lastCommittedStorageWal) + if err != nil { + return errors.Wrap(err, "failed to sync from wals") + } + + // Get the first available wal from etcd and check that our current walseq + // from wals on lts is >= + // if not (this happens when syncFromWals takes some time and in the meantime + // many new wals are written, the next sync should be faster and able to continue + firstAvailableWalData, revision, err := r.wal.FirstAvailableWalData(ctx) + if err != nil { + return errors.Wrap(err, "failed to get first available wal data") + } + r.log.Infof("firstAvailableWalData: %s", util.Dump(firstAvailableWalData)) + r.log.Infof("revision: %d", revision) + if firstAvailableWalData == nil { + if curWalSeq != "" { + // this happens if etcd has been reset + return errors.Errorf("our curwalseq is %q but there's no wal data on etcd", curWalSeq) + } + } + if firstAvailableWalData != nil { + if curWalSeq < firstAvailableWalData.WalSequence { + return errors.Errorf("current applied wal seq %q is smaller than the first available wal on etcd %q", curWalSeq, firstAvailableWalData.WalSequence) + } + } + + err = r.rdb.Do(func(tx *db.Tx) error { + if err := insertRevisionLTS(tx, revision); err != nil { + return err + } + + // use the same revision as previous operation + for walElement := range r.wal.ListEtcdWals(ctx, revision) { + if walElement.Err != nil { + return err + } + if walElement.WalData.WalSequence <= curWalSeq { + continue + } + + if err := r.insertCommittedWalSequenceLTS(tx, walElement.WalData.WalSequence); err != nil { + return err + } + + r.log.Debugf("applying wal to db") + if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil { + return err + } + } + + return nil + }) + + return err +} + +func (r *ReadDB) SyncFromDump() (string, error) { + type indexHeader struct { + LastWalSequence string + } + type indexData struct { + DataType string + Data json.RawMessage + } + + type indexDataRun struct { + ID string + Phase types.RunPhase + Group string + } + + type indexDataRunCounter struct { + Group string + Counter uint64 + } + + var iheader *indexHeader + insertfunc := func(ids []*indexData) error { + err := r.rdb.Do(func(tx *db.Tx) error { + for _, id := range ids { + switch 
common.DataType(id.DataType) { + case common.DataTypeRun: + var ir *indexDataRun + if err := json.Unmarshal(id.Data, &ir); err != nil { + return err + } + run := &types.Run{ + ID: ir.ID, + Group: ir.Group, + Phase: ir.Phase, + } + r.log.Infof("inserting run %q", run.ID) + if err := r.insertRunLTS(tx, run, []byte{}); err != nil { + return err + } + case common.DataTypeRunCounter: + var irc *indexDataRunCounter + if err := json.Unmarshal(id.Data, &irc); err != nil { + return err + } + r.log.Infof("inserting run counter %q, c: %d", irc.Group, irc.Counter) + if err := r.insertRunCounterLTS(tx, irc.Group, irc.Counter); err != nil { + return err + } + } + } + return nil + }) + return err + } + + doneCh := make(chan struct{}) + defer close(doneCh) + + // get last dump + var dumpPath string + for object := range r.lts.List(path.Join(common.StorageRunsIndexesDir)+"/", "", true, doneCh) { + if object.Err != nil { + return "", object.Err + } + r.log.Infof("path: %s", object.Path) + + dumpPath = object.Path + } + if dumpPath == "" { + return "", nil + } + + f, err := r.lts.ReadObject(dumpPath) + if err != nil { + if err == objectstorage.ErrNotExist { + r.log.Warnf("object %s disappeared, ignoring", dumpPath) + } + return "", err + } + defer f.Close() + + dec := json.NewDecoder(f) + + if err := dec.Decode(&iheader); err != nil { + return "", err + } + count := 0 + ids := make([]*indexData, 0, paginationSize) + for { + var id *indexData + + err := dec.Decode(&id) + if err == io.EOF { + // all done + break + } + if err != nil { + f.Close() + return "", err + } + ids = append(ids, id) + + if count > paginationSize { + if err := insertfunc(ids); err != nil { + return "", err + } + count = 0 + ids = make([]*indexData, 0, paginationSize) + } else { + count++ + } + } + if err := insertfunc(ids); err != nil { + return "", err + } + + return iheader.LastWalSequence, nil +} + +func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { + insertfunc := func(walFiles []*wal.WalFile) error { + err := r.rdb.Do(func(tx *db.Tx) error { + for _, walFile := range walFiles { + walFilef, err := r.wal.ReadWal(walFile.WalSequence) + if err != nil { + return err + } + dec := json.NewDecoder(walFilef) + var header *wal.WalHeader + if err = dec.Decode(&header); err != nil && err != io.EOF { + walFilef.Close() + return err + } + walFilef.Close() + if err := r.insertCommittedWalSequenceLTS(tx, walFile.WalSequence); err != nil { + return err + } + if err := r.applyWal(tx, header.WalDataFileID); err != nil { + return err + } + } + return nil + }) + return err + } + + lastWalSeq := startWalSeq + walFiles := []*wal.WalFile{} + count := 0 + + doneCh := make(chan struct{}) + defer close(doneCh) + + for walFile := range r.wal.ListLtsWals(startWalSeq) { + if walFile.Err != nil { + return "", walFile.Err + } + + walFiles = append(walFiles, walFile) + lastWalSeq = walFile.WalSequence + + if count > 100 { + if err := insertfunc(walFiles); err != nil { + return "", err + } + count = 0 + walFiles = []*wal.WalFile{} + } else { + count++ + } + } + if err := insertfunc(walFiles); err != nil { + return "", err + } + + return lastWalSeq, nil +} + +func (r *ReadDB) HandleEventsLTS(ctx context.Context) error { + var revision int64 + err := r.rdb.Do(func(tx *db.Tx) error { + err := tx.QueryRow("select revision from revision order by revision desc limit 1").Scan(&revision) + if err != nil { + if err == sql.ErrNoRows { + revision = 0 + } else { + return err + } + } + return nil + }) + if err != nil { + return err + } + + wctx, 
cancel := context.WithCancel(ctx) + defer cancel() + r.log.Infof("revision: %d", revision) + wch := r.wal.Watch(wctx, revision+1) + for we := range wch { + r.log.Debugf("we: %s", util.Dump(we)) + if we.Err != nil { + err := we.Err + if err == wal.ErrCompacted { + r.log.Warnf("required events already compacted, reinitializing readdb") + r.Initialized = false + return nil + } + return errors.Wrapf(err, "watch error") + } + + // a single transaction for every response (every response contains all the + // events happened in an etcd revision). + r.dbWriteLock.Lock() + err = r.rdb.Do(func(tx *db.Tx) error { + + // if theres a wal seq epoch change something happened to etcd, usually (if + // the user hasn't messed up with etcd keys) this means etcd has been reset + // in such case we should resync from the lts state to ensure we apply all the + // wal marked as committedstorage (since they could have been lost from etcd) + curWalSeq, err := r.GetCommittedWalSequenceLTS(tx) + if err != nil { + return err + } + r.log.Debugf("curWalSeq: %q", curWalSeq) + if curWalSeq != "" && we.WalData != nil { + curWalSequence, err := sequence.Parse(curWalSeq) + if err != nil { + return err + } + curWalEpoch := curWalSequence.Epoch + + weWalSequence, err := sequence.Parse(we.WalData.WalSequence) + if err != nil { + return err + } + r.log.Infof("we.WalData.WalSequence: %q", we.WalData.WalSequence) + weWalEpoch := weWalSequence.Epoch + if curWalEpoch != weWalEpoch { + r.Initialized = false + return errors.Errorf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, resyncing from lts", curWalEpoch, weWalEpoch) + } + } + + if err := r.handleEventLTS(tx, we); err != nil { + return err + } + + if err := insertRevisionLTS(tx, we.Revision); err != nil { + return err + } + return nil + }) + r.dbWriteLock.Unlock() + if err != nil { + return err + } + } + + return nil +} + +func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error { + walFile, err := r.wal.ReadWalData(walDataFileID) + if err != nil { + return errors.Wrapf(err, "cannot read wal data file %q", walDataFileID) + } + defer walFile.Close() + + dec := json.NewDecoder(walFile) + for { + var action *wal.Action + + err := dec.Decode(&action) + if err == io.EOF { + // all done + break + } + if err != nil { + return errors.Wrapf(err, "failed to decode wal file") + } + + if err := r.applyAction(tx, action); err != nil { + return err + } + } + + return nil +} + +func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { + r.log.Infof("action: dataType: %s, ID: %s", action.DataType, action.ID) + switch action.ActionType { + case wal.ActionTypePut: + switch action.DataType { + case string(common.DataTypeRun): + var run *types.Run + if err := json.Unmarshal(action.Data, &run); err != nil { + return err + } + if err := r.insertRunLTS(tx, run, action.Data); err != nil { + return err + } + case string(common.DataTypeRunCounter): + var runCounter uint64 + if err := json.Unmarshal(action.Data, &runCounter); err != nil { + return err + } + r.log.Infof("inserting run counter %q, c: %d", action.ID, runCounter) + if err := r.insertRunCounterLTS(tx, action.ID, runCounter); err != nil { + return err + } + } + + case wal.ActionTypeDelete: + switch action.DataType { + case string(common.DataTypeRun): + case string(common.DataTypeRunCounter): + } + } + + return nil +} + +func (r *ReadDB) handleEventLTS(tx *db.Tx, we *wal.WatchElement) error { + //r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) + //key := string(ev.Kv.Key) + + 
if err := r.handleWalEvent(tx, we); err != nil { + return err + } + return nil +} + +func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error { + for cgName, cgRev := range we.ChangeGroupsRevisions { + if err := r.insertChangeGroupRevisionLTS(tx, cgName, cgRev); err != nil { + return err + } + } + + if we.WalData != nil { + // update readdb only when the wal has been committed to lts + if we.WalData.WalStatus != wal.WalStatusCommitted { + return nil + } + + if err := r.insertCommittedWalSequenceLTS(tx, we.WalData.WalSequence); err != nil { + return err + } + + r.log.Debugf("applying wal to db") + return r.applyWal(tx, we.WalData.WalDataFileID) + } + return nil +} + func (r *ReadDB) Do(f func(tx *db.Tx) error) error { if !r.IsInitialized() { return errors.Errorf("db not initialized") @@ -588,6 +1014,23 @@ func insertRevision(tx *db.Tx, revision int64) error { return nil } +func insertRevisionLTS(tx *db.Tx, revision int64) error { + // poor man insert or update that works because transaction isolation level is serializable + if _, err := tx.Exec("delete from revision_lts"); err != nil { + return errors.Wrap(err, "failed to delete revision") + } + // TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types... + //q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql() + q, args, err := revisionLTSInsert.Values(revision).ToSql() + if err != nil { + return errors.Wrap(err, "failed to build query") + } + if _, err = tx.Exec(q, args...); err != nil { + return errors.WithStack(err) + } + return nil +} + func insertRun(tx *db.Tx, run *types.Run, data []byte) error { // add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02) groupPath := run.Group @@ -691,7 +1134,8 @@ func (r *ReadDB) getRevision(tx *db.Tx) (int64, error) { return 0, errors.Wrap(err, "failed to build query") } - if err := tx.QueryRow(q, args...).Scan(&revision); err == sql.ErrNoRows { + err = tx.QueryRow(q, args...).Scan(&revision) + if err == sql.ErrNoRows { return 0, nil } return revision, err @@ -1000,3 +1444,192 @@ func scanChangeGroupsRevision(rows *sql.Rows) (types.ChangeGroupsRevisions, erro } return changegroups, nil } + +func (r *ReadDB) insertCommittedWalSequenceLTS(tx *db.Tx, seq string) error { + r.log.Infof("insert seq: %s", seq) + // poor man insert or update that works because transaction isolation level is serializable + if _, err := tx.Exec("delete from committedwalsequence_lts"); err != nil { + return errors.Wrap(err, "failed to delete committedwalsequence") + } + q, args, err := committedwalsequenceLTSInsert.Values(seq).ToSql() + if err != nil { + return errors.Wrap(err, "failed to build query") + } + if _, err = tx.Exec(q, args...); err != nil { + return errors.WithStack(err) + } + return nil +} + +func (r *ReadDB) GetCommittedWalSequenceLTS(tx *db.Tx) (string, error) { + var seq string + + q, args, err := committedwalsequenceLTSSelect.OrderBy("seq").Limit(1).ToSql() + r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) + if err != nil { + return "", errors.Wrap(err, "failed to build query") + } + + err = tx.QueryRow(q, args...).Scan(&seq) + if err == sql.ErrNoRows { + return "", nil + } + return seq, err +} + +func (r *ReadDB) insertChangeGroupRevisionLTS(tx *db.Tx, changegroup string, revision int64) error { + r.log.Infof("insertChangeGroupRevision: %s %d", changegroup, revision) + + // poor man insert or update that works because transaction isolation level is serializable 
+ if _, err := tx.Exec("delete from changegrouprevision_lts where id = $1", changegroup); err != nil { + return errors.Wrap(err, "failed to delete run") + } + // insert only if revision > 0 + if revision > 0 { + q, args, err := changegrouprevisionLTSInsert.Values(changegroup, revision).ToSql() + if err != nil { + return errors.Wrap(err, "failed to build query") + } + if _, err = tx.Exec(q, args...); err != nil { + return err + } + } + return nil +} + +func (r *ReadDB) GetChangeGroupsUpdateTokensLTS(tx *db.Tx, groups []string) (*wal.ChangeGroupsUpdateToken, error) { + s := changegrouprevisionLTSSelect.Where(sq.Eq{"id": groups}) + q, args, err := s.ToSql() + r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) + if err != nil { + return nil, errors.Wrap(err, "failed to build query") + } + cgr, err := fetchChangeGroupsRevisionLTS(tx, q, args...) + if err != nil { + return nil, err + } + + revision, err := r.getRevision(tx) + if err != nil { + return nil, err + } + + // for non existing changegroups use a changegroup with revision = 0 + for _, g := range groups { + if _, ok := cgr[g]; !ok { + cgr[g] = 0 + } + } + + return &wal.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil +} + +func fetchChangeGroupsRevisionLTS(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) { + rows, err := tx.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + return scanChangeGroupsRevision(rows) +} + +func scanChangeGroupsRevisionLTS(rows *sql.Rows) (map[string]int64, error) { + changegroups := map[string]int64{} + for rows.Next() { + var ( + id string + revision int64 + ) + if err := rows.Scan(&id, &revision); err != nil { + return nil, errors.Wrap(err, "failed to scan rows") + } + changegroups[id] = revision + } + if err := rows.Err(); err != nil { + return nil, err + } + return changegroups, nil +} + +func (r *ReadDB) insertRunCounterLTS(tx *db.Tx, group string, counter uint64) error { + // poor man insert or update that works because transaction isolation level is serializable + if _, err := tx.Exec("delete from runcounter_lts where groupid = $1", group); err != nil { + return errors.Wrap(err, "failed to delete revision") + } + // TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types... + //q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql() + q, args, err := runcounterLTSInsert.Values(group, counter).ToSql() + if err != nil { + return errors.Wrap(err, "failed to build query") + } + if _, err = tx.Exec(q, args...); err != nil { + return errors.WithStack(err) + } + return nil +} + +func (r *ReadDB) GetRunCounterLTS(tx *db.Tx, group string) (uint64, error) { + var g string + var counter uint64 + + q, args, err := runcounterLTSSelect.Where(sq.Eq{"groupid": group}).ToSql() + r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) + if err != nil { + return 0, errors.Wrap(err, "failed to build query") + } + + err = tx.QueryRow(q, args...).Scan(&g, &counter) + if err == sql.ErrNoRows { + return 0, nil + } + return counter, err +} + +func (r *ReadDB) GetRunCountersLTS(tx *db.Tx, start string, limit int) ([]*types.RunCounter, error) { + s := runcounterLTSSelect.Where(sq.Gt{"groupid": start}) + if limit > 0 { + s = s.Limit(uint64(limit)) + } + s = s.OrderBy("groupid asc") + + q, args, err := s.ToSql() + r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) + if err != nil { + return nil, errors.Wrap(err, "failed to build query") + } + + return fetchRunCounters(tx, q, args...) 
+} + +func fetchRunCounters(tx *db.Tx, q string, args ...interface{}) ([]*types.RunCounter, error) { + rows, err := tx.Query(q, args...) + if err != nil { + return nil, err + } + defer rows.Close() + return scanRunCounters(rows) +} + +func scanRunCounter(rows *sql.Rows) (*types.RunCounter, error) { + r := &types.RunCounter{} + if err := rows.Scan(&r.Group, &r.Counter); err != nil { + return nil, errors.Wrap(err, "failed to scan rows") + } + + return r, nil +} + +func scanRunCounters(rows *sql.Rows) ([]*types.RunCounter, error) { + runCounters := []*types.RunCounter{} + for rows.Next() { + r, err := scanRunCounter(rows) + if err != nil { + return nil, err + } + runCounters = append(runCounters, r) + } + if err := rows.Err(); err != nil { + return nil, err + } + return runCounters, nil +} diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 4907f9b..552f658 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -85,12 +85,12 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error { log.Debugf("run: %s", util.Dump(r)) rc, err := store.LTSGetRunConfig(s.wal, r.ID) if err != nil { - return errors.Wrapf(err, "cannot get run config %q from etcd", r.ID) + return errors.Wrapf(err, "cannot get run config %q", r.ID) } log.Debugf("rc: %s", util.Dump(rc)) rd, err := store.LTSGetRunData(s.wal, r.ID) if err != nil { - return errors.Wrapf(err, "cannot get run data %q from etcd", r.ID) + return errors.Wrapf(err, "cannot get run data %q", r.ID) } log.Debugf("rd: %s", util.Dump(rd)) @@ -1077,25 +1077,9 @@ func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error { return err } - resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0) - // if lastIndexKey doesn't exist return an error - if err != nil { - return err - } - lastIndexRevision := resp.Kvs[0].ModRevision - lastIndexDir := string(resp.Kvs[0].Value) + actions := append([]*wal.Action{ra}) - indexActions, err := s.additionalActions(lastIndexDir, ra) - if err != nil { - return err - } - - actions := append([]*wal.Action{ra}, indexActions...) 
- - cmp := []etcdclientv3.Cmp{etcdclientv3.Compare(etcdclientv3.ModRevision(common.EtcdLastIndexKey), "=", lastIndexRevision)} - then := []etcdclientv3.Op{etcdclientv3.OpPut(common.EtcdLastIndexKey, lastIndexDir)} - - if _, err = s.wal.WriteWalAdditionalOps(ctx, actions, nil, cmp, then); err != nil { + if _, err = s.wal.WriteWal(ctx, actions, nil); err != nil { return err } @@ -1107,53 +1091,6 @@ func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error { return nil } -func (s *Scheduler) additionalActions(indexDir string, action *wal.Action) ([]*wal.Action, error) { - type indexData struct { - ID string - Group string - Phase types.RunPhase - } - - configType, _ := common.PathToTypeID(action.Path) - - var actionType wal.ActionType - - switch action.ActionType { - case wal.ActionTypePut: - actionType = wal.ActionTypePut - case wal.ActionTypeDelete: - actionType = wal.ActionTypeDelete - } - - switch configType { - case common.ConfigTypeRun: - var run *types.Run - if err := json.Unmarshal(action.Data, &run); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal run") - } - - data := []byte{} - index := path.Join(common.StorageRunsIndexesDir, indexDir, "added", run.ID) - id := &indexData{ID: run.ID, Group: run.Group, Phase: run.Phase} - idj, err := json.Marshal(id) - if err != nil { - return nil, err - } - data = append(data, idj...) - - actions := []*wal.Action{ - &wal.Action{ - ActionType: actionType, - Path: index, - Data: data, - }, - } - return actions, nil - } - - return []*wal.Action{}, nil -} - func (s *Scheduler) dumpLTSLoop(ctx context.Context) { for { log.Debugf("lts dump loop") @@ -1174,37 +1111,50 @@ func (s *Scheduler) dumpLTSLoop(ctx context.Context) { } func (s *Scheduler) dumpLTS(ctx context.Context) error { + type indexHeader struct { + LastWalSequence string + } type indexData struct { + DataType string + Data interface{} + } + + type indexDataRun struct { ID string Group string Phase types.RunPhase } - - resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0) - // if lastIndexKey doesn't exist return an error - if err != nil { - return err + type indexDataRunCounter struct { + Group string + Counter uint64 } - lastIndexRevision := resp.Kvs[0].ModRevision - revision := resp.Header.Revision indexDir := strconv.FormatInt(time.Now().UnixNano(), 10) - readdbRevision, err := s.readDB.GetRevision() + var lastWalSequence string + err := s.readDB.Do(func(tx *db.Tx) error { + var err error + lastWalSequence, err = s.readDB.GetCommittedWalSequenceLTS(tx) + return err + }) if err != nil { return err } - if readdbRevision < revision { - return errors.Errorf("readdb revision %d is lower than index revision %d", readdbRevision, revision) - } - runs := []*readdb.RunData{} + data := []byte{} + iheader := &indexHeader{LastWalSequence: lastWalSequence} + ihj, err := json.Marshal(iheader) + if err != nil { + return err + } + data = append(data, ihj...) + var lastRunID string stop := false for { err := s.readDB.Do(func(tx *db.Tx) error { var err error - lruns, err := s.readDB.GetRunsFilteredLTS(tx, nil, false, nil, lastRunID, 10000, types.SortOrderDesc) + lruns, err := s.readDB.GetRunsFilteredLTS(tx, nil, false, nil, lastRunID, 1000, types.SortOrderDesc) if err != nil { return err } @@ -1213,7 +1163,14 @@ func (s *Scheduler) dumpLTS(ctx context.Context) error { } else { lastRunID = lruns[len(lruns)-1].ID } - runs = append(runs, lruns...) 
+ for _, run := range lruns { + id := &indexData{DataType: string(common.DataTypeRun), Data: indexDataRun{ID: run.ID, Group: run.GroupPath, Phase: types.RunPhase(run.Phase)}} + idj, err := json.Marshal(id) + if err != nil { + return err + } + data = append(data, idj...) + } return nil }) if err != nil { @@ -1224,30 +1181,41 @@ func (s *Scheduler) dumpLTS(ctx context.Context) error { } } - data := []byte{} - for _, run := range runs { - id := &indexData{ID: run.ID, Group: run.GroupPath, Phase: types.RunPhase(run.Phase)} - idj, err := json.Marshal(id) + var lastGroup string + stop = false + for { + err := s.readDB.Do(func(tx *db.Tx) error { + var err error + counters, err := s.readDB.GetRunCountersLTS(tx, lastGroup, 1000) + if err != nil { + return err + } + if len(counters) == 0 { + stop = true + } else { + lastGroup = counters[len(counters)-1].Group + } + for _, counter := range counters { + id := &indexData{DataType: string(common.DataTypeRunCounter), Data: indexDataRunCounter{Group: counter.Group, Counter: counter.Counter}} + idj, err := json.Marshal(id) + if err != nil { + return err + } + data = append(data, idj...) + } + return nil + }) if err != nil { return err } - data = append(data, idj...) + if stop { + break + } } index := path.Join(common.StorageRunsIndexesDir, indexDir, "all") - cmp := []etcdclientv3.Cmp{etcdclientv3.Compare(etcdclientv3.ModRevision(common.EtcdLastIndexKey), "=", lastIndexRevision)} - then := []etcdclientv3.Op{etcdclientv3.OpPut(common.EtcdLastIndexKey, indexDir)} - - actions := []*wal.Action{ - &wal.Action{ - ActionType: wal.ActionTypePut, - Path: index, - Data: data, - }, - } - - if _, err = s.wal.WriteWalAdditionalOps(ctx, actions, nil, cmp, then); err != nil { + if err = s.lts.WriteObject(index, bytes.NewReader(data)); err != nil { return err } @@ -1279,18 +1247,12 @@ func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error { Phase types.RunPhase } - resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0) - // if lastIndexKey doesn't exist return an error - if err != nil { - return err - } - lastIndexDir := string(resp.Kvs[0].Value) - - // collect all object that don't pertain to the lastIndexDir + // collect all old indexes objects := []string{} doneCh := make(chan struct{}) defer close(doneCh) - for object := range s.wal.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) { + var indexPath string + for object := range s.lts.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) { if object.Err != nil { return object.Err } @@ -1299,22 +1261,20 @@ func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error { if len(h) < 2 { return errors.Errorf("wrong index dir path %q", object.Path) } - indexDir := h[1] - if indexDir != lastIndexDir { - objects = append(objects, object.Path) + curIndexPath := object.Path + if curIndexPath > indexPath { + if indexPath != "" { + objects = append(objects, indexPath) + } + indexPath = curIndexPath + } else { + objects = append(objects, curIndexPath) } } - actions := make([]*wal.Action, len(objects)) - for i, object := range objects { - actions[i] = &wal.Action{ - ActionType: wal.ActionTypeDelete, - Path: object, - } - } - - if len(actions) > 0 { - if _, err = s.wal.WriteWal(ctx, actions, nil); err != nil { + for _, object := range objects { + if err := s.lts.DeleteObject(object); err != nil { + log.Errorf("object: %s, err: %v", object, err) return err } } @@ -1352,8 +1312,9 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule } walConf := &wal.WalManagerConfig{ - E: e, - Lts: lts, + 
E: e, + Lts: lts, + DataToPathFunc: common.DataToPathFunc, } wal, err := wal.NewWalManager(ctx, logger, walConf) if err != nil { @@ -1361,13 +1322,13 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule } s.wal = wal - readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, wal) + readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, lts, wal) if err != nil { return nil, err } s.readDB = readDB - ch := command.NewCommandHandler(logger, e, lts, wal) + ch := command.NewCommandHandler(logger, e, readDB, lts, wal) s.ch = ch return s, nil @@ -1385,41 +1346,6 @@ func (s *Scheduler) InitEtcd(ctx context.Context) error { return etcd.FromEtcdError(err) } - // Populate lastIndexDir if empty - doneCh := make(chan struct{}) - defer close(doneCh) - - _, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0) - if err != nil && err != etcd.ErrKeyNotFound { - return err - } - // EtcdLastIndexKey already exist - if err == nil { - return nil - } - - var indexDir string - // take the last (greater) indexdir - for object := range s.wal.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) { - if object.Err != nil { - return object.Err - } - - h := util.PathList(object.Path) - if len(h) < 2 { - return errors.Errorf("wrong index dir path %q", object.Path) - } - indexDir = h[1] - } - - // if an indexDir doesn't exist in lts then initialize - if indexDir == "" { - indexDir = strconv.FormatInt(time.Now().UnixNano(), 10) - if _, err := s.e.AtomicPut(ctx, common.EtcdLastIndexKey, []byte(indexDir), 0, nil); err != nil { - return err - } - } - return nil } diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/scheduler/store/store.go index 46f93ec..798aafe 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/scheduler/store/store.go @@ -88,32 +88,11 @@ func LTSRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string paths := []string{} subGroups := LTSSubGroups(group) for _, subGroup := range subGroups { - paths = append(paths, common.StorageCounterFile(subGroup)) + paths = append(paths, common.StorageRunCounterFile(subGroup)) } return paths } -func LTSGetRunCounter(wal *wal.WalManager, group string) (uint64, *wal.ChangeGroupsUpdateToken, error) { - // use the first group dir after the root - pl := util.PathList(group) - if len(pl) < 2 { - return 0, nil, errors.Errorf("cannot determine group counter name, wrong group path %q", group) - } - runCounterPath := common.StorageCounterFile(pl[1]) - rcf, cgt, err := wal.ReadObject(runCounterPath, []string{"counter-" + pl[1]}) - if err != nil { - return 0, cgt, err - } - defer rcf.Close() - d := json.NewDecoder(rcf) - var c uint64 - if err := d.Decode(&c); err != nil { - return 0, nil, err - } - - return c, cgt, nil -} - func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wal.Action, error) { // use the first group dir after the root pl := util.PathList(group) @@ -128,7 +107,8 @@ func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wa action := &wal.Action{ ActionType: wal.ActionTypePut, - Path: common.StorageCounterFile(pl[1]), + DataType: string(common.DataTypeRunCounter), + ID: pl[1], Data: cj, } @@ -175,7 +155,8 @@ func LTSSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) { action := &wal.Action{ ActionType: wal.ActionTypePut, - Path: common.StorageRunConfigFile(rc.ID), + DataType: string(common.DataTypeRunConfig), + ID: 
rc.ID, Data: rcj, } @@ -206,7 +187,8 @@ func LTSSaveRunDataAction(rd *types.RunData) (*wal.Action, error) { action := &wal.Action{ ActionType: wal.ActionTypePut, - Path: common.StorageRunDataFile(rd.ID), + DataType: string(common.DataTypeRunData), + ID: rd.ID, Data: rdj, } @@ -238,7 +220,8 @@ func LTSSaveRunAction(r *types.Run) (*wal.Action, error) { action := &wal.Action{ ActionType: wal.ActionTypePut, - Path: common.StorageRunFile(r.ID), + DataType: string(common.DataTypeRun), + ID: r.ID, Data: rj, } diff --git a/internal/services/runservice/types/types.go b/internal/services/runservice/types/types.go index 1333a94..6d1d4b2 100644 --- a/internal/services/runservice/types/types.go +++ b/internal/services/runservice/types/types.go @@ -23,12 +23,6 @@ import ( "github.com/sorintlab/agola/internal/util" ) -type RunBundle struct { - Run *Run - Rc *RunConfig - Rd *RunData -} - type SortOrder int const ( @@ -36,6 +30,17 @@ const ( SortOrderDesc ) +type RunBundle struct { + Run *Run + Rc *RunConfig + Rd *RunData +} + +type RunCounter struct { + Group string + Counter uint64 +} + type RunPhase string const ( diff --git a/internal/wal/changes.go b/internal/wal/changes.go index bed5480..4bab62a 100644 --- a/internal/wal/changes.go +++ b/internal/wal/changes.go @@ -180,12 +180,16 @@ func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revi } func (w *WalManager) applyWalChangesAction(ctx context.Context, action *Action, walSequence string, revision int64) { + dataPath := w.dataToPathFunc(action.DataType, action.ID) + if dataPath == "" { + return + } switch action.ActionType { case ActionTypePut: - w.changes.addPut(action.Path, walSequence, revision) + w.changes.addPut(dataPath, walSequence, revision) case ActionTypeDelete: - w.changes.addDelete(action.Path, walSequence, revision) + w.changes.addDelete(dataPath, walSequence, revision) } if w.changes.actions[walSequence] == nil { w.changes.actions[walSequence] = []*Action{} diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 75355b7..96d175a 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -114,7 +114,8 @@ const ( type Action struct { ActionType ActionType - Path string + DataType string + ID string Data []byte } @@ -213,9 +214,12 @@ func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *Cha if ok { for _, action := range actions { - if action.ActionType == ActionTypePut && action.Path == p { - w.log.Debugf("reading file from wal: %q", action.Path) - return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil + if action.ActionType == ActionTypePut { + dataPath := w.dataToPathFunc(action.DataType, action.ID) + if dataPath == p { + w.log.Debugf("reading file from wal: %q", dataPath) + return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil + } } } return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq) @@ -884,7 +888,11 @@ func (w *WalManager) checkpoint(ctx context.Context) error { } func (w *WalManager) checkpointAction(ctx context.Context, action *Action) error { - path := w.toStorageDataPath(action.Path) + dataPath := w.dataToPathFunc(action.DataType, action.ID) + if dataPath == "" { + return nil + } + path := w.toStorageDataPath(dataPath) switch action.ActionType { case ActionTypePut: w.log.Debugf("writing file: %q", path) @@ -1209,18 +1217,21 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { return nil } -type AdditionalActionsFunc func(action *Action) ([]*Action, error) +type CheckpointFunc func(action *Action) error -func 
NoOpAdditionalActionFunc(action *Action) ([]*Action, error) { - return []*Action{}, nil +type DataToPathFunc func(dataType string, id string) string + +func NoOpDataToPath(dataType string, id string) string { + return "" } type WalManagerConfig struct { - BasePath string - E *etcd.Store - Lts *objectstorage.ObjStorage - AdditionalActionsFunc AdditionalActionsFunc - EtcdWalsKeepNum int + BasePath string + E *etcd.Store + Lts *objectstorage.ObjStorage + EtcdWalsKeepNum int + CheckpointFunc CheckpointFunc + DataToPathFunc DataToPathFunc } type WalManager struct { @@ -1230,6 +1241,8 @@ type WalManager struct { lts *objectstorage.ObjStorage changes *WalChanges etcdWalsKeepNum int + checkpointFunc CheckpointFunc + dataToPathFunc DataToPathFunc } func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConfig) (*WalManager, error) { @@ -1240,9 +1253,9 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf return nil, errors.New("etcdWalsKeepNum must be greater than 0") } - additionalActionsFunc := conf.AdditionalActionsFunc - if additionalActionsFunc == nil { - additionalActionsFunc = NoOpAdditionalActionFunc + dataToPathFunc := conf.DataToPathFunc + if dataToPathFunc == nil { + dataToPathFunc = NoOpDataToPath } w := &WalManager{ @@ -1252,6 +1265,8 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf lts: conf.Lts, etcdWalsKeepNum: conf.EtcdWalsKeepNum, changes: NewWalChanges(), + checkpointFunc: conf.CheckpointFunc, + dataToPathFunc: dataToPathFunc, } // add trailing slash the basepath diff --git a/internal/wal/wal_test.go b/internal/wal/wal_test.go index 752172f..7ded7d3 100644 --- a/internal/wal/wal_test.go +++ b/internal/wal/wal_test.go @@ -90,6 +90,8 @@ func TestEtcdReset(t *testing.T) { } wal, err := NewWalManager(ctx, logger, walConfig) walReadyCh := make(chan struct{}) + + t.Logf("starting wal") go wal.Run(ctx, walReadyCh) <-walReadyCh @@ -102,9 +104,9 @@ func TestEtcdReset(t *testing.T) { expectedObjects := []string{} for i := 0; i < 20; i++ { - objectPath := fmt.Sprintf("object%02d", i) - expectedObjects = append(expectedObjects, objectPath) - actions[0].Path = objectPath + objectID := fmt.Sprintf("object%02d", i) + expectedObjects = append(expectedObjects, objectID) + actions[0].ID = objectID if _, err := wal.WriteWal(ctx, actions, nil); err != nil { t.Fatalf("unexpected err: %v", err) } @@ -113,20 +115,39 @@ func TestEtcdReset(t *testing.T) { // wait for wal to be committed storage time.Sleep(5 * time.Second) + t.Logf("stopping wal") + cancel() + + t.Logf("stopping etcd") // Reset etcd shutdownEtcd(tetcd) tetcd.WaitDown(10 * time.Second) + t.Logf("resetting etcd") os.RemoveAll(etcdDir) + t.Logf("starting etcd") + tetcd = setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) if err := tetcd.Start(); err != nil { t.Fatalf("unexpected err: %v", err) } defer shutdownEtcd(tetcd) - cancel() - ctx = context.Background() + ctx, cancel = context.WithCancel(context.Background()) + walConfig = &WalManagerConfig{ + BasePath: "basepath", + E: tetcd.TestEtcd.Store, + Lts: objectstorage.NewObjStorage(lts, "/"), + EtcdWalsKeepNum: 10, + } + wal, err = NewWalManager(ctx, logger, walConfig) + walReadyCh = make(chan struct{}) + + t.Logf("starting wal") go wal.Run(ctx, walReadyCh) <-walReadyCh + time.Sleep(5 * time.Second) + curObjects := []string{} doneCh := make(chan struct{}) for object := range wal.List("", "", true, doneCh) { @@ -174,7 +195,7 @@ func TestConcurrentUpdate(t *testing.T) { actions := []*Action{ { ActionType: 
ActionTypePut, - Path: "/object01", + ID: "/object01", Data: []byte("{}"), }, } @@ -183,6 +204,8 @@ func TestConcurrentUpdate(t *testing.T) { go wal.Run(ctx, walReadyCh) <-walReadyCh + time.Sleep(5 * time.Second) + cgNames := []string{"changegroup01", "changegroup02"} cgt, err := wal.GetChangeGroupsUpdateToken(cgNames) if err != nil { @@ -253,7 +276,7 @@ func TestWalCleaner(t *testing.T) { actions := []*Action{ { ActionType: ActionTypePut, - Path: "/object01", + ID: "/object01", Data: []byte("{}"), }, }