update wal and readdb

This commit is contained in:
Simone Gotti 2019-04-01 12:54:43 +02:00
parent eb8cd9cc52
commit fc891409ca
12 changed files with 1053 additions and 422 deletions

View File

@ -20,7 +20,6 @@ import (
"path"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/services/configstore/common"
"github.com/sorintlab/agola/internal/services/configstore/readdb"
"github.com/sorintlab/agola/internal/services/types"
"github.com/sorintlab/agola/internal/util"
@ -110,7 +109,8 @@ func (s *CommandHandler) CreateProjectGroup(ctx context.Context, projectGroup *t
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageProjectGroupFile(projectGroup.ID),
DataType: string(types.ConfigTypeProjectGroup),
ID: projectGroup.ID,
Data: pcj,
},
}
@ -185,7 +185,8 @@ func (s *CommandHandler) CreateProject(ctx context.Context, project *types.Proje
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageProjectFile(project.ID),
DataType: string(types.ConfigTypeProject),
ID: project.ID,
Data: pcj,
},
}
@ -231,7 +232,8 @@ func (s *CommandHandler) DeleteProject(ctx context.Context, projectRef string) e
actions := []*wal.Action{
{
ActionType: wal.ActionTypeDelete,
Path: common.StorageProjectFile(project.ID),
DataType: string(types.ConfigTypeProject),
ID: project.ID,
},
}
@ -335,12 +337,14 @@ func (s *CommandHandler) CreateUser(ctx context.Context, req *CreateUserRequest)
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageUserFile(user.ID),
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
{
ActionType: wal.ActionTypePut,
Path: common.StorageProjectGroupFile(pg.ID),
DataType: string(types.ConfigTypeProjectGroup),
ID: pg.ID,
Data: pgj,
},
}
@ -380,7 +384,8 @@ func (s *CommandHandler) DeleteUser(ctx context.Context, userName string) error
actions := []*wal.Action{
{
ActionType: wal.ActionTypeDelete,
Path: common.StorageUserFile(user.ID),
DataType: string(types.ConfigTypeUser),
ID: user.ID,
},
}
@ -474,7 +479,8 @@ func (s *CommandHandler) CreateUserLA(ctx context.Context, req *CreateUserLARequ
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageUserFile(user.ID),
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
}
@ -532,7 +538,8 @@ func (s *CommandHandler) DeleteUserLA(ctx context.Context, userName, laID string
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageUserFile(user.ID),
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
}
@ -611,7 +618,8 @@ func (s *CommandHandler) UpdateUserLA(ctx context.Context, req *UpdateUserLARequ
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageUserFile(user.ID),
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
}
@ -666,7 +674,8 @@ func (s *CommandHandler) CreateUserToken(ctx context.Context, userName, tokenNam
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageUserFile(user.ID),
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
}
@ -715,7 +724,8 @@ func (s *CommandHandler) CreateRemoteSource(ctx context.Context, remoteSource *t
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageRemoteSourceFile(remoteSource.ID),
DataType: string(types.ConfigTypeRemoteSource),
ID: remoteSource.ID,
Data: rsj,
},
}
@ -755,7 +765,8 @@ func (s *CommandHandler) DeleteRemoteSource(ctx context.Context, remoteSourceNam
actions := []*wal.Action{
{
ActionType: wal.ActionTypeDelete,
Path: common.StorageRemoteSourceFile(remoteSource.ID),
DataType: string(types.ConfigTypeRemoteSource),
ID: remoteSource.ID,
},
}
@ -814,12 +825,14 @@ func (s *CommandHandler) CreateOrg(ctx context.Context, org *types.Organization)
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageOrgFile(org.ID),
DataType: string(types.ConfigTypeOrg),
ID: org.ID,
Data: orgj,
},
{
ActionType: wal.ActionTypePut,
Path: common.StorageProjectGroupFile(pg.ID),
DataType: string(types.ConfigTypeProjectGroup),
ID: pg.ID,
Data: pgj,
},
}
@ -861,14 +874,16 @@ func (s *CommandHandler) DeleteOrg(ctx context.Context, orgName string) error {
actions := []*wal.Action{
{
ActionType: wal.ActionTypeDelete,
Path: common.StorageOrgFile(org.ID),
DataType: string(types.ConfigTypeOrg),
ID: org.ID,
},
}
// delete all org projects
for _, project := range projects {
actions = append(actions, &wal.Action{
ActionType: wal.ActionTypeDelete,
Path: common.StorageProjectFile(project.ID),
DataType: string(types.ConfigTypeProject),
ID: project.ID,
})
}
@ -931,7 +946,8 @@ func (s *CommandHandler) CreateSecret(ctx context.Context, secret *types.Secret)
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageSecretFile(secret.ID),
DataType: string(types.ConfigTypeSecret),
ID: secret.ID,
Data: secretj,
},
}
@ -977,7 +993,8 @@ func (s *CommandHandler) DeleteSecret(ctx context.Context, parentType types.Conf
actions := []*wal.Action{
{
ActionType: wal.ActionTypeDelete,
Path: common.StorageSecretFile(secret.ID),
DataType: string(types.ConfigTypeSecret),
ID: secret.ID,
},
}
@ -1040,7 +1057,8 @@ func (s *CommandHandler) CreateVariable(ctx context.Context, variable *types.Var
actions := []*wal.Action{
{
ActionType: wal.ActionTypePut,
Path: common.StorageVariableFile(variable.ID),
DataType: string(types.ConfigTypeVariable),
ID: variable.ID,
Data: variablej,
},
}
@ -1085,7 +1103,8 @@ func (s *CommandHandler) DeleteVariable(ctx context.Context, parentType types.Co
actions := []*wal.Action{
{
ActionType: wal.ActionTypeDelete,
Path: common.StorageVariableFile(variable.ID),
DataType: string(types.ConfigTypeVariable),
ID: variable.ID,
},
}

View File

@ -178,9 +178,11 @@ func (r *ReadDB) SyncFromFiles() (string, error) {
}
f.Close()
configType, id := common.PathToTypeID(obj)
action := &wal.Action{
ActionType: wal.ActionTypePut,
Path: obj,
DataType: string(configType),
ID: id,
Data: data,
}
if err := r.applyAction(tx, action); err != nil {
@ -616,11 +618,9 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error {
}
func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error {
configType, ID := common.PathToTypeID(action.Path)
switch action.ActionType {
case wal.ActionTypePut:
switch configType {
switch types.ConfigType(action.DataType) {
case types.ConfigTypeUser:
if err := r.insertUser(tx, action.Data); err != nil {
return err
@ -652,40 +652,40 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error {
}
case wal.ActionTypeDelete:
switch configType {
switch types.ConfigType(action.DataType) {
case types.ConfigTypeUser:
r.log.Debugf("deleting user with id: %s", ID)
if err := r.deleteUser(tx, ID); err != nil {
r.log.Debugf("deleting user with id: %s", action.ID)
if err := r.deleteUser(tx, action.ID); err != nil {
return err
}
case types.ConfigTypeOrg:
r.log.Debugf("deleting org with id: %s", ID)
if err := r.deleteOrg(tx, ID); err != nil {
r.log.Debugf("deleting org with id: %s", action.ID)
if err := r.deleteOrg(tx, action.ID); err != nil {
return err
}
case types.ConfigTypeProjectGroup:
r.log.Debugf("deleting project group with id: %s", ID)
if err := r.deleteProjectGroup(tx, ID); err != nil {
r.log.Debugf("deleting project group with id: %s", action.ID)
if err := r.deleteProjectGroup(tx, action.ID); err != nil {
return err
}
case types.ConfigTypeProject:
r.log.Debugf("deleting project with id: %s", ID)
if err := r.deleteProject(tx, ID); err != nil {
r.log.Debugf("deleting project with id: %s", action.ID)
if err := r.deleteProject(tx, action.ID); err != nil {
return err
}
case types.ConfigTypeRemoteSource:
r.log.Debugf("deleting remote source with id: %s", ID)
if err := r.deleteRemoteSource(tx, ID); err != nil {
r.log.Debugf("deleting remote source with id: %s", action.ID)
if err := r.deleteRemoteSource(tx, action.ID); err != nil {
return err
}
case types.ConfigTypeSecret:
r.log.Debugf("deleting secret with id: %s", ID)
if err := r.deleteSecret(tx, ID); err != nil {
r.log.Debugf("deleting secret with id: %s", action.ID)
if err := r.deleteSecret(tx, action.ID); err != nil {
return err
}
case types.ConfigTypeVariable:
r.log.Debugf("deleting variable with id: %s", ID)
if err := r.deleteVariable(tx, ID); err != nil {
r.log.Debugf("deleting variable with id: %s", action.ID)
if err := r.deleteVariable(tx, action.ID); err != nil {
return err
}
}

View File

@ -20,11 +20,13 @@ import (
"time"
uuid "github.com/satori/go.uuid"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/runconfig"
"github.com/sorintlab/agola/internal/sequence"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
@ -37,14 +39,16 @@ import (
type CommandHandler struct {
log *zap.SugaredLogger
e *etcd.Store
readDB *readdb.ReadDB
lts *objectstorage.ObjStorage
wal *wal.WalManager
}
func NewCommandHandler(logger *zap.Logger, e *etcd.Store, lts *objectstorage.ObjStorage, wal *wal.WalManager) *CommandHandler {
func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, lts *objectstorage.ObjStorage, wal *wal.WalManager) *CommandHandler {
return &CommandHandler{
log: logger.Sugar(),
e: e,
readDB: readDB,
lts: lts,
wal: wal,
}
@ -293,9 +297,9 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg
rc := rb.Rc
rd := rb.Rd
c, cgt, err := store.LTSGetRunCounter(s.wal, run.Group)
c, cgt, err := s.getRunCounter(run.Group)
s.log.Infof("c: %d, cgt: %s", c, util.Dump(cgt))
if err != nil && err != objectstorage.ErrNotExist {
if err != nil {
return err
}
c++
@ -450,3 +454,28 @@ func (s *CommandHandler) DeleteExecutor(ctx context.Context, executorID string)
return nil
}
func (s *CommandHandler) getRunCounter(group string) (uint64, *wal.ChangeGroupsUpdateToken, error) {
// use the first group dir after the root
pl := util.PathList(group)
if len(pl) < 2 {
return 0, nil, errors.Errorf("cannot determine group counter name, wrong group path %q", group)
}
var c uint64
var cgt *wal.ChangeGroupsUpdateToken
err := s.readDB.Do(func(tx *db.Tx) error {
var err error
c, err = s.readDB.GetRunCounterLTS(tx, pl[1])
if err != nil {
return err
}
cgt, err = s.readDB.GetChangeGroupsUpdateTokensLTS(tx, []string{"counter-" + pl[1]})
return err
})
if err != nil {
return 0, nil, err
}
return c, cgt, nil
}

View File

@ -17,7 +17,6 @@ package common
import (
"fmt"
"path"
"strings"
)
type ErrNotExist struct {
@ -43,8 +42,6 @@ var (
EtcdExecutorsDir = "executors"
EtcdTasksDir = "tasks"
EtcdLastIndexKey = "lastindex"
)
func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }
@ -80,37 +77,30 @@ func StorageRunConfigFile(runID string) string {
return path.Join(StorageRunsConfigDir, runID)
}
func StorageCounterFile(group string) string {
func StorageRunCounterFile(group string) string {
return path.Join(StorageCountersDir, group)
}
type ConfigType int
type DataType string
const (
ConfigTypeRun ConfigType = iota + 1
ConfigTypeRunData
ConfigTypeRunConfig
ConfigTypeCounter
DataTypeRun DataType = "run"
DataTypeRunData DataType = "rundata"
DataTypeRunConfig DataType = "runconfig"
DataTypeRunCounter DataType = "runcounter"
)
func PathToTypeID(p string) (ConfigType, string) {
var configType ConfigType
switch path.Dir(p) {
case StorageRunsDir:
configType = ConfigTypeRun
case StorageRunsDataDir:
configType = ConfigTypeRunData
case StorageRunsConfigDir:
configType = ConfigTypeRunConfig
func DataToPathFunc(dataType string, id string) string {
switch DataType(dataType) {
case DataTypeRun:
return StorageRunFile(id)
case DataTypeRunData:
return StorageRunDataFile(id)
case DataTypeRunConfig:
return StorageRunConfigFile(id)
case DataTypeRunCounter:
return StorageRunCounterFile(id)
}
if strings.HasPrefix(p, StorageCountersDir+"/") {
configType = ConfigTypeCounter
}
if configType == 0 {
panic(fmt.Errorf("cannot determine configtype for path: %q", p))
}
return configType, path.Base(p)
panic(fmt.Errorf("unknown data type %q", dataType))
}

View File

@ -34,7 +34,11 @@ var Stmts = []string{
// committedwalsequence stores the last committed wal sequence
"create table committedwalsequence_lts (seq varchar, PRIMARY KEY (seq))",
"create table changegrouprevision_lts (id varchar, revision varchar, PRIMARY KEY (id, revision))",
"create table run_lts (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))",
"create table rundata_lts (id varchar, data bytea, PRIMARY KEY (id))",
"create table runcounter_lts (groupid varchar, counter bigint, PRIMARY KEY (groupid))",
}

File diff suppressed because it is too large Load Diff

View File

@ -85,12 +85,12 @@ func (s *Scheduler) advanceRunTasks(ctx context.Context, r *types.Run) error {
log.Debugf("run: %s", util.Dump(r))
rc, err := store.LTSGetRunConfig(s.wal, r.ID)
if err != nil {
return errors.Wrapf(err, "cannot get run config %q from etcd", r.ID)
return errors.Wrapf(err, "cannot get run config %q", r.ID)
}
log.Debugf("rc: %s", util.Dump(rc))
rd, err := store.LTSGetRunData(s.wal, r.ID)
if err != nil {
return errors.Wrapf(err, "cannot get run data %q from etcd", r.ID)
return errors.Wrapf(err, "cannot get run data %q", r.ID)
}
log.Debugf("rd: %s", util.Dump(rd))
@ -1077,25 +1077,9 @@ func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error {
return err
}
resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0)
// if lastIndexKey doesn't exist return an error
if err != nil {
return err
}
lastIndexRevision := resp.Kvs[0].ModRevision
lastIndexDir := string(resp.Kvs[0].Value)
actions := append([]*wal.Action{ra})
indexActions, err := s.additionalActions(lastIndexDir, ra)
if err != nil {
return err
}
actions := append([]*wal.Action{ra}, indexActions...)
cmp := []etcdclientv3.Cmp{etcdclientv3.Compare(etcdclientv3.ModRevision(common.EtcdLastIndexKey), "=", lastIndexRevision)}
then := []etcdclientv3.Op{etcdclientv3.OpPut(common.EtcdLastIndexKey, lastIndexDir)}
if _, err = s.wal.WriteWalAdditionalOps(ctx, actions, nil, cmp, then); err != nil {
if _, err = s.wal.WriteWal(ctx, actions, nil); err != nil {
return err
}
@ -1107,53 +1091,6 @@ func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error {
return nil
}
func (s *Scheduler) additionalActions(indexDir string, action *wal.Action) ([]*wal.Action, error) {
type indexData struct {
ID string
Group string
Phase types.RunPhase
}
configType, _ := common.PathToTypeID(action.Path)
var actionType wal.ActionType
switch action.ActionType {
case wal.ActionTypePut:
actionType = wal.ActionTypePut
case wal.ActionTypeDelete:
actionType = wal.ActionTypeDelete
}
switch configType {
case common.ConfigTypeRun:
var run *types.Run
if err := json.Unmarshal(action.Data, &run); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal run")
}
data := []byte{}
index := path.Join(common.StorageRunsIndexesDir, indexDir, "added", run.ID)
id := &indexData{ID: run.ID, Group: run.Group, Phase: run.Phase}
idj, err := json.Marshal(id)
if err != nil {
return nil, err
}
data = append(data, idj...)
actions := []*wal.Action{
&wal.Action{
ActionType: actionType,
Path: index,
Data: data,
},
}
return actions, nil
}
return []*wal.Action{}, nil
}
func (s *Scheduler) dumpLTSLoop(ctx context.Context) {
for {
log.Debugf("lts dump loop")
@ -1174,37 +1111,50 @@ func (s *Scheduler) dumpLTSLoop(ctx context.Context) {
}
func (s *Scheduler) dumpLTS(ctx context.Context) error {
type indexHeader struct {
LastWalSequence string
}
type indexData struct {
DataType string
Data interface{}
}
type indexDataRun struct {
ID string
Group string
Phase types.RunPhase
}
resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0)
// if lastIndexKey doesn't exist return an error
if err != nil {
return err
type indexDataRunCounter struct {
Group string
Counter uint64
}
lastIndexRevision := resp.Kvs[0].ModRevision
revision := resp.Header.Revision
indexDir := strconv.FormatInt(time.Now().UnixNano(), 10)
readdbRevision, err := s.readDB.GetRevision()
var lastWalSequence string
err := s.readDB.Do(func(tx *db.Tx) error {
var err error
lastWalSequence, err = s.readDB.GetCommittedWalSequenceLTS(tx)
return err
})
if err != nil {
return err
}
if readdbRevision < revision {
return errors.Errorf("readdb revision %d is lower than index revision %d", readdbRevision, revision)
}
runs := []*readdb.RunData{}
data := []byte{}
iheader := &indexHeader{LastWalSequence: lastWalSequence}
ihj, err := json.Marshal(iheader)
if err != nil {
return err
}
data = append(data, ihj...)
var lastRunID string
stop := false
for {
err := s.readDB.Do(func(tx *db.Tx) error {
var err error
lruns, err := s.readDB.GetRunsFilteredLTS(tx, nil, false, nil, lastRunID, 10000, types.SortOrderDesc)
lruns, err := s.readDB.GetRunsFilteredLTS(tx, nil, false, nil, lastRunID, 1000, types.SortOrderDesc)
if err != nil {
return err
}
@ -1213,7 +1163,14 @@ func (s *Scheduler) dumpLTS(ctx context.Context) error {
} else {
lastRunID = lruns[len(lruns)-1].ID
}
runs = append(runs, lruns...)
for _, run := range lruns {
id := &indexData{DataType: string(common.DataTypeRun), Data: indexDataRun{ID: run.ID, Group: run.GroupPath, Phase: types.RunPhase(run.Phase)}}
idj, err := json.Marshal(id)
if err != nil {
return err
}
data = append(data, idj...)
}
return nil
})
if err != nil {
@ -1224,30 +1181,41 @@ func (s *Scheduler) dumpLTS(ctx context.Context) error {
}
}
data := []byte{}
for _, run := range runs {
id := &indexData{ID: run.ID, Group: run.GroupPath, Phase: types.RunPhase(run.Phase)}
var lastGroup string
stop = false
for {
err := s.readDB.Do(func(tx *db.Tx) error {
var err error
counters, err := s.readDB.GetRunCountersLTS(tx, lastGroup, 1000)
if err != nil {
return err
}
if len(counters) == 0 {
stop = true
} else {
lastGroup = counters[len(counters)-1].Group
}
for _, counter := range counters {
id := &indexData{DataType: string(common.DataTypeRunCounter), Data: indexDataRunCounter{Group: counter.Group, Counter: counter.Counter}}
idj, err := json.Marshal(id)
if err != nil {
return err
}
data = append(data, idj...)
}
return nil
})
if err != nil {
return err
}
if stop {
break
}
}
index := path.Join(common.StorageRunsIndexesDir, indexDir, "all")
cmp := []etcdclientv3.Cmp{etcdclientv3.Compare(etcdclientv3.ModRevision(common.EtcdLastIndexKey), "=", lastIndexRevision)}
then := []etcdclientv3.Op{etcdclientv3.OpPut(common.EtcdLastIndexKey, indexDir)}
actions := []*wal.Action{
&wal.Action{
ActionType: wal.ActionTypePut,
Path: index,
Data: data,
},
}
if _, err = s.wal.WriteWalAdditionalOps(ctx, actions, nil, cmp, then); err != nil {
if err = s.lts.WriteObject(index, bytes.NewReader(data)); err != nil {
return err
}
@ -1279,18 +1247,12 @@ func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error {
Phase types.RunPhase
}
resp, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0)
// if lastIndexKey doesn't exist return an error
if err != nil {
return err
}
lastIndexDir := string(resp.Kvs[0].Value)
// collect all object that don't pertain to the lastIndexDir
// collect all old indexes
objects := []string{}
doneCh := make(chan struct{})
defer close(doneCh)
for object := range s.wal.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) {
var indexPath string
for object := range s.lts.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) {
if object.Err != nil {
return object.Err
}
@ -1299,22 +1261,20 @@ func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error {
if len(h) < 2 {
return errors.Errorf("wrong index dir path %q", object.Path)
}
indexDir := h[1]
if indexDir != lastIndexDir {
objects = append(objects, object.Path)
curIndexPath := object.Path
if curIndexPath > indexPath {
if indexPath != "" {
objects = append(objects, indexPath)
}
indexPath = curIndexPath
} else {
objects = append(objects, curIndexPath)
}
}
actions := make([]*wal.Action, len(objects))
for i, object := range objects {
actions[i] = &wal.Action{
ActionType: wal.ActionTypeDelete,
Path: object,
}
}
if len(actions) > 0 {
if _, err = s.wal.WriteWal(ctx, actions, nil); err != nil {
for _, object := range objects {
if err := s.lts.DeleteObject(object); err != nil {
log.Errorf("object: %s, err: %v", object, err)
return err
}
}
@ -1354,6 +1314,7 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule
walConf := &wal.WalManagerConfig{
E: e,
Lts: lts,
DataToPathFunc: common.DataToPathFunc,
}
wal, err := wal.NewWalManager(ctx, logger, walConf)
if err != nil {
@ -1361,13 +1322,13 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule
}
s.wal = wal
readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, wal)
readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, lts, wal)
if err != nil {
return nil, err
}
s.readDB = readDB
ch := command.NewCommandHandler(logger, e, lts, wal)
ch := command.NewCommandHandler(logger, e, readDB, lts, wal)
s.ch = ch
return s, nil
@ -1385,41 +1346,6 @@ func (s *Scheduler) InitEtcd(ctx context.Context) error {
return etcd.FromEtcdError(err)
}
// Populate lastIndexDir if empty
doneCh := make(chan struct{})
defer close(doneCh)
_, err := s.e.Get(ctx, common.EtcdLastIndexKey, 0)
if err != nil && err != etcd.ErrKeyNotFound {
return err
}
// EtcdLastIndexKey already exist
if err == nil {
return nil
}
var indexDir string
// take the last (greater) indexdir
for object := range s.wal.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) {
if object.Err != nil {
return object.Err
}
h := util.PathList(object.Path)
if len(h) < 2 {
return errors.Errorf("wrong index dir path %q", object.Path)
}
indexDir = h[1]
}
// if an indexDir doesn't exist in lts then initialize
if indexDir == "" {
indexDir = strconv.FormatInt(time.Now().UnixNano(), 10)
if _, err := s.e.AtomicPut(ctx, common.EtcdLastIndexKey, []byte(indexDir), 0, nil); err != nil {
return err
}
}
return nil
}

View File

@ -88,32 +88,11 @@ func LTSRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string
paths := []string{}
subGroups := LTSSubGroups(group)
for _, subGroup := range subGroups {
paths = append(paths, common.StorageCounterFile(subGroup))
paths = append(paths, common.StorageRunCounterFile(subGroup))
}
return paths
}
func LTSGetRunCounter(wal *wal.WalManager, group string) (uint64, *wal.ChangeGroupsUpdateToken, error) {
// use the first group dir after the root
pl := util.PathList(group)
if len(pl) < 2 {
return 0, nil, errors.Errorf("cannot determine group counter name, wrong group path %q", group)
}
runCounterPath := common.StorageCounterFile(pl[1])
rcf, cgt, err := wal.ReadObject(runCounterPath, []string{"counter-" + pl[1]})
if err != nil {
return 0, cgt, err
}
defer rcf.Close()
d := json.NewDecoder(rcf)
var c uint64
if err := d.Decode(&c); err != nil {
return 0, nil, err
}
return c, cgt, nil
}
func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wal.Action, error) {
// use the first group dir after the root
pl := util.PathList(group)
@ -128,7 +107,8 @@ func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wa
action := &wal.Action{
ActionType: wal.ActionTypePut,
Path: common.StorageCounterFile(pl[1]),
DataType: string(common.DataTypeRunCounter),
ID: pl[1],
Data: cj,
}
@ -175,7 +155,8 @@ func LTSSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) {
action := &wal.Action{
ActionType: wal.ActionTypePut,
Path: common.StorageRunConfigFile(rc.ID),
DataType: string(common.DataTypeRunConfig),
ID: rc.ID,
Data: rcj,
}
@ -206,7 +187,8 @@ func LTSSaveRunDataAction(rd *types.RunData) (*wal.Action, error) {
action := &wal.Action{
ActionType: wal.ActionTypePut,
Path: common.StorageRunDataFile(rd.ID),
DataType: string(common.DataTypeRunData),
ID: rd.ID,
Data: rdj,
}
@ -238,7 +220,8 @@ func LTSSaveRunAction(r *types.Run) (*wal.Action, error) {
action := &wal.Action{
ActionType: wal.ActionTypePut,
Path: common.StorageRunFile(r.ID),
DataType: string(common.DataTypeRun),
ID: r.ID,
Data: rj,
}

View File

@ -23,12 +23,6 @@ import (
"github.com/sorintlab/agola/internal/util"
)
type RunBundle struct {
Run *Run
Rc *RunConfig
Rd *RunData
}
type SortOrder int
const (
@ -36,6 +30,17 @@ const (
SortOrderDesc
)
type RunBundle struct {
Run *Run
Rc *RunConfig
Rd *RunData
}
type RunCounter struct {
Group string
Counter uint64
}
type RunPhase string
const (

View File

@ -180,12 +180,16 @@ func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revi
}
func (w *WalManager) applyWalChangesAction(ctx context.Context, action *Action, walSequence string, revision int64) {
dataPath := w.dataToPathFunc(action.DataType, action.ID)
if dataPath == "" {
return
}
switch action.ActionType {
case ActionTypePut:
w.changes.addPut(action.Path, walSequence, revision)
w.changes.addPut(dataPath, walSequence, revision)
case ActionTypeDelete:
w.changes.addDelete(action.Path, walSequence, revision)
w.changes.addDelete(dataPath, walSequence, revision)
}
if w.changes.actions[walSequence] == nil {
w.changes.actions[walSequence] = []*Action{}

View File

@ -114,7 +114,8 @@ const (
type Action struct {
ActionType ActionType
Path string
DataType string
ID string
Data []byte
}
@ -213,11 +214,14 @@ func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *Cha
if ok {
for _, action := range actions {
if action.ActionType == ActionTypePut && action.Path == p {
w.log.Debugf("reading file from wal: %q", action.Path)
if action.ActionType == ActionTypePut {
dataPath := w.dataToPathFunc(action.DataType, action.ID)
if dataPath == p {
w.log.Debugf("reading file from wal: %q", dataPath)
return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil
}
}
}
return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq)
}
@ -884,7 +888,11 @@ func (w *WalManager) checkpoint(ctx context.Context) error {
}
func (w *WalManager) checkpointAction(ctx context.Context, action *Action) error {
path := w.toStorageDataPath(action.Path)
dataPath := w.dataToPathFunc(action.DataType, action.ID)
if dataPath == "" {
return nil
}
path := w.toStorageDataPath(dataPath)
switch action.ActionType {
case ActionTypePut:
w.log.Debugf("writing file: %q", path)
@ -1209,18 +1217,21 @@ func (w *WalManager) InitEtcd(ctx context.Context) error {
return nil
}
type AdditionalActionsFunc func(action *Action) ([]*Action, error)
type CheckpointFunc func(action *Action) error
func NoOpAdditionalActionFunc(action *Action) ([]*Action, error) {
return []*Action{}, nil
type DataToPathFunc func(dataType string, id string) string
func NoOpDataToPath(dataType string, id string) string {
return ""
}
type WalManagerConfig struct {
BasePath string
E *etcd.Store
Lts *objectstorage.ObjStorage
AdditionalActionsFunc AdditionalActionsFunc
EtcdWalsKeepNum int
CheckpointFunc CheckpointFunc
DataToPathFunc DataToPathFunc
}
type WalManager struct {
@ -1230,6 +1241,8 @@ type WalManager struct {
lts *objectstorage.ObjStorage
changes *WalChanges
etcdWalsKeepNum int
checkpointFunc CheckpointFunc
dataToPathFunc DataToPathFunc
}
func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConfig) (*WalManager, error) {
@ -1240,9 +1253,9 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf
return nil, errors.New("etcdWalsKeepNum must be greater than 0")
}
additionalActionsFunc := conf.AdditionalActionsFunc
if additionalActionsFunc == nil {
additionalActionsFunc = NoOpAdditionalActionFunc
dataToPathFunc := conf.DataToPathFunc
if dataToPathFunc == nil {
dataToPathFunc = NoOpDataToPath
}
w := &WalManager{
@ -1252,6 +1265,8 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf
lts: conf.Lts,
etcdWalsKeepNum: conf.EtcdWalsKeepNum,
changes: NewWalChanges(),
checkpointFunc: conf.CheckpointFunc,
dataToPathFunc: dataToPathFunc,
}
// add trailing slash the basepath

View File

@ -90,6 +90,8 @@ func TestEtcdReset(t *testing.T) {
}
wal, err := NewWalManager(ctx, logger, walConfig)
walReadyCh := make(chan struct{})
t.Logf("starting wal")
go wal.Run(ctx, walReadyCh)
<-walReadyCh
@ -102,9 +104,9 @@ func TestEtcdReset(t *testing.T) {
expectedObjects := []string{}
for i := 0; i < 20; i++ {
objectPath := fmt.Sprintf("object%02d", i)
expectedObjects = append(expectedObjects, objectPath)
actions[0].Path = objectPath
objectID := fmt.Sprintf("object%02d", i)
expectedObjects = append(expectedObjects, objectID)
actions[0].ID = objectID
if _, err := wal.WriteWal(ctx, actions, nil); err != nil {
t.Fatalf("unexpected err: %v", err)
}
@ -113,20 +115,39 @@ func TestEtcdReset(t *testing.T) {
// wait for wal to be committed storage
time.Sleep(5 * time.Second)
t.Logf("stopping wal")
cancel()
t.Logf("stopping etcd")
// Reset etcd
shutdownEtcd(tetcd)
tetcd.WaitDown(10 * time.Second)
t.Logf("resetting etcd")
os.RemoveAll(etcdDir)
t.Logf("starting etcd")
tetcd = setupEtcd(t, etcdDir)
defer shutdownEtcd(tetcd)
if err := tetcd.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer shutdownEtcd(tetcd)
cancel()
ctx = context.Background()
ctx, cancel = context.WithCancel(context.Background())
walConfig = &WalManagerConfig{
BasePath: "basepath",
E: tetcd.TestEtcd.Store,
Lts: objectstorage.NewObjStorage(lts, "/"),
EtcdWalsKeepNum: 10,
}
wal, err = NewWalManager(ctx, logger, walConfig)
walReadyCh = make(chan struct{})
t.Logf("starting wal")
go wal.Run(ctx, walReadyCh)
<-walReadyCh
time.Sleep(5 * time.Second)
curObjects := []string{}
doneCh := make(chan struct{})
for object := range wal.List("", "", true, doneCh) {
@ -174,7 +195,7 @@ func TestConcurrentUpdate(t *testing.T) {
actions := []*Action{
{
ActionType: ActionTypePut,
Path: "/object01",
ID: "/object01",
Data: []byte("{}"),
},
}
@ -183,6 +204,8 @@ func TestConcurrentUpdate(t *testing.T) {
go wal.Run(ctx, walReadyCh)
<-walReadyCh
time.Sleep(5 * time.Second)
cgNames := []string{"changegroup01", "changegroup02"}
cgt, err := wal.GetChangeGroupsUpdateToken(cgNames)
if err != nil {
@ -253,7 +276,7 @@ func TestWalCleaner(t *testing.T) {
actions := []*Action{
{
ActionType: ActionTypePut,
Path: "/object01",
ID: "/object01",
Data: []byte("{}"),
},
}