From 41e333d7ec7b406c1bfd7d5a87268bec815b5528 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Sat, 27 Apr 2019 15:16:48 +0200 Subject: [PATCH] *: rename "lts" to "ost" `lts` was choosen to reflect a "long term storage" but currently it's just an object storage implementation. So use this term and "ost" as its abbreviation (to not clash with "os"). --- examples/config.yml | 8 +- internal/common/common.go | 14 +- internal/services/config/config.go | 36 ++-- internal/services/configstore/configstore.go | 10 +- .../services/configstore/configstore_test.go | 16 +- .../services/configstore/readdb/readdb.go | 40 ++-- internal/services/gateway/gateway.go | 6 +- .../services/runservice/scheduler/api/api.go | 18 +- .../runservice/scheduler/api/executor.go | 44 ++-- .../runservice/scheduler/command/command.go | 18 +- .../runservice/scheduler/readdb/create.go | 15 +- .../runservice/scheduler/readdb/readdb.go | 203 +++++++++--------- .../runservice/scheduler/scheduler.go | 84 ++++---- .../runservice/scheduler/store/store.go | 46 ++-- internal/wal/changes.go | 2 +- internal/wal/wal.go | 56 ++--- internal/wal/wal_test.go | 20 +- 17 files changed, 320 insertions(+), 316 deletions(-) diff --git a/examples/config.yml b/examples/config.yml index 23817b0..dc2a1b8 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -24,18 +24,18 @@ configStore: dataDir: /tmp/agola/configstore etcd: endpoints: "http://localhost:2379" - lts: + objectStorage: type: posix - path: /tmp/agola/configstore/lts + path: /tmp/agola/configstore/ost web: listenAddress: ":4002" runServiceScheduler: #debug: true dataDir: /tmp/agola/runservice/scheduler - lts: + objectStorage: type: posix - path: /tmp/agola/runservice/lts + path: /tmp/agola/runservice/ost etcd: endpoints: "http://localhost:2379" web: diff --git a/internal/common/common.go b/internal/common/common.go index 6ed5494..1aff75c 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -73,19 +73,19 @@ func WriteFileAtomic(filename string, data []byte, perm os.FileMode) error { }) } -func NewLTS(c *config.LTS) (*objectstorage.ObjStorage, error) { +func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error) { var ( err error - lts objectstorage.Storage + ost objectstorage.Storage ) switch c.Type { - case config.LTSTypePosix: - lts, err = objectstorage.NewPosixStorage(c.Path) + case config.ObjectStorageTypePosix: + ost, err = objectstorage.NewPosixStorage(c.Path) if err != nil { return nil, errors.Wrapf(err, "failed to create posix object storage") } - case config.LTSTypeS3: + case config.ObjectStorageTypeS3: // minio golang client doesn't accept an url as an endpoint endpoint := c.Endpoint secure := !c.DisableTLS @@ -100,13 +100,13 @@ func NewLTS(c *config.LTS) (*objectstorage.ObjStorage, error) { return nil, errors.Errorf("wrong s3 endpoint scheme %q (must be http or https)", u.Scheme) } } - lts, err = objectstorage.NewS3Storage(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) + ost, err = objectstorage.NewS3Storage(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) if err != nil { return nil, errors.Wrapf(err, "failed to create s3 object storage") } } - return objectstorage.NewObjStorage(lts, "/"), nil + return objectstorage.NewObjStorage(ost, "/"), nil } func NewEtcd(c *config.Etcd, logger *zap.Logger, prefix string) (*etcd.Store, error) { diff --git a/internal/services/config/config.go b/internal/services/config/config.go index b10bb57..d811bd0 100644 --- a/internal/services/config/config.go +++ b/internal/services/config/config.go @@ -45,9 +45,9 @@ type Gateway struct { ConfigStoreURL string `yaml:"configStoreURL"` GitServerURL string `yaml:"gitServerURL"` - Web Web `yaml:"web"` - Etcd Etcd `yaml:"etcd"` - LTS LTS `yaml:"lts"` + Web Web `yaml:"web"` + Etcd Etcd `yaml:"etcd"` + ObjectStorage ObjectStorage `yaml:"objectStorage"` TokenSigning TokenSigning `yaml:"tokenSigning"` @@ -63,10 +63,10 @@ type Scheduler struct { type RunServiceScheduler struct { Debug bool `yaml:"debug"` - DataDir string `yaml:"dataDir"` - Web Web `yaml:"web"` - Etcd Etcd `yaml:"etcd"` - LTS LTS `yaml:"lts"` + DataDir string `yaml:"dataDir"` + Web Web `yaml:"web"` + Etcd Etcd `yaml:"etcd"` + ObjectStorage ObjectStorage `yaml:"objectStorage"` RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"` } @@ -93,9 +93,9 @@ type ConfigStore struct { DataDir string `yaml:"dataDir"` - Web Web `yaml:"web"` - Etcd Etcd `yaml:"etcd"` - LTS LTS `yaml:"lts"` + Web Web `yaml:"web"` + Etcd Etcd `yaml:"etcd"` + ObjectStorage ObjectStorage `yaml:"objectStorage"` } type GitServer struct { @@ -106,9 +106,9 @@ type GitServer struct { GithookPath string `yaml:"githookPath"` GatewayURL string `yaml:"gatewayURL"` - Web Web `yaml:"web"` - Etcd Etcd `yaml:"etcd"` - LTS LTS `yaml:"lts"` + Web Web `yaml:"web"` + Etcd Etcd `yaml:"etcd"` + ObjectStorage ObjectStorage `yaml:"objectStorage"` } type Web struct { @@ -130,15 +130,15 @@ type Web struct { AllowedOrigins []string `yaml:"allowedOrigins"` } -type LTSType string +type ObjectStorageType string const ( - LTSTypePosix LTSType = "posix" - LTSTypeS3 LTSType = "s3" + ObjectStorageTypePosix ObjectStorageType = "posix" + ObjectStorageTypeS3 ObjectStorageType = "s3" ) -type LTS struct { - Type LTSType `yaml:"type"` +type ObjectStorage struct { + Type ObjectStorageType `yaml:"type"` // Posix Path string `yaml:"path"` diff --git a/internal/services/configstore/configstore.go b/internal/services/configstore/configstore.go index 9edadd1..997f733 100644 --- a/internal/services/configstore/configstore.go +++ b/internal/services/configstore/configstore.go @@ -47,7 +47,7 @@ type ConfigStore struct { e *etcd.Store wal *wal.WalManager readDB *readdb.ReadDB - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage ch *command.CommandHandler listenAddress string } @@ -57,7 +57,7 @@ func NewConfigStore(ctx context.Context, c *config.ConfigStore) (*ConfigStore, e level.SetLevel(zapcore.DebugLevel) } - lts, err := scommon.NewLTS(&c.LTS) + ost, err := scommon.NewObjectStorage(&c.ObjectStorage) if err != nil { return nil, err } @@ -69,19 +69,19 @@ func NewConfigStore(ctx context.Context, c *config.ConfigStore) (*ConfigStore, e cs := &ConfigStore{ c: c, e: e, - lts: lts, + ost: ost, } walConf := &wal.WalManagerConfig{ E: e, - Lts: lts, + OST: ost, DataToPathFunc: common.DataToPathFunc, } wal, err := wal.NewWalManager(ctx, logger, walConf) if err != nil { return nil, err } - readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, lts, wal) + readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, wal) if err != nil { return nil, err } diff --git a/internal/services/configstore/configstore_test.go b/internal/services/configstore/configstore_test.go index 4fa203a..c67a914 100644 --- a/internal/services/configstore/configstore_test.go +++ b/internal/services/configstore/configstore_test.go @@ -63,16 +63,16 @@ func setupConfigstore(t *testing.T, ctx context.Context, dir string) (*ConfigSto t.Fatalf("unexpected err: %v", err) } - ltsDir, err := ioutil.TempDir(dir, "lts") + ostDir, err := ioutil.TempDir(dir, "ost") csDir, err := ioutil.TempDir(dir, "cs") baseConfig := config.ConfigStore{ Etcd: config.Etcd{ Endpoints: tetcd.Endpoint, }, - LTS: config.LTS{ - Type: config.LTSTypePosix, - Path: ltsDir, + ObjectStorage: config.ObjectStorage{ + Type: config.ObjectStorageTypePosix, + Path: ostDir, }, Web: config.Web{}, } @@ -134,7 +134,7 @@ func TestResync(t *testing.T) { ctx := context.Background() - ltsDir, err := ioutil.TempDir(dir, "lts") + ostDir, err := ioutil.TempDir(dir, "ost") csDir1, err := ioutil.TempDir(dir, "cs1") csDir2, err := ioutil.TempDir(dir, "cs2") csDir3, err := ioutil.TempDir(dir, "cs3") @@ -143,9 +143,9 @@ func TestResync(t *testing.T) { Etcd: config.Etcd{ Endpoints: tetcd.Endpoint, }, - LTS: config.LTS{ - Type: config.LTSTypePosix, - Path: ltsDir, + ObjectStorage: config.ObjectStorage{ + Type: config.ObjectStorageTypePosix, + Path: ostDir, }, Web: config.Web{}, } diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 144932d..1161dd4 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -58,14 +58,14 @@ type ReadDB struct { dataDir string e *etcd.Store rdb *db.DB - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage wal *wal.WalManager Initialized bool initMutex sync.Mutex } -func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, lts *objectstorage.ObjStorage, wal *wal.WalManager) (*ReadDB, error) { +func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, wal *wal.WalManager) (*ReadDB, error) { if err := os.MkdirAll(dataDir, 0770); err != nil { return nil, err } @@ -84,7 +84,7 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd. dataDir: dataDir, rdb: rdb, e: e, - lts: lts, + ost: ost, wal: wal, } @@ -131,7 +131,7 @@ func (r *ReadDB) SyncFromFiles() (string, error) { var lastCheckpointedWal string // Get last checkpointed wal from lts - for wal := range r.wal.ListLtsWals("") { + for wal := range r.wal.ListOSTWals("") { if wal.Err != nil { return "", wal.Err } @@ -268,7 +268,7 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { doneCh := make(chan struct{}) defer close(doneCh) - for walFile := range r.wal.ListLtsWals(startWalSeq) { + for walFile := range r.wal.ListOSTWals(startWalSeq) { if walFile.Err != nil { return "", walFile.Err } @@ -318,16 +318,19 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { doFullSync = true r.log.Warn("no startWalSeq in db, doing a full sync") } else { - ok, err := r.wal.HasLtsWal(curWalSeq) + ok, err := r.wal.HasOSTWal(curWalSeq) if err != nil { return err } if !ok { - r.log.Warnf("no wal with seq %q in lts, doing a full sync", curWalSeq) + r.log.Warnf("no wal with seq %q in objectstorage, doing a full sync", curWalSeq) doFullSync = true } - // if the epoch of the wals has changed this means etcd has been reset. If so we should do a full resync since we are saving in the rdb also data that was not yet committed to lts so we should have the rdb ahead of the current lts data + // if the epoch of the wals has changed this means etcd has been reset. If so + // we should do a full resync since we are saving in the rdb also data that + // was not yet committed to objectstorage so we should have the rdb ahead of + // the current objectstorage data // TODO(sgotti) improve this to avoid doing a full resync curWalSequence, err := sequence.Parse(curWalSeq) if err != nil { @@ -361,17 +364,17 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { r.log.Infof("startWalSeq: %s", curWalSeq) // Sync from wals - // sync from lts until the current known lastCommittedStorageWal in etcd - // since wals are first committed to lts and then in etcd we would like to - // avoid to store in rdb something that is not yet marked as committedstorage - // in etcd + // sync from objectstorage until the current known lastCommittedStorageWal in + // etcd since wals are first committed to objectstorage and then in etcd we + // would like to avoid to store in rdb something that is not yet marked as + // committedstorage in etcd curWalSeq, err = r.SyncFromWals(curWalSeq, lastCommittedStorageWal) if err != nil { return errors.Wrap(err, "failed to sync from wals") } // Get the first available wal from etcd and check that our current walseq - // from wals on lts is >= + // from wals on objectstorage is >= // if not (this happens when syncFromWals takes some time and in the meantime // many new wals are written, the next sync should be faster and able to continue firstAvailableWalData, revision, err := r.wal.FirstAvailableWalData(ctx) @@ -412,7 +415,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { } //} - //// update readdb only when the wal has been committed to lts + //// update readdb only when the wal has been committed to objectstorage //if walElement.WalData.WalStatus != wal.WalStatusCommittedStorage { // return nil //} @@ -510,8 +513,9 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { // if theres a wal seq epoch change something happened to etcd, usually (if // the user hasn't messed up with etcd keys) this means etcd has been reset - // in such case we should resync from the lts state to ensure we apply all the - // wal marked as committedstorage (since they could have been lost from etcd) + // in such case we should resync from the objectstorage state to ensure we + // apply all the wal marked as committedstorage (since they could have been + // lost from etcd) curWalSeq, err := r.GetCommittedWalSequence(tx) if err != nil { return err @@ -532,7 +536,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { weWalEpoch := weWalSequence.Epoch if curWalEpoch != weWalEpoch { r.Initialized = false - return errors.Errorf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, resyncing from lts", curWalEpoch, weWalEpoch) + return errors.Errorf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, resyncing from objectstorage", curWalEpoch, weWalEpoch) } } @@ -565,7 +569,7 @@ func (r *ReadDB) handleEvent(tx *db.Tx, we *wal.WatchElement) error { } func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error { - // update readdb only when the wal has been committed to lts + // update readdb only when the wal has been committed to objectstorage //if we.WalData.WalStatus != wal.WalStatusCommittedStorage { // return nil //} diff --git a/internal/services/gateway/gateway.go b/internal/services/gateway/gateway.go index bbc21c4..f7c3140 100644 --- a/internal/services/gateway/gateway.go +++ b/internal/services/gateway/gateway.go @@ -51,7 +51,7 @@ const ( type Gateway struct { c *config.Gateway - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage runserviceClient *rsapi.Client configstoreClient *csapi.Client ch *command.CommandHandler @@ -115,7 +115,7 @@ func NewGateway(c *config.Gateway) (*Gateway, error) { return nil, errors.Errorf("unknown token signing method: %q", c.TokenSigning.Method) } - lts, err := scommon.NewLTS(&c.LTS) + ost, err := scommon.NewObjectStorage(&c.ObjectStorage) if err != nil { return nil, err } @@ -126,7 +126,7 @@ func NewGateway(c *config.Gateway) (*Gateway, error) { return &Gateway{ c: c, - lts: lts, + ost: ost, runserviceClient: rsapi.NewClient(c.RunServiceURL), configstoreClient: configstoreClient, ch: ch, diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index 12caea2..c636001 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -101,15 +101,15 @@ func httpResponse(w http.ResponseWriter, code int, res interface{}) error { type LogsHandler struct { log *zap.SugaredLogger e *etcd.Store - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage wal *wal.WalManager } -func NewLogsHandler(logger *zap.Logger, e *etcd.Store, lts *objectstorage.ObjStorage, wal *wal.WalManager) *LogsHandler { +func NewLogsHandler(logger *zap.Logger, e *etcd.Store, ost *objectstorage.ObjStorage, wal *wal.WalManager) *LogsHandler { return &LogsHandler{ log: logger.Sugar(), e: e, - lts: lts, + ost: ost, wal: wal, } } @@ -178,7 +178,7 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow, stream bool) (error, bool) { - r, err := store.GetRunEtcdOrLTS(ctx, h.e, h.wal, runID) + r, err := store.GetRunEtcdOrOST(ctx, h.e, h.wal, runID) if err != nil { return err, true } @@ -198,11 +198,11 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se if task.Steps[step].LogPhase == types.RunTaskFetchPhaseFinished { var logPath string if setup { - logPath = store.LTSRunTaskSetupLogPath(task.ID) + logPath = store.OSTRunTaskSetupLogPath(task.ID) } else { - logPath = store.LTSRunTaskStepLogPath(task.ID, step) + logPath = store.OSTRunTaskStepLogPath(task.ID, step) } - f, err := h.lts.ReadObject(logPath) + f, err := h.ost.ReadObject(logPath) if err != nil { if err == objectstorage.ErrNotExist { return common.NewErrNotExist(err), true @@ -364,7 +364,7 @@ func (h *RunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } if run == nil { - run, err = store.LTSGetRun(h.wal, runID) + run, err = store.OSTGetRun(h.wal, runID) if err != nil && err != objectstorage.ErrNotExist { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -375,7 +375,7 @@ func (h *RunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - rc, err := store.LTSGetRunConfig(h.wal, run.ID) + rc, err := store.OSTGetRunConfig(h.wal, run.ID) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/scheduler/api/executor.go index 676c576..233ace4 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/scheduler/api/executor.go @@ -199,13 +199,13 @@ func (h *ExecutorTasksHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) type ArchivesHandler struct { log *zap.SugaredLogger - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage } -func NewArchivesHandler(logger *zap.Logger, lts *objectstorage.ObjStorage) *ArchivesHandler { +func NewArchivesHandler(logger *zap.Logger, ost *objectstorage.ObjStorage) *ArchivesHandler { return &ArchivesHandler{ log: logger.Sugar(), - lts: lts, + ost: ost, } } @@ -242,8 +242,8 @@ func (h *ArchivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error { - archivePath := store.LTSRunArchivePath(rtID, step) - f, err := h.lts.ReadObject(archivePath) + archivePath := store.OSTRunArchivePath(rtID, step) + f, err := h.ost.ReadObject(archivePath) if err != nil { if err == objectstorage.ErrNotExist { return common.NewErrNotExist(err) @@ -260,13 +260,13 @@ func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error type CacheHandler struct { log *zap.SugaredLogger - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage } -func NewCacheHandler(logger *zap.Logger, lts *objectstorage.ObjStorage) *CacheHandler { +func NewCacheHandler(logger *zap.Logger, ost *objectstorage.ObjStorage) *CacheHandler { return &CacheHandler{ log: logger.Sugar(), - lts: lts, + ost: ost, } } @@ -289,7 +289,7 @@ func (h *CacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { query := r.URL.Query() _, prefix := query["prefix"] - matchedKey, err := matchCache(h.lts, key, prefix) + matchedKey, err := matchCache(h.ost, key, prefix) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -316,8 +316,8 @@ func (h *CacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } -func matchCache(lts *objectstorage.ObjStorage, key string, prefix bool) (string, error) { - cachePath := store.LTSCachePath(key) +func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, error) { + cachePath := store.OSTCachePath(key) if prefix { doneCh := make(chan struct{}) @@ -325,7 +325,7 @@ func matchCache(lts *objectstorage.ObjStorage, key string, prefix bool) (string, // get the latest modified object var lastObject *objectstorage.ObjectInfo - for object := range lts.List(store.LTSCacheDir()+"/"+key, "", false, doneCh) { + for object := range ost.List(store.OSTCacheDir()+"/"+key, "", false, doneCh) { if object.Err != nil { return "", object.Err } @@ -339,10 +339,10 @@ func matchCache(lts *objectstorage.ObjStorage, key string, prefix bool) (string, return "", nil } - return store.LTSCacheKey(lastObject.Path), nil + return store.OSTCacheKey(lastObject.Path), nil } - _, err := lts.Stat(cachePath) + _, err := ost.Stat(cachePath) if err == objectstorage.ErrNotExist { return "", nil } @@ -353,8 +353,8 @@ func matchCache(lts *objectstorage.ObjStorage, key string, prefix bool) (string, } func (h *CacheHandler) readCache(key string, w io.Writer) error { - cachePath := store.LTSCachePath(key) - f, err := h.lts.ReadObject(cachePath) + cachePath := store.OSTCachePath(key) + f, err := h.ost.ReadObject(cachePath) if err != nil { if err == objectstorage.ErrNotExist { return common.NewErrNotExist(err) @@ -371,13 +371,13 @@ func (h *CacheHandler) readCache(key string, w io.Writer) error { type CacheCreateHandler struct { log *zap.SugaredLogger - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage } -func NewCacheCreateHandler(logger *zap.Logger, lts *objectstorage.ObjStorage) *CacheCreateHandler { +func NewCacheCreateHandler(logger *zap.Logger, ost *objectstorage.ObjStorage) *CacheCreateHandler { return &CacheCreateHandler{ log: logger.Sugar(), - lts: lts, + ost: ost, } } @@ -400,7 +400,7 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "no-cache") - matchedKey, err := matchCache(h.lts, key, false) + matchedKey, err := matchCache(h.ost, key, false) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -410,8 +410,8 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - cachePath := store.LTSCachePath(key) - if err := h.lts.WriteObject(cachePath, r.Body); err != nil { + cachePath := store.OSTCachePath(key) + if err := h.ost.WriteObject(cachePath, r.Body); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/internal/services/runservice/scheduler/command/command.go b/internal/services/runservice/scheduler/command/command.go index 4ec73c1..347f1ec 100644 --- a/internal/services/runservice/scheduler/command/command.go +++ b/internal/services/runservice/scheduler/command/command.go @@ -40,16 +40,16 @@ type CommandHandler struct { log *zap.SugaredLogger e *etcd.Store readDB *readdb.ReadDB - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage wal *wal.WalManager } -func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, lts *objectstorage.ObjStorage, wal *wal.WalManager) *CommandHandler { +func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, wal *wal.WalManager) *CommandHandler { return &CommandHandler{ log: logger.Sugar(), e: e, readDB: readDB, - lts: lts, + ost: ost, wal: wal, } } @@ -218,12 +218,12 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) // fetch the existing runconfig and run s.log.Infof("creating run from existing run") - rc, err := store.LTSGetRunConfig(s.wal, req.RunID) + rc, err := store.OSTGetRunConfig(s.wal, req.RunID) if err != nil { return nil, util.NewErrBadRequest(errors.Wrapf(err, "runconfig %q doens't exist", req.RunID)) } - run, err := store.GetRunEtcdOrLTS(ctx, s.e, s.wal, req.RunID) + run, err := store.GetRunEtcdOrOST(ctx, s.e, s.wal, req.RunID) if err != nil { return nil, err } @@ -382,14 +382,14 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg actions := []*wal.Action{} // persist group counter - rca, err := store.LTSUpdateRunCounterAction(ctx, c, run.Group) + rca, err := store.OSTUpdateRunCounterAction(ctx, c, run.Group) if err != nil { return err } actions = append(actions, rca) // persist run config - rca, err = store.LTSSaveRunConfigAction(rc) + rca, err = store.OSTSaveRunConfigAction(rc) if err != nil { return err } @@ -542,11 +542,11 @@ func (s *CommandHandler) getRunCounter(group string) (uint64, *wal.ChangeGroupsU var cgt *wal.ChangeGroupsUpdateToken err := s.readDB.Do(func(tx *db.Tx) error { var err error - c, err = s.readDB.GetRunCounterLTS(tx, pl[1]) + c, err = s.readDB.GetRunCounterOST(tx, pl[1]) if err != nil { return err } - cgt, err = s.readDB.GetChangeGroupsUpdateTokensLTS(tx, []string{"counter-" + pl[1]}) + cgt, err = s.readDB.GetChangeGroupsUpdateTokensOST(tx, []string{"counter-" + pl[1]}) return err }) if err != nil { diff --git a/internal/services/runservice/scheduler/readdb/create.go b/internal/services/runservice/scheduler/readdb/create.go index 26bc2fc..664b5fe 100644 --- a/internal/services/runservice/scheduler/readdb/create.go +++ b/internal/services/runservice/scheduler/readdb/create.go @@ -16,7 +16,6 @@ package readdb var Stmts = []string{ // last processed etcd event revision - //"create table revision (clusterid varchar, revision bigint, PRIMARY KEY(revision))", "create table revision (revision bigint, PRIMARY KEY(revision))", "create table run (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", @@ -28,17 +27,17 @@ var Stmts = []string{ // changegrouprevision stores the current revision of the changegroup for optimistic locking "create table changegrouprevision (id varchar, revision varchar, PRIMARY KEY (id, revision))", - // LTS - "create table revision_lts (revision bigint, PRIMARY KEY(revision))", + // objectstorage + "create table revision_ost (revision bigint, PRIMARY KEY(revision))", // committedwalsequence stores the last committed wal sequence - "create table committedwalsequence_lts (seq varchar, PRIMARY KEY (seq))", + "create table committedwalsequence_ost (seq varchar, PRIMARY KEY (seq))", - "create table changegrouprevision_lts (id varchar, revision varchar, PRIMARY KEY (id, revision))", + "create table changegrouprevision_ost (id varchar, revision varchar, PRIMARY KEY (id, revision))", - "create table run_lts (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", + "create table run_ost (id varchar, grouppath varchar, phase varchar, PRIMARY KEY (id, grouppath, phase))", - "create table rundata_lts (id varchar, data bytea, PRIMARY KEY (id))", + "create table rundata_ost (id varchar, data bytea, PRIMARY KEY (id))", - "create table runcounter_lts (groupid varchar, counter bigint, PRIMARY KEY (groupid))", + "create table runcounter_ost (groupid varchar, counter bigint, PRIMARY KEY (groupid))", } diff --git a/internal/services/runservice/scheduler/readdb/readdb.go b/internal/services/runservice/scheduler/readdb/readdb.go index 87db08d..6a007fa 100644 --- a/internal/services/runservice/scheduler/readdb/readdb.go +++ b/internal/services/runservice/scheduler/readdb/readdb.go @@ -69,23 +69,23 @@ var ( changegrouprevisionSelect = sb.Select("id, revision").From("changegrouprevision") changegrouprevisionInsert = sb.Insert("changegrouprevision").Columns("id", "revision") - // readdb tables based on lts data - revisionLTSSelect = sb.Select("revision").From("revision_lts") - revisionLTSInsert = sb.Insert("revision_lts").Columns("revision") + // readdb tables based on objectstorage data + revisionOSTSelect = sb.Select("revision").From("revision_ost") + revisionOSTInsert = sb.Insert("revision_ost").Columns("revision") - runLTSSelect = sb.Select("id", "grouppath", "phase").From("run_lts") - runLTSInsert = sb.Insert("run_lts").Columns("id", "grouppath", "phase") + runOSTSelect = sb.Select("id", "grouppath", "phase").From("run_ost") + runOSTInsert = sb.Insert("run_ost").Columns("id", "grouppath", "phase") - rundataLTSInsert = sb.Insert("rundata_lts").Columns("id", "data") + rundataOSTInsert = sb.Insert("rundata_ost").Columns("id", "data") - committedwalsequenceLTSSelect = sb.Select("seq").From("committedwalsequence_lts") - committedwalsequenceLTSInsert = sb.Insert("committedwalsequence_lts").Columns("seq") + committedwalsequenceOSTSelect = sb.Select("seq").From("committedwalsequence_ost") + committedwalsequenceOSTInsert = sb.Insert("committedwalsequence_ost").Columns("seq") - changegrouprevisionLTSSelect = sb.Select("id, revision").From("changegrouprevision_lts") - changegrouprevisionLTSInsert = sb.Insert("changegrouprevision_lts").Columns("id", "revision") + changegrouprevisionOSTSelect = sb.Select("id, revision").From("changegrouprevision_ost") + changegrouprevisionOSTInsert = sb.Insert("changegrouprevision_ost").Columns("id", "revision") - runcounterLTSSelect = sb.Select("groupid", "counter").From("runcounter_lts") - runcounterLTSInsert = sb.Insert("runcounter_lts").Columns("groupid", "counter") + runcounterOSTSelect = sb.Select("groupid", "counter").From("runcounter_ost") + runcounterOSTInsert = sb.Insert("runcounter_ost").Columns("groupid", "counter") ) type ReadDB struct { @@ -93,7 +93,7 @@ type ReadDB struct { dataDir string e *etcd.Store rdb *db.DB - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage wal *wal.WalManager Initialized bool @@ -108,7 +108,7 @@ type ReadDB struct { dbWriteLock sync.Mutex } -func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, lts *objectstorage.ObjStorage, wal *wal.WalManager) (*ReadDB, error) { +func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, wal *wal.WalManager) (*ReadDB, error) { if err := os.MkdirAll(dataDir, 0770); err != nil { return nil, err } @@ -126,7 +126,7 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd. log: logger.Sugar(), e: e, dataDir: dataDir, - lts: lts, + ost: ost, wal: wal, rdb: rdb, } @@ -152,8 +152,8 @@ func (r *ReadDB) Initialize(ctx context.Context) error { if err := r.ResetDB(); err != nil { return errors.Wrapf(err, "failed to reset db") } - if err := r.SyncLTS(ctx); err != nil { - return errors.Wrapf(err, "error syncing lts db") + if err := r.SyncObjectStorage(ctx); err != nil { + return errors.Wrapf(err, "error syncing objectstorage db") } if err := r.SyncRDB(ctx); err != nil { return errors.Wrapf(err, "error syncing run db") @@ -297,9 +297,9 @@ func (r *ReadDB) Run(ctx context.Context) error { wg.Add(1) go func() { - r.log.Infof("starting HandleEvents") - if err := r.HandleEvents(ctx); err != nil { - r.log.Errorf("handleevents err: %+v", err) + r.log.Infof("starting handleEvents") + if err := r.handleEvents(ctx); err != nil { + r.log.Errorf("handleEvents err: %+v", err) errCh <- err } wg.Done() @@ -307,9 +307,9 @@ func (r *ReadDB) Run(ctx context.Context) error { wg.Add(1) go func() { - r.log.Infof("starting HandleEventsLTS") - if err := r.HandleEventsLTS(ctx); err != nil { - r.log.Errorf("handleevents lts err: %+v", err) + r.log.Infof("starting handleEventsOST") + if err := r.handleEventsOST(ctx); err != nil { + r.log.Errorf("handleEventsOST err: %+v", err) errCh <- err } wg.Done() @@ -331,7 +331,7 @@ func (r *ReadDB) Run(ctx context.Context) error { } } -func (r *ReadDB) HandleEvents(ctx context.Context) error { +func (r *ReadDB) handleEvents(ctx context.Context) error { var revision int64 var lastRuns []*RunData err := r.rdb.Do(func(tx *db.Tx) error { @@ -447,16 +447,16 @@ func (r *ReadDB) handleRunEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdcl return errors.Wrap(err, "failed to delete run") } - // Run has been deleted from etcd, this means that it was stored in the LTS + // Run has been deleted from etcd, this means that it was stored in the objectstorage // TODO(sgotti) this is here just to avoid a window where the run is not in - // run table and in the run_lts table but should be changed/removed when we'll + // run table and in the run_os table but should be changed/removed when we'll // implement run removal - run, err := store.LTSGetRun(r.wal, runID) + run, err := store.OSTGetRun(r.wal, runID) if err != nil { return err } - return r.insertRunLTS(tx, run, []byte{}) + return r.insertRunOST(tx, run, []byte{}) } return nil @@ -501,12 +501,12 @@ func (r *ReadDB) handleChangeGroupEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp return nil } -func (r *ReadDB) SyncLTS(ctx context.Context) error { +func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { // get the last committed storage wal sequence saved in the rdb curWalSeq := "" err := r.rdb.Do(func(tx *db.Tx) error { var err error - curWalSeq, err = r.GetCommittedWalSequenceLTS(tx) + curWalSeq, err = r.GetCommittedWalSequenceOST(tx) if err != nil { return err } @@ -526,19 +526,19 @@ func (r *ReadDB) SyncLTS(ctx context.Context) error { doFullSync = true r.log.Warn("no startWalSeq in db, doing a full sync") } else { - ok, err := r.wal.HasLtsWal(curWalSeq) + ok, err := r.wal.HasOSTWal(curWalSeq) if err != nil { return err } if !ok { - r.log.Warnf("no wal with seq %q in lts, doing a full sync", curWalSeq) + r.log.Warnf("no wal with seq %q in objectstorage, doing a full sync", curWalSeq) doFullSync = true } // if the epoch of the wals has changed this means etcd has been reset. If so // we should do a full resync since we are saving in the rdb also data that - // was not yet committed to lts so we should have the rdb ahead of the current - // lts data + // was not yet committed to objectstorage so we should have the rdb ahead of + // the current objectstorage data // TODO(sgotti) improve this to avoid doing a full resync curWalSequence, err := sequence.Parse(curWalSeq) if err != nil { @@ -557,7 +557,7 @@ func (r *ReadDB) SyncLTS(ctx context.Context) error { } if doFullSync { - r.log.Infof("doing a full sync from lts files") + r.log.Infof("doing a full sync from objectstorage files") if err := r.ResetDB(); err != nil { return err } @@ -572,8 +572,8 @@ func (r *ReadDB) SyncLTS(ctx context.Context) error { r.log.Infof("startWalSeq: %s", curWalSeq) // Sync from wals - // sync from lts until the current known lastCommittedStorageWal in etcd - // since wals are first committed to lts and then in etcd we would like to + // sync from objectstorage until the current known lastCommittedStorageWal in etcd + // since wals are first committed to objectstorage and then in etcd we would like to // avoid to store in rdb something that is not yet marked as committedstorage // in etcd curWalSeq, err = r.SyncFromWals(curWalSeq, lastCommittedStorageWal) @@ -582,7 +582,7 @@ func (r *ReadDB) SyncLTS(ctx context.Context) error { } // Get the first available wal from etcd and check that our current walseq - // from wals on lts is >= + // from wals on objectstorage is >= // if not (this happens when syncFromWals takes some time and in the meantime // many new wals are written, the next sync should be faster and able to continue firstAvailableWalData, revision, err := r.wal.FirstAvailableWalData(ctx) @@ -604,7 +604,7 @@ func (r *ReadDB) SyncLTS(ctx context.Context) error { } err = r.rdb.Do(func(tx *db.Tx) error { - if err := insertRevisionLTS(tx, revision); err != nil { + if err := insertRevisionOST(tx, revision); err != nil { return err } @@ -617,7 +617,7 @@ func (r *ReadDB) SyncLTS(ctx context.Context) error { continue } - if err := r.insertCommittedWalSequenceLTS(tx, walElement.WalData.WalSequence); err != nil { + if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil { return err } @@ -669,7 +669,7 @@ func (r *ReadDB) SyncFromDump() (string, error) { Phase: ir.Phase, } r.log.Infof("inserting run %q", run.ID) - if err := r.insertRunLTS(tx, run, []byte{}); err != nil { + if err := r.insertRunOST(tx, run, []byte{}); err != nil { return err } case common.DataTypeRunCounter: @@ -678,7 +678,7 @@ func (r *ReadDB) SyncFromDump() (string, error) { return err } r.log.Infof("inserting run counter %q, c: %d", irc.Group, irc.Counter) - if err := r.insertRunCounterLTS(tx, irc.Group, irc.Counter); err != nil { + if err := r.insertRunCounterOST(tx, irc.Group, irc.Counter); err != nil { return err } } @@ -693,7 +693,7 @@ func (r *ReadDB) SyncFromDump() (string, error) { // get last dump var dumpPath string - for object := range r.lts.List(path.Join(common.StorageRunsIndexesDir)+"/", "", true, doneCh) { + for object := range r.ost.List(path.Join(common.StorageRunsIndexesDir)+"/", "", true, doneCh) { if object.Err != nil { return "", object.Err } @@ -705,7 +705,7 @@ func (r *ReadDB) SyncFromDump() (string, error) { return "", nil } - f, err := r.lts.ReadObject(dumpPath) + f, err := r.ost.ReadObject(dumpPath) if err != nil { if err == objectstorage.ErrNotExist { r.log.Warnf("object %s disappeared, ignoring", dumpPath) @@ -767,7 +767,7 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { return err } walFilef.Close() - if err := r.insertCommittedWalSequenceLTS(tx, walFile.WalSequence); err != nil { + if err := r.insertCommittedWalSequenceOST(tx, walFile.WalSequence); err != nil { return err } if err := r.applyWal(tx, header.WalDataFileID); err != nil { @@ -786,7 +786,7 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { doneCh := make(chan struct{}) defer close(doneCh) - for walFile := range r.wal.ListLtsWals(startWalSeq) { + for walFile := range r.wal.ListOSTWals(startWalSeq) { if walFile.Err != nil { return "", walFile.Err } @@ -811,7 +811,7 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { return lastWalSeq, nil } -func (r *ReadDB) HandleEventsLTS(ctx context.Context) error { +func (r *ReadDB) handleEventsOST(ctx context.Context) error { var revision int64 err := r.rdb.Do(func(tx *db.Tx) error { err := tx.QueryRow("select revision from revision order by revision desc limit 1").Scan(&revision) @@ -851,9 +851,10 @@ func (r *ReadDB) HandleEventsLTS(ctx context.Context) error { // if theres a wal seq epoch change something happened to etcd, usually (if // the user hasn't messed up with etcd keys) this means etcd has been reset - // in such case we should resync from the lts state to ensure we apply all the - // wal marked as committedstorage (since they could have been lost from etcd) - curWalSeq, err := r.GetCommittedWalSequenceLTS(tx) + // in such case we should resync from the objectstorage state to ensure we + // apply all the wal marked as committedstorage (since they could have been + // lost from etcd) + curWalSeq, err := r.GetCommittedWalSequenceOST(tx) if err != nil { return err } @@ -873,15 +874,15 @@ func (r *ReadDB) HandleEventsLTS(ctx context.Context) error { weWalEpoch := weWalSequence.Epoch if curWalEpoch != weWalEpoch { r.Initialized = false - return errors.Errorf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, resyncing from lts", curWalEpoch, weWalEpoch) + return errors.Errorf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, resyncing from objectstorage", curWalEpoch, weWalEpoch) } } - if err := r.handleEventLTS(tx, we); err != nil { + if err := r.handleEventOST(tx, we); err != nil { return err } - if err := insertRevisionLTS(tx, we.Revision); err != nil { + if err := insertRevisionOST(tx, we.Revision); err != nil { return err } return nil @@ -933,7 +934,7 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { if err := json.Unmarshal(action.Data, &run); err != nil { return err } - if err := r.insertRunLTS(tx, run, action.Data); err != nil { + if err := r.insertRunOST(tx, run, action.Data); err != nil { return err } case string(common.DataTypeRunCounter): @@ -942,7 +943,7 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { return err } r.log.Infof("inserting run counter %q, c: %d", action.ID, runCounter) - if err := r.insertRunCounterLTS(tx, action.ID, runCounter); err != nil { + if err := r.insertRunCounterOST(tx, action.ID, runCounter); err != nil { return err } } @@ -957,7 +958,7 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { return nil } -func (r *ReadDB) handleEventLTS(tx *db.Tx, we *wal.WatchElement) error { +func (r *ReadDB) handleEventOST(tx *db.Tx, we *wal.WatchElement) error { //r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) //key := string(ev.Kv.Key) @@ -969,18 +970,18 @@ func (r *ReadDB) handleEventLTS(tx *db.Tx, we *wal.WatchElement) error { func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error { for cgName, cgRev := range we.ChangeGroupsRevisions { - if err := r.insertChangeGroupRevisionLTS(tx, cgName, cgRev); err != nil { + if err := r.insertChangeGroupRevisionOST(tx, cgName, cgRev); err != nil { return err } } if we.WalData != nil { - // update readdb only when the wal has been committed to lts + // update readdb only when the wal has been committed to objectstorage if we.WalData.WalStatus != wal.WalStatusCommitted { return nil } - if err := r.insertCommittedWalSequenceLTS(tx, we.WalData.WalSequence); err != nil { + if err := r.insertCommittedWalSequenceOST(tx, we.WalData.WalSequence); err != nil { return err } @@ -1014,14 +1015,14 @@ func insertRevision(tx *db.Tx, revision int64) error { return nil } -func insertRevisionLTS(tx *db.Tx, revision int64) error { +func insertRevisionOST(tx *db.Tx, revision int64) error { // poor man insert or update that works because transaction isolation level is serializable - if _, err := tx.Exec("delete from revision_lts"); err != nil { + if _, err := tx.Exec("delete from revision_ost"); err != nil { return errors.Wrap(err, "failed to delete revision") } // TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types... //q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql() - q, args, err := revisionLTSInsert.Values(revision).ToSql() + q, args, err := revisionOSTInsert.Values(revision).ToSql() if err != nil { return errors.Wrap(err, "failed to build query") } @@ -1065,7 +1066,7 @@ func insertRun(tx *db.Tx, run *types.Run, data []byte) error { return nil } -func (r *ReadDB) insertRunLTS(tx *db.Tx, run *types.Run, data []byte) error { +func (r *ReadDB) insertRunOST(tx *db.Tx, run *types.Run, data []byte) error { // add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02) groupPath := run.Group if !strings.HasSuffix(groupPath, "/") { @@ -1073,10 +1074,10 @@ func (r *ReadDB) insertRunLTS(tx *db.Tx, run *types.Run, data []byte) error { } // poor man insert or update that works because transaction isolation level is serializable - if _, err := tx.Exec("delete from run_lts where id = $1", run.ID); err != nil { - return errors.Wrap(err, "failed to delete run lts") + if _, err := tx.Exec("delete from run_ost where id = $1", run.ID); err != nil { + return errors.Wrap(err, "failed to delete run objectstorage") } - q, args, err := runLTSInsert.Values(run.ID, groupPath, run.Phase).ToSql() + q, args, err := runOSTInsert.Values(run.ID, groupPath, run.Phase).ToSql() if err != nil { return errors.Wrap(err, "failed to build query") } @@ -1085,10 +1086,10 @@ func (r *ReadDB) insertRunLTS(tx *db.Tx, run *types.Run, data []byte) error { } // poor man insert or update that works because transaction isolation level is serializable - if _, err := tx.Exec("delete from rundata_lts where id = $1", run.ID); err != nil { + if _, err := tx.Exec("delete from rundata_ost where id = $1", run.ID); err != nil { return errors.Wrap(err, "failed to delete rundata") } - q, args, err = rundataLTSInsert.Values(run.ID, data).ToSql() + q, args, err = rundataOSTInsert.Values(run.ID, data).ToSql() if err != nil { return errors.Wrap(err, "failed to build query") } @@ -1173,14 +1174,14 @@ func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, lastRun bool, phaseFi } func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) { - useLTS := false + useObjectStorage := false for _, phase := range phaseFilter { if phase == types.RunPhaseFinished || phase == types.RunPhaseCancelled { - useLTS = true + useObjectStorage = true } } if len(phaseFilter) == 0 { - useLTS = true + useObjectStorage = true } runDataRDB, err := r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) @@ -1194,14 +1195,14 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [ lastRunsMap[r.GroupPath] = r } - if useLTS { + if useObjectStorage { // skip if the phase requested is not finished - runDataLTS, err := r.GetRunsFilteredLTS(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) + runDataOST, err := r.GetRunsFilteredOST(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder) if err != nil { return nil, err } - for _, rd := range runDataLTS { + for _, rd := range runDataOST { if lastRun { if lr, ok := lastRunsMap[rd.GroupPath]; ok { switch sortOrder { @@ -1250,8 +1251,8 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [ continue } - // get run from lts - run, err := store.LTSGetRun(r.wal, runID) + // get run from objectstorage + run, err := store.OSTGetRun(r.wal, runID) if err != nil { return nil, errors.WithStack(err) } @@ -1262,16 +1263,16 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [ return aruns, nil } -func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, lts bool) sq.SelectBuilder { +func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, objectstorage bool) sq.SelectBuilder { runt := "run" rundatat := "rundata" fields := []string{"run.id", "run.grouppath", "run.phase", "rundata.data"} if len(groups) > 0 && lastRun { fields = []string{"max(run.id)", "run.grouppath", "run.phase", "rundata.data"} } - if lts { - runt = "run_lts" - rundatat = "rundata_lts" + if objectstorage { + runt = "run_ost" + rundatat = "rundata_ost" } r.log.Debugf("runt: %s", runt) @@ -1329,7 +1330,7 @@ func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, lastRun bool, return fetchRuns(tx, q, args...) } -func (r *ReadDB) GetRunsFilteredLTS(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { +func (r *ReadDB) GetRunsFilteredOST(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) { s := r.getRunsFilteredQuery(phaseFilter, groups, lastRun, startRunID, limit, sortOrder, true) q, args, err := s.ToSql() @@ -1445,13 +1446,13 @@ func scanChangeGroupsRevision(rows *sql.Rows) (types.ChangeGroupsRevisions, erro return changegroups, nil } -func (r *ReadDB) insertCommittedWalSequenceLTS(tx *db.Tx, seq string) error { +func (r *ReadDB) insertCommittedWalSequenceOST(tx *db.Tx, seq string) error { r.log.Infof("insert seq: %s", seq) // poor man insert or update that works because transaction isolation level is serializable - if _, err := tx.Exec("delete from committedwalsequence_lts"); err != nil { + if _, err := tx.Exec("delete from committedwalsequence_ost"); err != nil { return errors.Wrap(err, "failed to delete committedwalsequence") } - q, args, err := committedwalsequenceLTSInsert.Values(seq).ToSql() + q, args, err := committedwalsequenceOSTInsert.Values(seq).ToSql() if err != nil { return errors.Wrap(err, "failed to build query") } @@ -1461,10 +1462,10 @@ func (r *ReadDB) insertCommittedWalSequenceLTS(tx *db.Tx, seq string) error { return nil } -func (r *ReadDB) GetCommittedWalSequenceLTS(tx *db.Tx) (string, error) { +func (r *ReadDB) GetCommittedWalSequenceOST(tx *db.Tx) (string, error) { var seq string - q, args, err := committedwalsequenceLTSSelect.OrderBy("seq").Limit(1).ToSql() + q, args, err := committedwalsequenceOSTSelect.OrderBy("seq").Limit(1).ToSql() r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) if err != nil { return "", errors.Wrap(err, "failed to build query") @@ -1477,16 +1478,16 @@ func (r *ReadDB) GetCommittedWalSequenceLTS(tx *db.Tx) (string, error) { return seq, err } -func (r *ReadDB) insertChangeGroupRevisionLTS(tx *db.Tx, changegroup string, revision int64) error { +func (r *ReadDB) insertChangeGroupRevisionOST(tx *db.Tx, changegroup string, revision int64) error { r.log.Infof("insertChangeGroupRevision: %s %d", changegroup, revision) // poor man insert or update that works because transaction isolation level is serializable - if _, err := tx.Exec("delete from changegrouprevision_lts where id = $1", changegroup); err != nil { + if _, err := tx.Exec("delete from changegrouprevision_ost where id = $1", changegroup); err != nil { return errors.Wrap(err, "failed to delete run") } // insert only if revision > 0 if revision > 0 { - q, args, err := changegrouprevisionLTSInsert.Values(changegroup, revision).ToSql() + q, args, err := changegrouprevisionOSTInsert.Values(changegroup, revision).ToSql() if err != nil { return errors.Wrap(err, "failed to build query") } @@ -1497,14 +1498,14 @@ func (r *ReadDB) insertChangeGroupRevisionLTS(tx *db.Tx, changegroup string, rev return nil } -func (r *ReadDB) GetChangeGroupsUpdateTokensLTS(tx *db.Tx, groups []string) (*wal.ChangeGroupsUpdateToken, error) { - s := changegrouprevisionLTSSelect.Where(sq.Eq{"id": groups}) +func (r *ReadDB) GetChangeGroupsUpdateTokensOST(tx *db.Tx, groups []string) (*wal.ChangeGroupsUpdateToken, error) { + s := changegrouprevisionOSTSelect.Where(sq.Eq{"id": groups}) q, args, err := s.ToSql() r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) if err != nil { return nil, errors.Wrap(err, "failed to build query") } - cgr, err := fetchChangeGroupsRevisionLTS(tx, q, args...) + cgr, err := fetchChangeGroupsRevisionOST(tx, q, args...) if err != nil { return nil, err } @@ -1524,7 +1525,7 @@ func (r *ReadDB) GetChangeGroupsUpdateTokensLTS(tx *db.Tx, groups []string) (*wa return &wal.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil } -func fetchChangeGroupsRevisionLTS(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) { +func fetchChangeGroupsRevisionOST(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) { rows, err := tx.Query(q, args...) if err != nil { return nil, err @@ -1533,7 +1534,7 @@ func fetchChangeGroupsRevisionLTS(tx *db.Tx, q string, args ...interface{}) (map return scanChangeGroupsRevision(rows) } -func scanChangeGroupsRevisionLTS(rows *sql.Rows) (map[string]int64, error) { +func scanChangeGroupsRevisionOST(rows *sql.Rows) (map[string]int64, error) { changegroups := map[string]int64{} for rows.Next() { var ( @@ -1551,14 +1552,14 @@ func scanChangeGroupsRevisionLTS(rows *sql.Rows) (map[string]int64, error) { return changegroups, nil } -func (r *ReadDB) insertRunCounterLTS(tx *db.Tx, group string, counter uint64) error { +func (r *ReadDB) insertRunCounterOST(tx *db.Tx, group string, counter uint64) error { // poor man insert or update that works because transaction isolation level is serializable - if _, err := tx.Exec("delete from runcounter_lts where groupid = $1", group); err != nil { + if _, err := tx.Exec("delete from runcounter_ost where groupid = $1", group); err != nil { return errors.Wrap(err, "failed to delete revision") } // TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types... //q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql() - q, args, err := runcounterLTSInsert.Values(group, counter).ToSql() + q, args, err := runcounterOSTInsert.Values(group, counter).ToSql() if err != nil { return errors.Wrap(err, "failed to build query") } @@ -1568,11 +1569,11 @@ func (r *ReadDB) insertRunCounterLTS(tx *db.Tx, group string, counter uint64) er return nil } -func (r *ReadDB) GetRunCounterLTS(tx *db.Tx, group string) (uint64, error) { +func (r *ReadDB) GetRunCounterOST(tx *db.Tx, group string) (uint64, error) { var g string var counter uint64 - q, args, err := runcounterLTSSelect.Where(sq.Eq{"groupid": group}).ToSql() + q, args, err := runcounterOSTSelect.Where(sq.Eq{"groupid": group}).ToSql() r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) if err != nil { return 0, errors.Wrap(err, "failed to build query") @@ -1585,8 +1586,8 @@ func (r *ReadDB) GetRunCounterLTS(tx *db.Tx, group string) (uint64, error) { return counter, err } -func (r *ReadDB) GetRunCountersLTS(tx *db.Tx, start string, limit int) ([]*types.RunCounter, error) { - s := runcounterLTSSelect.Where(sq.Gt{"groupid": start}) +func (r *ReadDB) GetRunCountersOST(tx *db.Tx, start string, limit int) ([]*types.RunCounter, error) { + s := runcounterOSTSelect.Where(sq.Gt{"groupid": start}) if limit > 0 { s = s.Limit(uint64(limit)) } diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 7b38b5b..5b1af8f 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -333,7 +333,7 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types Shell: rct.Shell, User: rct.User, Steps: rct.Steps, - CachePrefix: store.LTSRootGroup(r.Group), + CachePrefix: store.OSTRootGroup(r.Group), Status: types.ExecutorTaskStatus{ Phase: types.ExecutorTaskPhaseNotStarted, Steps: make([]*types.ExecutorTaskStepStatus, len(rct.Steps)), @@ -613,7 +613,7 @@ func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.Exec if err != nil { return err } - rc, err := store.LTSGetRunConfig(s.wal, r.ID) + rc, err := store.OSTGetRunConfig(s.wal, r.ID) if err != nil { return errors.Wrapf(err, "cannot get run config %q", r.ID) } @@ -891,9 +891,9 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, var logPath string if setup { - logPath = store.LTSRunTaskSetupLogPath(rt.ID) + logPath = store.OSTRunTaskSetupLogPath(rt.ID) } else { - logPath = store.LTSRunTaskStepLogPath(rt.ID, stepnum) + logPath = store.OSTRunTaskStepLogPath(rt.ID, stepnum) } ok, err := s.fileExists(logPath) if err != nil { @@ -923,7 +923,7 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, return errors.Errorf("received http status: %d", r.StatusCode) } - return s.lts.WriteObject(logPath, r.Body) + return s.ost.WriteObject(logPath, r.Body) } func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { @@ -1040,7 +1040,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum return nil } - path := store.LTSRunArchivePath(rt.ID, stepnum) + path := store.OSTRunArchivePath(rt.ID, stepnum) ok, err := s.fileExists(path) if err != nil { return err @@ -1065,7 +1065,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum return errors.Errorf("received http status: %d", r.StatusCode) } - return s.lts.WriteObject(path, r.Body) + return s.ost.WriteObject(path, r.Body) } func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) { @@ -1163,7 +1163,7 @@ func (s *Scheduler) runsScheduler(ctx context.Context) error { func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error { log.Debugf("runScheduler") - rc, err := store.LTSGetRunConfig(s.wal, r.ID) + rc, err := store.OSTGetRunConfig(s.wal, r.ID) if err != nil { return errors.Wrapf(err, "cannot get run config %q", r.ID) } @@ -1201,14 +1201,14 @@ func (s *Scheduler) finishedRunsArchiver(ctx context.Context) error { } } - // We write archived runs in lts in the ordered they were archived + // We write archived runs in objectstorage in the ordered they were archived runs, err = store.GetRuns(ctx, s.e) if err != nil { return err } for _, r := range runs { if r.Archived { - if err := s.runLTSArchiver(ctx, r); err != nil { + if err := s.runOSTArchiver(ctx, r); err != nil { log.Errorf("err: %+v", err) } } @@ -1262,11 +1262,11 @@ func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error return nil } -func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error { - // TODO(sgotti) avoid saving multiple times the run on lts if the +func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error { + // TODO(sgotti) avoid saving multiple times the run on objectstorage if the // store.DeletedArchivedRun fails - log.Infof("saving run in lts: %s", r.ID) - ra, err := store.LTSSaveRunAction(r) + log.Infof("saving run in objectstorage: %s", r.ID) + ra, err := store.OSTSaveRunAction(r) if err != nil { return err } @@ -1285,12 +1285,12 @@ func (s *Scheduler) runLTSArchiver(ctx context.Context, r *types.Run) error { return nil } -func (s *Scheduler) dumpLTSLoop(ctx context.Context) { +func (s *Scheduler) dumpOSTLoop(ctx context.Context) { for { - log.Debugf("lts dump loop") + log.Debugf("objectstorage dump loop") // TODO(sgotti) create new dump only after N files - if err := s.dumpLTS(ctx); err != nil { + if err := s.dumpOST(ctx); err != nil { log.Errorf("err: %+v", err) } @@ -1304,7 +1304,7 @@ func (s *Scheduler) dumpLTSLoop(ctx context.Context) { } } -func (s *Scheduler) dumpLTS(ctx context.Context) error { +func (s *Scheduler) dumpOST(ctx context.Context) error { type indexHeader struct { LastWalSequence string } @@ -1328,7 +1328,7 @@ func (s *Scheduler) dumpLTS(ctx context.Context) error { var lastWalSequence string err := s.readDB.Do(func(tx *db.Tx) error { var err error - lastWalSequence, err = s.readDB.GetCommittedWalSequenceLTS(tx) + lastWalSequence, err = s.readDB.GetCommittedWalSequenceOST(tx) return err }) if err != nil { @@ -1348,7 +1348,7 @@ func (s *Scheduler) dumpLTS(ctx context.Context) error { for { err := s.readDB.Do(func(tx *db.Tx) error { var err error - lruns, err := s.readDB.GetRunsFilteredLTS(tx, nil, false, nil, lastRunID, 1000, types.SortOrderDesc) + lruns, err := s.readDB.GetRunsFilteredOST(tx, nil, false, nil, lastRunID, 1000, types.SortOrderDesc) if err != nil { return err } @@ -1380,7 +1380,7 @@ func (s *Scheduler) dumpLTS(ctx context.Context) error { for { err := s.readDB.Do(func(tx *db.Tx) error { var err error - counters, err := s.readDB.GetRunCountersLTS(tx, lastGroup, 1000) + counters, err := s.readDB.GetRunCountersOST(tx, lastGroup, 1000) if err != nil { return err } @@ -1409,18 +1409,18 @@ func (s *Scheduler) dumpLTS(ctx context.Context) error { index := path.Join(common.StorageRunsIndexesDir, indexDir, "all") - if err = s.lts.WriteObject(index, bytes.NewReader(data)); err != nil { + if err = s.ost.WriteObject(index, bytes.NewReader(data)); err != nil { return err } return nil } -func (s *Scheduler) dumpLTSCleanerLoop(ctx context.Context) { +func (s *Scheduler) dumpOSTCleanerLoop(ctx context.Context) { for { - log.Infof("lts dump cleaner loop") + log.Infof("objectstorage dump cleaner loop") - if err := s.dumpLTSCleaner(ctx); err != nil { + if err := s.dumpOSTCleaner(ctx); err != nil { log.Errorf("err: %+v", err) } @@ -1434,7 +1434,7 @@ func (s *Scheduler) dumpLTSCleanerLoop(ctx context.Context) { } } -func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error { +func (s *Scheduler) dumpOSTCleaner(ctx context.Context) error { type indexData struct { ID string Group string @@ -1446,7 +1446,7 @@ func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error { doneCh := make(chan struct{}) defer close(doneCh) var indexPath string - for object := range s.lts.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) { + for object := range s.ost.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) { if object.Err != nil { return object.Err } @@ -1467,7 +1467,7 @@ func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error { } for _, object := range objects { - if err := s.lts.DeleteObject(object); err != nil { + if err := s.ost.DeleteObject(object); err != nil { log.Errorf("object: %s, err: %v", object, err) return err } @@ -1513,12 +1513,12 @@ func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.D doneCh := make(chan struct{}) defer close(doneCh) - for object := range s.lts.List(store.LTSCacheDir()+"/", "", true, doneCh) { + for object := range s.ost.List(store.OSTCacheDir()+"/", "", true, doneCh) { if object.Err != nil { return object.Err } if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) { - if err := s.lts.DeleteObject(object.Path); err != nil { + if err := s.ost.DeleteObject(object.Path); err != nil { if err != objectstorage.ErrNotExist { log.Warnf("failed to delete cache object %q: %v", object.Path, err) } @@ -1560,7 +1560,7 @@ func (s *Scheduler) etcdPinger(ctx context.Context) error { type Scheduler struct { c *config.RunServiceScheduler e *etcd.Store - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage wal *wal.WalManager readDB *readdb.ReadDB ch *command.CommandHandler @@ -1571,7 +1571,7 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule level.SetLevel(zapcore.DebugLevel) } - lts, err := scommon.NewLTS(&c.LTS) + ost, err := scommon.NewObjectStorage(&c.ObjectStorage) if err != nil { return nil, err } @@ -1583,12 +1583,12 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule s := &Scheduler{ c: c, e: e, - lts: lts, + ost: ost, } walConf := &wal.WalManagerConfig{ E: e, - Lts: lts, + OST: ost, DataToPathFunc: common.DataToPathFunc, } wal, err := wal.NewWalManager(ctx, logger, walConf) @@ -1597,13 +1597,13 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule } s.wal = wal - readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, lts, wal) + readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, wal) if err != nil { return nil, err } s.readDB = readDB - ch := command.NewCommandHandler(logger, e, readDB, lts, wal) + ch := command.NewCommandHandler(logger, e, readDB, ost, wal) s.ch = ch return s, nil @@ -1661,14 +1661,14 @@ func (s *Scheduler) Run(ctx context.Context) error { executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch) executorTaskHandler := api.NewExecutorTaskHandler(s.e) executorTasksHandler := api.NewExecutorTasksHandler(s.e) - archivesHandler := api.NewArchivesHandler(logger, s.lts) - cacheHandler := api.NewCacheHandler(logger, s.lts) - cacheCreateHandler := api.NewCacheCreateHandler(logger, s.lts) + archivesHandler := api.NewArchivesHandler(logger, s.ost) + cacheHandler := api.NewCacheHandler(logger, s.ost) + cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost) // api from clients executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ch) - logsHandler := api.NewLogsHandler(logger, s.e, s.lts, s.wal) + logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.wal) runHandler := api.NewRunHandler(logger, s.e, s.wal, s.readDB) runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ch) @@ -1714,8 +1714,8 @@ func (s *Scheduler) Run(ctx context.Context) error { go s.runTasksUpdaterLoop(ctx) go s.fetcherLoop(ctx) go s.finishedRunsArchiverLoop(ctx) - go s.dumpLTSLoop(ctx) - go s.dumpLTSCleanerLoop(ctx) + go s.dumpOSTLoop(ctx) + go s.dumpOSTCleanerLoop(ctx) go s.compactChangeGroupsLoop(ctx) go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) go s.executorTaskUpdateHandler(ctx, ch) diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/scheduler/store/store.go index 39e4f94..25e46c9 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/scheduler/store/store.go @@ -33,7 +33,7 @@ import ( etcdclientv3 "go.etcd.io/etcd/clientv3" ) -func LTSSubGroupsAndGroupTypes(group string) []string { +func OSTSubGroupsAndGroupTypes(group string) []string { h := util.PathHierarchy(group) if len(h)%2 != 1 { panic(fmt.Errorf("wrong group path %q", group)) @@ -42,7 +42,7 @@ func LTSSubGroupsAndGroupTypes(group string) []string { return h } -func LTSRootGroup(group string) string { +func OSTRootGroup(group string) string { pl := util.PathList(group) if len(pl) < 2 { panic(fmt.Errorf("cannot determine root group name, wrong group path %q", group)) @@ -51,7 +51,7 @@ func LTSRootGroup(group string) string { return pl[1] } -func LTSSubGroups(group string) []string { +func OSTSubGroups(group string) []string { h := util.PathHierarchy(group) if len(h)%2 != 1 { panic(fmt.Errorf("wrong group path %q", group)) @@ -68,7 +68,7 @@ func LTSSubGroups(group string) []string { return sg } -func LTSSubGroupTypes(group string) []string { +func OSTSubGroupTypes(group string) []string { h := util.PathHierarchy(group) if len(h)%2 != 1 { panic(fmt.Errorf("wrong group path %q", group)) @@ -85,16 +85,16 @@ func LTSSubGroupTypes(group string) []string { return sg } -func LTSRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string { +func OSTRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string { paths := []string{} - subGroups := LTSSubGroups(group) + subGroups := OSTSubGroups(group) for _, subGroup := range subGroups { paths = append(paths, common.StorageRunCounterFile(subGroup)) } return paths } -func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wal.Action, error) { +func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wal.Action, error) { // use the first group dir after the root pl := util.PathList(group) if len(pl) < 2 { @@ -116,36 +116,36 @@ func LTSUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wa return action, nil } -func LTSRunTaskLogsDir(rtID string) string { +func OSTRunTaskLogsDir(rtID string) string { return path.Join("logs", rtID) } -func LTSRunTaskSetupLogPath(rtID string) string { - return path.Join(LTSRunTaskLogsDir(rtID), "setup.log") +func OSTRunTaskSetupLogPath(rtID string) string { + return path.Join(OSTRunTaskLogsDir(rtID), "setup.log") } -func LTSRunTaskStepLogPath(rtID string, step int) string { - return path.Join(LTSRunTaskLogsDir(rtID), "steps", fmt.Sprintf("%d.log", step)) +func OSTRunTaskStepLogPath(rtID string, step int) string { + return path.Join(OSTRunTaskLogsDir(rtID), "steps", fmt.Sprintf("%d.log", step)) } -func LTSRunArchivePath(rtID string, step int) string { +func OSTRunArchivePath(rtID string, step int) string { return path.Join("workspacearchives", fmt.Sprintf("%s/%d.tar", rtID, step)) } -func LTSCacheDir() string { +func OSTCacheDir() string { return "caches" } -func LTSCachePath(key string) string { - return path.Join(LTSCacheDir(), fmt.Sprintf("%s.tar", key)) +func OSTCachePath(key string) string { + return path.Join(OSTCacheDir(), fmt.Sprintf("%s.tar", key)) } -func LTSCacheKey(p string) string { +func OSTCacheKey(p string) string { base := path.Base(p) return strings.TrimSuffix(base, path.Ext(base)) } -func LTSGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig, error) { +func OSTGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig, error) { runConfigPath := common.StorageRunConfigFile(runConfigID) rcf, _, err := wal.ReadObject(runConfigPath, nil) if err != nil { @@ -161,7 +161,7 @@ func LTSGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig, return rc, nil } -func LTSSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) { +func OSTSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) { rcj, err := json.Marshal(rc) if err != nil { return nil, err @@ -177,7 +177,7 @@ func LTSSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) { return action, nil } -func LTSGetRun(wal *wal.WalManager, runID string) (*types.Run, error) { +func OSTGetRun(wal *wal.WalManager, runID string) (*types.Run, error) { runPath := common.StorageRunFile(runID) rf, _, err := wal.ReadObject(runPath, nil) @@ -194,7 +194,7 @@ func LTSGetRun(wal *wal.WalManager, runID string) (*types.Run, error) { return r, nil } -func LTSSaveRunAction(r *types.Run) (*wal.Action, error) { +func OSTSaveRunAction(r *types.Run) (*wal.Action, error) { rj, err := json.Marshal(r) if err != nil { return nil, err @@ -501,13 +501,13 @@ func GetRuns(ctx context.Context, e *etcd.Store) ([]*types.Run, error) { return runs, nil } -func GetRunEtcdOrLTS(ctx context.Context, e *etcd.Store, wal *wal.WalManager, runID string) (*types.Run, error) { +func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, wal *wal.WalManager, runID string) (*types.Run, error) { r, _, err := GetRun(ctx, e, runID) if err != nil && err != etcd.ErrKeyNotFound { return nil, err } if r == nil { - r, err = LTSGetRun(wal, runID) + r, err = OSTGetRun(wal, runID) if err != nil && err != objectstorage.ErrNotExist { return nil, err } diff --git a/internal/wal/changes.go b/internal/wal/changes.go index 4bab62a..de31741 100644 --- a/internal/wal/changes.go +++ b/internal/wal/changes.go @@ -150,7 +150,7 @@ func (c *WalChanges) updatePathsOrdered() { func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revision int64) error { walDataFilePath := w.storageWalDataFile(walData.WalDataFileID) - walDataFile, err := w.lts.ReadObject(walDataFilePath) + walDataFile, err := w.ost.ReadObject(walDataFilePath) if err != nil { return errors.Wrapf(err, "failed to read waldata %q", walDataFilePath) } diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 96d175a..3daab37 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -128,11 +128,11 @@ type WalHeader struct { type WalStatus string const ( - // WalStatusCommitted represent a wal written to the lts + // WalStatusCommitted represent a wal written to the objectstorage WalStatusCommitted WalStatus = "committed" - // WalStatusCommittedStorage represent the .committed marker file written to the lts + // WalStatusCommittedStorage represent the .committed marker file written to the objectstorage WalStatusCommittedStorage WalStatus = "committed_storage" - // WalStatusCheckpointed mean that all the wal actions have been executed on the lts + // WalStatusCheckpointed mean that all the wal actions have been executed on the objectstorage WalStatusCheckpointed WalStatus = "checkpointed" ) @@ -225,7 +225,7 @@ func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *Cha return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq) } - f, err := w.lts.ReadObject(w.toStorageDataPath(p)) + f, err := w.ost.ReadObject(w.toStorageDataPath(p)) return f, cgt, err } @@ -234,7 +234,7 @@ func (w *WalManager) changesList(paths []string, prefix, startWith string, recur for _, p := range paths { if !recursive && len(p) > len(prefix) { rel := strings.TrimPrefix(p, prefix) - skip := strings.Contains(rel, w.lts.Delimiter()) + skip := strings.Contains(rel, w.ost.Delimiter()) if skip { continue } @@ -266,7 +266,7 @@ func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-cha ci := 0 go func(objectCh chan<- objectstorage.ObjectInfo) { defer close(objectCh) - for object := range w.lts.List(prefix, startWith, recursive, doneCh) { + for object := range w.ost.List(prefix, startWith, recursive, doneCh) { if object.Err != nil { objectCh <- object return @@ -318,8 +318,8 @@ func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-cha return objectCh } -func (w *WalManager) HasLtsWal(walseq string) (bool, error) { - _, err := w.lts.Stat(w.storageWalStatusFile(walseq) + ".committed") +func (w *WalManager) HasOSTWal(walseq string) (bool, error) { + _, err := w.ost.Stat(w.storageWalStatusFile(walseq) + ".committed") if err == objectstorage.ErrNotExist { return false, nil } @@ -330,11 +330,11 @@ func (w *WalManager) HasLtsWal(walseq string) (bool, error) { } func (w *WalManager) ReadWal(walseq string) (io.ReadCloser, error) { - return w.lts.ReadObject(w.storageWalStatusFile(walseq) + ".committed") + return w.ost.ReadObject(w.storageWalStatusFile(walseq) + ".committed") } func (w *WalManager) ReadWalData(walFileID string) (io.ReadCloser, error) { - return w.lts.ReadObject(w.storageWalDataFile(walFileID)) + return w.ost.ReadObject(w.storageWalDataFile(walFileID)) } type WalFile struct { @@ -344,7 +344,7 @@ type WalFile struct { Checkpointed bool } -func (w *WalManager) ListLtsWals(start string) <-chan *WalFile { +func (w *WalManager) ListOSTWals(start string) <-chan *WalFile { walCh := make(chan *WalFile, 1) go func() { @@ -358,7 +358,7 @@ func (w *WalManager) ListLtsWals(start string) <-chan *WalFile { startPath = w.storageWalStatusFile(start) } - for object := range w.lts.List(path.Join(w.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) { + for object := range w.ost.List(path.Join(w.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) { if object.Err != nil { walCh <- &WalFile{ Err: object.Err, @@ -371,7 +371,7 @@ func (w *WalManager) ListLtsWals(start string) <-chan *WalFile { walSequence := strings.TrimSuffix(name, ext) // wal file refers to another wal, so return the current one if curWal.WalSequence != walSequence { - // if this happen something is wrong on the lts + // if this happen something is wrong on the objectstorage if !curWal.Committed && curWal.Checkpointed { walCh <- &WalFile{ Err: errors.Errorf("wal is checkpointed but not committed. this should never happen"), @@ -574,9 +574,9 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle // be committed // // TODO(sgotti) save inside the wal file also the previous committed wal to -// handle possible lts list operation eventual consistency gaps (list won't -// report a wal at seq X but a wal at X+n, if this kind of eventual consistency -// ever exists) +// handle possible objectstorage list operation eventual consistency gaps (list +// won't report a wal at seq X but a wal at X+n, if this kind of eventual +// consistency ever exists) func (w *WalManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error) { return w.WriteWalAdditionalOps(ctx, actions, cgt, nil, nil) } @@ -616,7 +616,7 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio return nil, err } } - if err := w.lts.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil { + if err := w.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil { return nil, err } w.log.Debugf("wrote wal file: %s", walDataFilePath) @@ -761,7 +761,7 @@ func (w *WalManager) sync(ctx context.Context) error { } walFileCommittedPath := walFilePath + ".committed" - if err := w.lts.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil { + if err := w.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil { return err } @@ -791,7 +791,7 @@ func (w *WalManager) sync(ctx context.Context) error { walFilePath := w.storageWalStatusFile(walData.WalSequence) w.log.Debugf("checkpointing committed wal to storage") walFileCheckpointedPath := walFilePath + ".checkpointed" - if err := w.lts.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil { + if err := w.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil { return err } } @@ -849,7 +849,7 @@ func (w *WalManager) checkpoint(ctx context.Context) error { walFilePath := w.storageWalDataFile(walData.WalDataFileID) w.log.Debugf("checkpointing wal: %q", walData.WalSequence) - walFile, err := w.lts.ReadObject(walFilePath) + walFile, err := w.ost.ReadObject(walFilePath) if err != nil { return err } @@ -896,13 +896,13 @@ func (w *WalManager) checkpointAction(ctx context.Context, action *Action) error switch action.ActionType { case ActionTypePut: w.log.Debugf("writing file: %q", path) - if err := w.lts.WriteObject(path, bytes.NewReader(action.Data)); err != nil { + if err := w.ost.WriteObject(path, bytes.NewReader(action.Data)); err != nil { return err } case ActionTypeDelete: w.log.Debugf("deleting file: %q", path) - if err := w.lts.DeleteObject(path); err != nil && err != objectstorage.ErrNotExist { + if err := w.ost.DeleteObject(path); err != nil && err != objectstorage.ErrNotExist { return err } } @@ -1076,7 +1076,7 @@ func (w *WalManager) etcdPinger(ctx context.Context) error { func (w *WalManager) InitEtcd(ctx context.Context) error { writeWal := func(wal *WalFile) error { w.log.Infof("wal seq: %s", wal.WalSequence) - walFile, err := w.lts.ReadObject(w.storageWalStatusFile(wal.WalSequence) + ".committed") + walFile, err := w.ost.ReadObject(w.storageWalStatusFile(wal.WalSequence) + ".committed") if err != nil { return err } @@ -1141,14 +1141,14 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { // walsdata not found in etcd - // if there're some wals in the lts this means etcd has been reset. + // if there're some wals in the objectstorage this means etcd has been reset. // So take all the wals in committed or checkpointed state starting from the // first not checkpointed wal and put them in etcd lastCommittedStorageWalsRing := ring.New(100) lastCommittedStorageWalElem := lastCommittedStorageWalsRing lastCommittedStorageWalSequence := "" wroteWals := 0 - for wal := range w.ListLtsWals("") { + for wal := range w.ListOSTWals("") { w.log.Infof("wal: %s", wal) if wal.Err != nil { return wal.Err @@ -1228,7 +1228,7 @@ func NoOpDataToPath(dataType string, id string) string { type WalManagerConfig struct { BasePath string E *etcd.Store - Lts *objectstorage.ObjStorage + OST *objectstorage.ObjStorage EtcdWalsKeepNum int CheckpointFunc CheckpointFunc DataToPathFunc DataToPathFunc @@ -1238,7 +1238,7 @@ type WalManager struct { basePath string log *zap.SugaredLogger e *etcd.Store - lts *objectstorage.ObjStorage + ost *objectstorage.ObjStorage changes *WalChanges etcdWalsKeepNum int checkpointFunc CheckpointFunc @@ -1262,7 +1262,7 @@ func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConf basePath: conf.BasePath, log: logger.Sugar(), e: conf.E, - lts: conf.Lts, + ost: conf.OST, etcdWalsKeepNum: conf.EtcdWalsKeepNum, changes: NewWalChanges(), checkpointFunc: conf.CheckpointFunc, diff --git a/internal/wal/wal_test.go b/internal/wal/wal_test.go index 7ded7d3..8eb5a09 100644 --- a/internal/wal/wal_test.go +++ b/internal/wal/wal_test.go @@ -75,9 +75,9 @@ func TestEtcdReset(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - ltsDir, err := ioutil.TempDir(dir, "lts") + ostDir, err := ioutil.TempDir(dir, "ost") - lts, err := objectstorage.NewPosixStorage(ltsDir) + ost, err := objectstorage.NewPosixStorage(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -85,7 +85,7 @@ func TestEtcdReset(t *testing.T) { walConfig := &WalManagerConfig{ BasePath: "basepath", E: tetcd.TestEtcd.Store, - Lts: objectstorage.NewObjStorage(lts, "/"), + OST: objectstorage.NewObjStorage(ost, "/"), EtcdWalsKeepNum: 10, } wal, err := NewWalManager(ctx, logger, walConfig) @@ -136,7 +136,7 @@ func TestEtcdReset(t *testing.T) { walConfig = &WalManagerConfig{ BasePath: "basepath", E: tetcd.TestEtcd.Store, - Lts: objectstorage.NewObjStorage(lts, "/"), + OST: objectstorage.NewObjStorage(ost, "/"), EtcdWalsKeepNum: 10, } wal, err = NewWalManager(ctx, logger, walConfig) @@ -178,16 +178,16 @@ func TestConcurrentUpdate(t *testing.T) { ctx := context.Background() - ltsDir, err := ioutil.TempDir(dir, "lts") + ostDir, err := ioutil.TempDir(dir, "ost") - lts, err := objectstorage.NewPosixStorage(ltsDir) + ost, err := objectstorage.NewPosixStorage(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } walConfig := &WalManagerConfig{ E: tetcd.TestEtcd.Store, - Lts: objectstorage.NewObjStorage(lts, "/"), + OST: objectstorage.NewObjStorage(ost, "/"), EtcdWalsKeepNum: 10, } wal, err := NewWalManager(ctx, logger, walConfig) @@ -258,9 +258,9 @@ func TestWalCleaner(t *testing.T) { ctx := context.Background() - ltsDir, err := ioutil.TempDir(dir, "lts") + ostDir, err := ioutil.TempDir(dir, "ost") - lts, err := objectstorage.NewPosixStorage(ltsDir) + ost, err := objectstorage.NewPosixStorage(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -268,7 +268,7 @@ func TestWalCleaner(t *testing.T) { walKeepNum := 10 walConfig := &WalManagerConfig{ E: tetcd.TestEtcd.Store, - Lts: objectstorage.NewObjStorage(lts, "/"), + OST: objectstorage.NewObjStorage(ost, "/"), EtcdWalsKeepNum: walKeepNum, } wal, err := NewWalManager(ctx, logger, walConfig)