runservice: add run cache cleaner
Removes old cache entries (defaults to 7 days)
This commit is contained in:
parent
4c19ea3e91
commit
d3f658c5ad
|
@ -67,6 +67,8 @@ type RunServiceScheduler struct {
|
||||||
Web Web `yaml:"web"`
|
Web Web `yaml:"web"`
|
||||||
Etcd Etcd `yaml:"etcd"`
|
Etcd Etcd `yaml:"etcd"`
|
||||||
LTS LTS `yaml:"lts"`
|
LTS LTS `yaml:"lts"`
|
||||||
|
|
||||||
|
RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type RunServiceExecutor struct {
|
type RunServiceExecutor struct {
|
||||||
|
@ -173,6 +175,9 @@ var defaultConfig = Config{
|
||||||
Duration: 12 * time.Hour,
|
Duration: 12 * time.Hour,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
RunServiceScheduler: RunServiceScheduler{
|
||||||
|
RunCacheExpireInterval: 7 * 24 * time.Hour,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
func Parse(configFile string) (*Config, error) {
|
func Parse(configFile string) (*Config, error) {
|
||||||
|
|
|
@ -52,6 +52,13 @@ import (
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
cacheCleanerInterval = 1 * 24 * time.Hour
|
||||||
|
|
||||||
|
cacheCleanerLock = "locks/cachecleaner"
|
||||||
|
taskUpdaterLock = "locks/taskupdater"
|
||||||
|
)
|
||||||
|
|
||||||
var level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
|
var level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
|
||||||
var logger = slog.New(level)
|
var logger = slog.New(level)
|
||||||
var log = logger.Sugar()
|
var log = logger.Sugar()
|
||||||
|
@ -770,8 +777,11 @@ func (s *Scheduler) runTasksUpdater(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
defer session.Close()
|
defer session.Close()
|
||||||
|
|
||||||
m := concurrency.NewMutex(session, "taskupdaterlock")
|
m := concurrency.NewMutex(session, taskUpdaterLock)
|
||||||
|
|
||||||
|
// TODO(sgotti) find a way to use a trylock so we'll just return if already
|
||||||
|
// locked. Currently multiple task updaters will enqueue and start when another
|
||||||
|
// finishes (unuseful and consume resources)
|
||||||
if err := m.Lock(ctx); err != nil {
|
if err := m.Lock(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1411,6 +1421,59 @@ func (s *Scheduler) dumpLTSCleaner(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) {
|
||||||
|
for {
|
||||||
|
if err := s.cacheCleaner(ctx, cacheExpireInterval); err != nil {
|
||||||
|
log.Errorf("err: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(cacheCleanerInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.Duration) error {
|
||||||
|
log.Debugf("cacheCleaner")
|
||||||
|
|
||||||
|
session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer session.Close()
|
||||||
|
|
||||||
|
m := concurrency.NewMutex(session, cacheCleanerLock)
|
||||||
|
|
||||||
|
// TODO(sgotti) find a way to use a trylock so we'll just return if already
|
||||||
|
// locked. Currently multiple cachecleaners will enqueue and start when another
|
||||||
|
// finishes (unuseful and consume resources)
|
||||||
|
if err := m.Lock(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer m.Unlock(ctx)
|
||||||
|
|
||||||
|
doneCh := make(chan struct{})
|
||||||
|
defer close(doneCh)
|
||||||
|
for object := range s.lts.List(store.LTSCacheDir()+"/", "", true, doneCh) {
|
||||||
|
if object.Err != nil {
|
||||||
|
return object.Err
|
||||||
|
}
|
||||||
|
if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) {
|
||||||
|
if err := s.lts.DeleteObject(object.Path); err != nil {
|
||||||
|
if err != objectstorage.ErrNotExist {
|
||||||
|
log.Warnf("failed to delete cache object %q: %v", object.Path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type Scheduler struct {
|
type Scheduler struct {
|
||||||
c *config.RunServiceScheduler
|
c *config.RunServiceScheduler
|
||||||
e *etcd.Store
|
e *etcd.Store
|
||||||
|
@ -1571,6 +1634,7 @@ func (s *Scheduler) Run(ctx context.Context) error {
|
||||||
go s.dumpLTSLoop(ctx)
|
go s.dumpLTSLoop(ctx)
|
||||||
go s.dumpLTSCleanerLoop(ctx)
|
go s.dumpLTSCleanerLoop(ctx)
|
||||||
go s.compactChangeGroupsLoop(ctx)
|
go s.compactChangeGroupsLoop(ctx)
|
||||||
|
go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
|
||||||
|
|
||||||
go s.executorTaskUpdateHandler(ctx, ch)
|
go s.executorTaskUpdateHandler(ctx, ch)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue