diff --git a/internal/services/config/config.go b/internal/services/config/config.go
index 6b6ef82..7777ed8 100644
--- a/internal/services/config/config.go
+++ b/internal/services/config/config.go
@@ -67,6 +67,8 @@ type RunServiceScheduler struct {
 	Web  Web  `yaml:"web"`
 	Etcd Etcd `yaml:"etcd"`
 	LTS  LTS  `yaml:"lts"`
+
+	RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
 }
 
 type RunServiceExecutor struct {
@@ -173,6 +175,9 @@ var defaultConfig = Config{
 			Duration: 12 * time.Hour,
 		},
 	},
+	RunServiceScheduler: RunServiceScheduler{
+		RunCacheExpireInterval: 7 * 24 * time.Hour,
+	},
 }
 
 func Parse(configFile string) (*Config, error) {
diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go
index 529dff7..76c67b4 100644
--- a/internal/services/runservice/scheduler/scheduler.go
+++ b/internal/services/runservice/scheduler/scheduler.go
@@ -52,6 +52,13 @@ import (
 	"go.uber.org/zap/zapcore"
 )
 
+const (
+	cacheCleanerInterval = 1 * 24 * time.Hour
+
+	cacheCleanerLock = "locks/cachecleaner"
+	taskUpdaterLock  = "locks/taskupdater"
+)
+
 var level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
 var logger = slog.New(level)
 var log = logger.Sugar()
@@ -770,8 +777,11 @@ func (s *Scheduler) runTasksUpdater(ctx context.Context) error {
 	}
 	defer session.Close()
 
-	m := concurrency.NewMutex(session, "taskupdaterlock")
+	m := concurrency.NewMutex(session, taskUpdaterLock)
 
+	// TODO(sgotti) find a way to use a trylock so we'll just return if already
+	// locked. Currently multiple task updaters will enqueue and start when another
+	// finishes (not useful and wastes resources)
 	if err := m.Lock(ctx); err != nil {
 		return err
 	}
@@ -1411,6 +1421,59 @@ func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error {
 	return nil
 }
 
+func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) {
+	for {
+		if err := s.cacheCleaner(ctx, cacheExpireInterval); err != nil {
+			log.Errorf("err: %+v", err)
+		}
+
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		time.Sleep(cacheCleanerInterval)
+	}
+}
+
+func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.Duration) error {
+	log.Debugf("cacheCleaner")
+
+	session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
+	if err != nil {
+		return err
+	}
+	defer session.Close()
+
+	m := concurrency.NewMutex(session, cacheCleanerLock)
+
+	// TODO(sgotti) find a way to use a trylock so we'll just return if already
+	// locked. Currently multiple cache cleaners will enqueue and start when another
+	// finishes (not useful and wastes resources)
+	if err := m.Lock(ctx); err != nil {
+		return err
+	}
+	defer m.Unlock(ctx)
+
+	doneCh := make(chan struct{})
+	defer close(doneCh)
+	for object := range s.lts.List(store.LTSCacheDir()+"/", "", true, doneCh) {
+		if object.Err != nil {
+			return object.Err
+		}
+		if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) {
+			if err := s.lts.DeleteObject(object.Path); err != nil {
+				if err != objectstorage.ErrNotExist {
+					log.Warnf("failed to delete cache object %q: %v", object.Path, err)
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
 type Scheduler struct {
 	c *config.RunServiceScheduler
 	e *etcd.Store
@@ -1571,6 +1634,7 @@ func (s *Scheduler) Run(ctx context.Context) error {
 	go s.dumpLTSLoop(ctx)
 	go s.dumpLTSCleanerLoop(ctx)
 	go s.compactChangeGroupsLoop(ctx)
+	go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
 	go s.executorTaskUpdateHandler(ctx, ch)
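
Note on the two TODO(sgotti) comments in the diff: both ask for a trylock so that a second scheduler instance returns immediately instead of queueing behind the current lock holder. A minimal sketch of that idea, assuming an etcd client release whose concurrency package provides Mutex.TryLock and ErrLocked; the helper name tryLockOrSkip and the exact import path are illustrative, not part of this change:

package scheduler

import (
	"context"
	"errors"

	// the import path depends on the etcd client major version already used by scheduler.go
	"go.etcd.io/etcd/clientv3/concurrency"
)

// tryLockOrSkip attempts a non-blocking lock and reports whether the caller
// acquired it. Sketch only: it assumes the etcd client in use exposes
// concurrency.Mutex.TryLock and the concurrency.ErrLocked sentinel.
func tryLockOrSkip(ctx context.Context, m *concurrency.Mutex) (bool, error) {
	err := m.TryLock(ctx)
	if err == nil {
		return true, nil // lock acquired; the caller proceeds and must Unlock
	}
	if errors.Is(err, concurrency.ErrLocked) {
		return false, nil // another instance holds the lock; skip this round
	}
	return false, err
}

With such a helper, cacheCleaner and runTasksUpdater could skip the iteration when the lock is held elsewhere instead of blocking on m.Lock.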
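
A second, smaller observation: cacheCleanerLoop only checks ctx.Done() between iterations, so a cancellation arriving during the 24-hour time.Sleep is not seen until the sleep ends. If prompt shutdown matters, a context-aware wait could replace the sleep; sleepCtx below is a sketch under that assumption, not part of this change:

package scheduler

import (
	"context"
	"time"
)

// sleepCtx waits for d, or returns early (false) when ctx is cancelled,
// so a long wait between cleaner runs does not delay shutdown.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	select {
	case <-ctx.Done():
		return false
	case <-time.After(d):
		return true
	}
}

The loop body would then end with: if !sleepCtx(ctx, cacheCleanerInterval) { return }.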