Merge pull request #109 from sgotti/runservice_workspace_cleanup

runservice: add run workspace cleaner
This commit is contained in:
Simone Gotti 2019-09-17 11:04:51 +02:00 committed by GitHub
commit 947be9a742
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 158 additions and 4 deletions

View File

@ -92,7 +92,8 @@ type Runservice struct {
Etcd Etcd `yaml:"etcd"` Etcd Etcd `yaml:"etcd"`
ObjectStorage ObjectStorage `yaml:"objectStorage"` ObjectStorage ObjectStorage `yaml:"objectStorage"`
RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"` RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
RunWorkspaceExpireInterval time.Duration `yaml:"runWorkspaceExpireInterval"`
} }
type Executor struct { type Executor struct {
@ -222,7 +223,8 @@ var defaultConfig = Config{
}, },
}, },
Runservice: Runservice{ Runservice: Runservice{
RunCacheExpireInterval: 7 * 24 * time.Hour, RunCacheExpireInterval: 7 * 24 * time.Hour,
RunWorkspaceExpireInterval: 7 * 24 * time.Hour,
}, },
Executor: Executor{ Executor: Executor{
ActiveTasksLimit: 2, ActiveTasksLimit: 2,

View File

@ -52,6 +52,7 @@ var (
EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock") EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock")
EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner") EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner")
EtcdWorkspaceCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "workspacecleaner")
EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater") EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater")
EtcdMaintenanceKey = "maintenance" EtcdMaintenanceKey = "maintenance"

View File

@ -378,6 +378,7 @@ func (s *Runservice) run(ctx context.Context) error {
util.GoWait(&wg, func() { s.finishedRunsArchiverLoop(ctx) }) util.GoWait(&wg, func() { s.finishedRunsArchiverLoop(ctx) })
util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) }) util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) })
util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) }) util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) })
util.GoWait(&wg, func() { s.workspaceCleanerLoop(ctx, s.c.RunWorkspaceExpireInterval) })
util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) }) util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) })
util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) }) util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) })
} }

View File

@ -42,7 +42,8 @@ import (
) )
const ( const (
cacheCleanerInterval = 1 * 24 * time.Hour cacheCleanerInterval = 1 * 24 * time.Hour
workspaceCleanerInterval = 1 * 24 * time.Hour
defaultExecutorNotAliveInterval = 60 * time.Second defaultExecutorNotAliveInterval = 60 * time.Second
) )
@ -1432,3 +1433,55 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.
return nil return nil
} }
func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireInterval time.Duration) {
for {
if err := s.workspaceCleaner(ctx, workspaceExpireInterval); err != nil {
log.Errorf("err: %+v", err)
}
sleepCh := time.NewTimer(workspaceCleanerInterval).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}
func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterval time.Duration) error {
log.Debugf("workspaceCleaner")
session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
defer session.Close()
m := concurrency.NewMutex(session, common.EtcdWorkspaceCleanerLockKey)
// TODO(sgotti) find a way to use a trylock so we'll just return if already
// locked. Currently multiple workspacecleaners will enqueue and start when another
// finishes (unuseful and consume resources)
if err := m.Lock(ctx); err != nil {
return err
}
defer func() { _ = m.Unlock(ctx) }()
doneCh := make(chan struct{})
defer close(doneCh)
for object := range s.ost.List(store.OSTArchivesBaseDir()+"/", "", true, doneCh) {
if object.Err != nil {
return object.Err
}
if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) {
if err := s.ost.DeleteObject(object.Path); err != nil {
if err != ostypes.ErrNotExist {
log.Warnf("failed to delete workspace object %q: %v", object.Path, err)
}
}
}
}
return nil
}

View File

@ -135,8 +135,12 @@ func OSTRunTaskLogsRunPath(rtID, runID string) string {
return path.Join(OSTRunTaskLogsRunsDir(rtID), runID) return path.Join(OSTRunTaskLogsRunsDir(rtID), runID)
} }
func OSTArchivesBaseDir() string {
return "workspacearchives"
}
func OSTRunTaskArchivesBaseDir(rtID string) string { func OSTRunTaskArchivesBaseDir(rtID string) string {
return path.Join("workspacearchives", rtID) return path.Join(OSTArchivesBaseDir(), rtID)
} }
func OSTRunTaskArchivesDataDir(rtID string) string { func OSTRunTaskArchivesDataDir(rtID string) string {
@ -155,6 +159,18 @@ func OSTRunTaskArchivesRunPath(rtID, runID string) string {
return path.Join(OSTRunTaskArchivesRunsDir(rtID), runID) return path.Join(OSTRunTaskArchivesRunsDir(rtID), runID)
} }
func OSTRunTaskIDFromPath(archivePath string) (string, error) {
pl := util.PathList(archivePath)
if len(pl) < 2 {
return "", errors.Errorf("wrong archive path %q", archivePath)
}
fmt.Printf("pl: %q\n", pl)
if pl[0] != "workspacearchives" {
return "", errors.Errorf("wrong archive path %q", archivePath)
}
return pl[1], nil
}
func OSTCacheDir() string { func OSTCacheDir() string {
return "caches" return "caches"
} }

View File

@ -0,0 +1,81 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package store
import (
"fmt"
"testing"
)
func TestOSTRunTaskIDFromArchivePath(t *testing.T) {
tests := []struct {
name string
archivePath string
out string
err error
}{
{
name: "test no runs 1",
archivePath: "aaaa",
err: fmt.Errorf("wrong archive path %q", "aaaa"),
},
{
name: "test no runs 1",
archivePath: "workspacearchives",
err: fmt.Errorf("wrong archive path %q", "workspacearchives"),
},
{
name: "test no runs 1",
archivePath: "/workspacearchives/",
err: fmt.Errorf("wrong archive path %q", "/workspacearchives/"),
},
{
name: "test no runs 1",
archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar",
out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8",
},
{
name: "test no runs 1",
archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar",
out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8",
},
{
name: "test no runs 1",
archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar",
out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
id, err := OSTRunTaskIDFromPath(tt.archivePath)
if err != nil {
if tt.err == nil {
t.Fatalf("got error: %v, expected no error", err)
}
if err.Error() != tt.err.Error() {
t.Fatalf("got error: %v, want error: %v", err, tt.err)
}
} else {
if tt.err != nil {
t.Fatalf("got nil error, want error: %v", tt.err)
}
if id != tt.out {
t.Fatalf("got id: %q, want id: %q", id, tt.out)
}
}
})
}
}