runservice: add run workspace cleaner
Removes old workspace files (defaults to 7 days)
This commit is contained in:
parent
ab06a2283c
commit
7d375e4c4e
@ -92,7 +92,8 @@ type Runservice struct {
|
||||
Etcd Etcd `yaml:"etcd"`
|
||||
ObjectStorage ObjectStorage `yaml:"objectStorage"`
|
||||
|
||||
RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
|
||||
RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
|
||||
RunWorkspaceExpireInterval time.Duration `yaml:"runWorkspaceExpireInterval"`
|
||||
}
|
||||
|
||||
type Executor struct {
|
||||
@ -222,7 +223,8 @@ var defaultConfig = Config{
|
||||
},
|
||||
},
|
||||
Runservice: Runservice{
|
||||
RunCacheExpireInterval: 7 * 24 * time.Hour,
|
||||
RunCacheExpireInterval: 7 * 24 * time.Hour,
|
||||
RunWorkspaceExpireInterval: 7 * 24 * time.Hour,
|
||||
},
|
||||
Executor: Executor{
|
||||
ActiveTasksLimit: 2,
|
||||
|
@ -52,6 +52,7 @@ var (
|
||||
|
||||
EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock")
|
||||
EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner")
|
||||
EtcdWorkspaceCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "workspacecleaner")
|
||||
EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater")
|
||||
|
||||
EtcdMaintenanceKey = "maintenance"
|
||||
|
@ -378,6 +378,7 @@ func (s *Runservice) run(ctx context.Context) error {
|
||||
util.GoWait(&wg, func() { s.finishedRunsArchiverLoop(ctx) })
|
||||
util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) })
|
||||
util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) })
|
||||
util.GoWait(&wg, func() { s.workspaceCleanerLoop(ctx, s.c.RunWorkspaceExpireInterval) })
|
||||
util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) })
|
||||
util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) })
|
||||
}
|
||||
|
@ -42,7 +42,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
cacheCleanerInterval = 1 * 24 * time.Hour
|
||||
cacheCleanerInterval = 1 * 24 * time.Hour
|
||||
workspaceCleanerInterval = 1 * 24 * time.Hour
|
||||
|
||||
defaultExecutorNotAliveInterval = 60 * time.Second
|
||||
)
|
||||
@ -1432,3 +1433,55 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireInterval time.Duration) {
|
||||
for {
|
||||
if err := s.workspaceCleaner(ctx, workspaceExpireInterval); err != nil {
|
||||
log.Errorf("err: %+v", err)
|
||||
}
|
||||
|
||||
sleepCh := time.NewTimer(workspaceCleanerInterval).C
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-sleepCh:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterval time.Duration) error {
|
||||
log.Debugf("workspaceCleaner")
|
||||
|
||||
session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer session.Close()
|
||||
|
||||
m := concurrency.NewMutex(session, common.EtcdWorkspaceCleanerLockKey)
|
||||
|
||||
// TODO(sgotti) find a way to use a trylock so we'll just return if already
|
||||
// locked. Currently multiple workspacecleaners will enqueue and start when another
|
||||
// finishes (unuseful and consume resources)
|
||||
if err := m.Lock(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() { _ = m.Unlock(ctx) }()
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
for object := range s.ost.List(store.OSTArchivesBaseDir()+"/", "", true, doneCh) {
|
||||
if object.Err != nil {
|
||||
return object.Err
|
||||
}
|
||||
if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) {
|
||||
if err := s.ost.DeleteObject(object.Path); err != nil {
|
||||
if err != ostypes.ErrNotExist {
|
||||
log.Warnf("failed to delete workspace object %q: %v", object.Path, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -135,8 +135,12 @@ func OSTRunTaskLogsRunPath(rtID, runID string) string {
|
||||
return path.Join(OSTRunTaskLogsRunsDir(rtID), runID)
|
||||
}
|
||||
|
||||
func OSTArchivesBaseDir() string {
|
||||
return "workspacearchives"
|
||||
}
|
||||
|
||||
func OSTRunTaskArchivesBaseDir(rtID string) string {
|
||||
return path.Join("workspacearchives", rtID)
|
||||
return path.Join(OSTArchivesBaseDir(), rtID)
|
||||
}
|
||||
|
||||
func OSTRunTaskArchivesDataDir(rtID string) string {
|
||||
@ -155,6 +159,18 @@ func OSTRunTaskArchivesRunPath(rtID, runID string) string {
|
||||
return path.Join(OSTRunTaskArchivesRunsDir(rtID), runID)
|
||||
}
|
||||
|
||||
func OSTRunTaskIDFromPath(archivePath string) (string, error) {
|
||||
pl := util.PathList(archivePath)
|
||||
if len(pl) < 2 {
|
||||
return "", errors.Errorf("wrong archive path %q", archivePath)
|
||||
}
|
||||
fmt.Printf("pl: %q\n", pl)
|
||||
if pl[0] != "workspacearchives" {
|
||||
return "", errors.Errorf("wrong archive path %q", archivePath)
|
||||
}
|
||||
return pl[1], nil
|
||||
}
|
||||
|
||||
func OSTCacheDir() string {
|
||||
return "caches"
|
||||
}
|
||||
|
81
internal/services/runservice/store/store_test.go
Normal file
81
internal/services/runservice/store/store_test.go
Normal file
@ -0,0 +1,81 @@
|
||||
// Copyright 2019 Sorint.lab
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestOSTRunTaskIDFromArchivePath(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
archivePath string
|
||||
out string
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "test no runs 1",
|
||||
archivePath: "aaaa",
|
||||
err: fmt.Errorf("wrong archive path %q", "aaaa"),
|
||||
},
|
||||
{
|
||||
name: "test no runs 1",
|
||||
archivePath: "workspacearchives",
|
||||
err: fmt.Errorf("wrong archive path %q", "workspacearchives"),
|
||||
},
|
||||
{
|
||||
name: "test no runs 1",
|
||||
archivePath: "/workspacearchives/",
|
||||
err: fmt.Errorf("wrong archive path %q", "/workspacearchives/"),
|
||||
},
|
||||
{
|
||||
name: "test no runs 1",
|
||||
archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar",
|
||||
out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8",
|
||||
},
|
||||
{
|
||||
name: "test no runs 1",
|
||||
archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar",
|
||||
out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8",
|
||||
},
|
||||
{
|
||||
name: "test no runs 1",
|
||||
archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar",
|
||||
out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
id, err := OSTRunTaskIDFromPath(tt.archivePath)
|
||||
if err != nil {
|
||||
if tt.err == nil {
|
||||
t.Fatalf("got error: %v, expected no error", err)
|
||||
}
|
||||
if err.Error() != tt.err.Error() {
|
||||
t.Fatalf("got error: %v, want error: %v", err, tt.err)
|
||||
}
|
||||
} else {
|
||||
if tt.err != nil {
|
||||
t.Fatalf("got nil error, want error: %v", tt.err)
|
||||
}
|
||||
if id != tt.out {
|
||||
t.Fatalf("got id: %q, want id: %q", id, tt.out)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user