diff --git a/internal/services/config/config.go b/internal/services/config/config.go index 91c3fe5..1b3b5e1 100644 --- a/internal/services/config/config.go +++ b/internal/services/config/config.go @@ -92,7 +92,8 @@ type Runservice struct { Etcd Etcd `yaml:"etcd"` ObjectStorage ObjectStorage `yaml:"objectStorage"` - RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"` + RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"` + RunWorkspaceExpireInterval time.Duration `yaml:"runWorkspaceExpireInterval"` } type Executor struct { @@ -222,7 +223,8 @@ var defaultConfig = Config{ }, }, Runservice: Runservice{ - RunCacheExpireInterval: 7 * 24 * time.Hour, + RunCacheExpireInterval: 7 * 24 * time.Hour, + RunWorkspaceExpireInterval: 7 * 24 * time.Hour, }, Executor: Executor{ ActiveTasksLimit: 2, diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index de966bc..a94cf15 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -52,6 +52,7 @@ var ( EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock") EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner") + EtcdWorkspaceCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "workspacecleaner") EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater") EtcdMaintenanceKey = "maintenance" diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go index a10d976..231e498 100644 --- a/internal/services/runservice/runservice.go +++ b/internal/services/runservice/runservice.go @@ -378,6 +378,7 @@ func (s *Runservice) run(ctx context.Context) error { util.GoWait(&wg, func() { s.finishedRunsArchiverLoop(ctx) }) util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) }) util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) }) + util.GoWait(&wg, func() { s.workspaceCleanerLoop(ctx, s.c.RunWorkspaceExpireInterval) }) util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) }) util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) }) } diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index e228075..6e0e1ea 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -42,7 +42,8 @@ import ( ) const ( - cacheCleanerInterval = 1 * 24 * time.Hour + cacheCleanerInterval = 1 * 24 * time.Hour + workspaceCleanerInterval = 1 * 24 * time.Hour defaultExecutorNotAliveInterval = 60 * time.Second ) @@ -1432,3 +1433,55 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time. return nil } + +func (s *Runservice) workspaceCleanerLoop(ctx context.Context, workspaceExpireInterval time.Duration) { + for { + if err := s.workspaceCleaner(ctx, workspaceExpireInterval); err != nil { + log.Errorf("err: %+v", err) + } + + sleepCh := time.NewTimer(workspaceCleanerInterval).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } + } +} + +func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterval time.Duration) error { + log.Debugf("workspaceCleaner") + + session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := concurrency.NewMutex(session, common.EtcdWorkspaceCleanerLockKey) + + // TODO(sgotti) find a way to use a trylock so we'll just return if already + // locked. Currently multiple workspacecleaners will enqueue and start when another + // finishes (unuseful and consume resources) + if err := m.Lock(ctx); err != nil { + return err + } + defer func() { _ = m.Unlock(ctx) }() + + doneCh := make(chan struct{}) + defer close(doneCh) + for object := range s.ost.List(store.OSTArchivesBaseDir()+"/", "", true, doneCh) { + if object.Err != nil { + return object.Err + } + if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) { + if err := s.ost.DeleteObject(object.Path); err != nil { + if err != ostypes.ErrNotExist { + log.Warnf("failed to delete workspace object %q: %v", object.Path, err) + } + } + } + } + + return nil +} diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 81b213c..a54bcf1 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -135,8 +135,12 @@ func OSTRunTaskLogsRunPath(rtID, runID string) string { return path.Join(OSTRunTaskLogsRunsDir(rtID), runID) } +func OSTArchivesBaseDir() string { + return "workspacearchives" +} + func OSTRunTaskArchivesBaseDir(rtID string) string { - return path.Join("workspacearchives", rtID) + return path.Join(OSTArchivesBaseDir(), rtID) } func OSTRunTaskArchivesDataDir(rtID string) string { @@ -155,6 +159,18 @@ func OSTRunTaskArchivesRunPath(rtID, runID string) string { return path.Join(OSTRunTaskArchivesRunsDir(rtID), runID) } +func OSTRunTaskIDFromPath(archivePath string) (string, error) { + pl := util.PathList(archivePath) + if len(pl) < 2 { + return "", errors.Errorf("wrong archive path %q", archivePath) + } + fmt.Printf("pl: %q\n", pl) + if pl[0] != "workspacearchives" { + return "", errors.Errorf("wrong archive path %q", archivePath) + } + return pl[1], nil +} + func OSTCacheDir() string { return "caches" } diff --git a/internal/services/runservice/store/store_test.go b/internal/services/runservice/store/store_test.go new file mode 100644 index 0000000..33fbf4e --- /dev/null +++ b/internal/services/runservice/store/store_test.go @@ -0,0 +1,81 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package store + +import ( + "fmt" + "testing" +) + +func TestOSTRunTaskIDFromArchivePath(t *testing.T) { + tests := []struct { + name string + archivePath string + out string + err error + }{ + { + name: "test no runs 1", + archivePath: "aaaa", + err: fmt.Errorf("wrong archive path %q", "aaaa"), + }, + { + name: "test no runs 1", + archivePath: "workspacearchives", + err: fmt.Errorf("wrong archive path %q", "workspacearchives"), + }, + { + name: "test no runs 1", + archivePath: "/workspacearchives/", + err: fmt.Errorf("wrong archive path %q", "/workspacearchives/"), + }, + { + name: "test no runs 1", + archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar", + out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8", + }, + { + name: "test no runs 1", + archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar", + out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8", + }, + { + name: "test no runs 1", + archivePath: "workspacearchives/2502c5c7-0fd9-432b-918e-3ccf31a664f8/data/3.tar", + out: "2502c5c7-0fd9-432b-918e-3ccf31a664f8", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + id, err := OSTRunTaskIDFromPath(tt.archivePath) + if err != nil { + if tt.err == nil { + t.Fatalf("got error: %v, expected no error", err) + } + if err.Error() != tt.err.Error() { + t.Fatalf("got error: %v, want error: %v", err, tt.err) + } + } else { + if tt.err != nil { + t.Fatalf("got nil error, want error: %v", tt.err) + } + if id != tt.out { + t.Fatalf("got id: %q, want id: %q", id, tt.out) + } + } + }) + } +}