From 35e1ec0e15add8fe5b532892865e98ae11ba8020 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Fri, 8 Nov 2019 10:10:56 +0100 Subject: [PATCH 1/2] datamanager: remove old storage wals Remove all wals not required by the existing data status files and not existing in etcd. --- internal/datamanager/data.go | 55 ++++++++ internal/datamanager/datamanager.go | 29 +++- internal/datamanager/datamanager_test.go | 168 ++++++++++++++++++++++- internal/datamanager/wal.go | 134 ++++++++++++++++-- 4 files changed, 365 insertions(+), 21 deletions(-) diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index baf5c2f..b510eed 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -529,6 +529,43 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { return bytes.NewReader(de.Data), nil } +func (d *DataManager) GetFirstDataStatusSequences(n int) ([]*sequence.Sequence, error) { + if n < 1 { + return nil, errors.Errorf("n must be greater than 0") + } + + dataStatusSequences := []*sequence.Sequence{} + c := 0 + + doneCh := make(chan struct{}) + defer close(doneCh) + for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) { + if object.Err != nil { + return nil, object.Err + } + if m := DataStatusFileRegexp.FindStringSubmatch(path.Base(object.Path)); m != nil { + seq, err := sequence.Parse(m[1]) + if err != nil { + d.log.Warnf("cannot parse sequence for data status file %q", object.Path) + continue + } + dataStatusSequences = append(dataStatusSequences, seq) + c++ + } else { + d.log.Warnf("bad file %q found in storage data dir", object.Path) + } + if c >= n { + break + } + } + + if len(dataStatusSequences) == 0 { + return nil, ostypes.ErrNotExist + } + + return dataStatusSequences, nil +} + func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, error) { if n < 1 { return nil, errors.Errorf("n must be greater than 0") @@ -582,6 +619,15 @@ func (d *DataManager) GetDataStatus(dataSequence *sequence.Sequence) (*DataStatu return dataStatus, dec.Decode(&dataStatus) } +func (d *DataManager) GetFirstDataStatusSequence() (*sequence.Sequence, error) { + dataStatusSequences, err := d.GetFirstDataStatusSequences(1) + if err != nil { + return nil, err + } + + return dataStatusSequences[0], nil +} + func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) { dataStatusSequences, err := d.GetLastDataStatusSequences(1) if err != nil { @@ -591,6 +637,15 @@ func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) { return dataStatusSequences[0], nil } +func (d *DataManager) GetFirstDataStatus() (*DataStatus, error) { + dataStatusSequence, err := d.GetFirstDataStatusSequence() + if err != nil { + return nil, err + } + + return d.GetDataStatus(dataStatusSequence) +} + func (d *DataManager) GetLastDataStatus() (*DataStatus, error) { dataStatusSequence, err := d.GetLastDataStatusSequence() if err != nil { diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go index 911ae2e..2ff994d 100644 --- a/internal/datamanager/datamanager.go +++ b/internal/datamanager/datamanager.go @@ -34,10 +34,15 @@ import ( // * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one const ( - DefaultCheckpointInterval = 10 * time.Second - DefaultCheckpointCleanInterval = 5 * time.Minute - DefaultEtcdWalsKeepNum = 100 - DefaultMinCheckpointWalsNum = 100 + DefaultSyncInterval = 5 * time.Second + DefaultCheckpointInterval = 10 * time.Second + DefaultCheckpointCleanInterval = 5 * time.Minute + DefaultEtcdWalCleanInterval = 2 * time.Second + DefaultStorageWalCleanInterval = 5 * time.Minute + DefaultCompactChangeGroupsInterval = 1 * time.Second + DefaultEtcdPingerInterval = 1 * time.Second + DefaultEtcdWalsKeepNum = 100 + DefaultMinCheckpointWalsNum = 100 ) var ( @@ -66,6 +71,7 @@ var ( etcdCompactChangeGroupsLockKey = path.Join(etcdWalBaseDir, "compactchangegroupslock") etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock") + etcdStorageWalCleanerLockKey = path.Join(etcdWalBaseDir, "storagewalcleanerlock") etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups") etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev") @@ -154,12 +160,20 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo return d, nil } +func (d *DataManager) storageWalStatusDir() string { + return path.Join(d.basePath, storageWalsStatusDir) +} + func (d *DataManager) storageWalStatusFile(walSeq string) string { - return path.Join(d.basePath, storageWalsStatusDir, walSeq) + return path.Join(d.storageWalStatusDir(), walSeq) +} + +func (d *DataManager) storageWalDataDir() string { + return path.Join(d.basePath, storageWalsDataDir) } func (d *DataManager) storageWalDataFile(walFileID string) string { - return path.Join(d.basePath, storageWalsDataDir, walFileID) + return path.Join(d.storageWalDataDir(), walFileID) } func (d *DataManager) storageDataDir() string { @@ -239,7 +253,8 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error { go d.syncLoop(ctx) go d.checkpointLoop(ctx) go d.checkpointCleanLoop(ctx) - go d.walCleanerLoop(ctx) + go d.etcdWalCleanerLoop(ctx) + go d.storageWalCleanerLoop(ctx) go d.compactChangeGroupsLoop(ctx) go d.etcdPingerLoop(ctx) diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index a639638..703f998 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -394,7 +394,7 @@ func TestConcurrentUpdate(t *testing.T) { } } -func TestWalCleaner(t *testing.T) { +func TestEtcdWalCleaner(t *testing.T) { dir, err := ioutil.TempDir("", "agola") if err != nil { t.Fatalf("unexpected err: %v", err) @@ -455,7 +455,7 @@ func TestWalCleaner(t *testing.T) { if err := dm.checkpoint(ctx, true); err != nil { t.Fatalf("unexpected err: %v", err) } - if err := dm.walCleaner(ctx); err != nil { + if err := dm.etcdWalCleaner(ctx); err != nil { t.Fatalf("unexpected err: %v", err) } @@ -573,7 +573,7 @@ func TestReadObject(t *testing.T) { if err := dm.checkpoint(ctx, true); err != nil { t.Fatalf("unexpected err: %v", err) } - if err := dm.walCleaner(ctx); err != nil { + if err := dm.etcdWalCleaner(ctx); err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1316,6 +1316,168 @@ func testCleanConcurrentCheckpoint(t *testing.T, basePath string) { } } +func TestStorageWalCleaner(t *testing.T) { + tests := []struct { + name string + basePath string + }{ + { + name: "test with empty basepath", + basePath: "", + }, + { + name: "test with relative basepath", + basePath: "base/path", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testStorageWalCleaner(t, tt.basePath) + }) + } +} + +func testStorageWalCleaner(t *testing.T, basePath string) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + ctx := context.Background() + + ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + ost, err := posix.New(ostDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + dmConfig := &DataManagerConfig{ + BasePath: basePath, + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + // remove almost all wals to see that they are removed also from changes + EtcdWalsKeepNum: 1, + DataTypes: []string{"datatype01"}, + // checkpoint also with only one wal + MinCheckpointWalsNum: 1, + // use a small maxDataFileSize + MaxDataFileSize: 10 * 1024, + } + dm, err := NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + dmReadyCh := make(chan struct{}) + go func() { _ = dm.Run(ctx, dmReadyCh) }() + <-dmReadyCh + + time.Sleep(5 * time.Second) + + contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + var currentEntries map[string]*DataEntry + actions := []*Action{} + for n := 0; n < 10; n++ { + for i := 0; i < 400; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + + currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // get the last data status sequence + lastDataStatusSequences, err := dm.GetLastDataStatusSequences(dataStatusToKeep) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // Use the first dataStatusToKeep data status + dataStatus, err := dm.GetDataStatus(lastDataStatusSequences[dataStatusToKeep-1]) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + // get the list of expected wals + doneCh := make(chan struct{}) + defer close(doneCh) + + expectedWalStatusFiles := []string{} + expectedWalDataFiles := []string{} + for object := range dm.ost.List(dm.storageWalStatusDir()+"/", "", true, doneCh) { + if object.Err != nil { + t.Fatalf("unexpected err: %v", err) + } + + name := path.Base(object.Path) + ext := path.Ext(name) + walSequence := strings.TrimSuffix(name, ext) + + if walSequence < dataStatus.WalSequence { + continue + } + header, err := dm.ReadWal(walSequence) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + expectedWalStatusFiles = append(expectedWalStatusFiles, object.Path) + expectedWalDataFiles = append(expectedWalDataFiles, dm.storageWalDataFile(header.WalDataFileID)) + } + sort.Strings(expectedWalDataFiles) + + if err := dm.CleanOldCheckpoints(ctx); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if err := dm.storageWalCleaner(ctx); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + currentWalStatusFiles := []string{} + currentWalDataFiles := []string{} + for object := range dm.ost.List(dm.storageWalStatusDir()+"/", "", true, doneCh) { + if object.Err != nil { + t.Fatalf("unexpected err: %v", err) + } + + currentWalStatusFiles = append(currentWalStatusFiles, object.Path) + } + for object := range dm.ost.List(dm.storageWalDataDir()+"/", "", true, doneCh) { + if object.Err != nil { + t.Fatalf("unexpected err: %v", err) + } + + currentWalDataFiles = append(currentWalDataFiles, object.Path) + } + sort.Strings(currentWalDataFiles) + if diff := cmp.Diff(currentWalStatusFiles, expectedWalStatusFiles); diff != "" { + t.Fatalf("different wal status files: %v", diff) + } + if diff := cmp.Diff(currentWalDataFiles, expectedWalDataFiles); diff != "" { + t.Fatalf("different wal data files: %v", diff) + } +} + func TestExportImport(t *testing.T) { dir, err := ioutil.TempDir("", "agola") if err != nil { diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index b81a4ec..4aaabdc 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -179,7 +179,7 @@ func (d *DataManager) ListOSTWals(start string) <-chan *WalFile { startPath = d.storageWalStatusFile(start) } - for object := range d.ost.List(path.Join(d.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) { + for object := range d.ost.List(d.storageWalStatusDir()+"/", startPath, true, doneCh) { if object.Err != nil { walCh <- &WalFile{ Err: object.Err, @@ -547,7 +547,7 @@ func (d *DataManager) syncLoop(ctx context.Context) { d.log.Errorf("syncer error: %+v", err) } - sleepCh := time.NewTimer(5 * time.Second).C + sleepCh := time.NewTimer(DefaultSyncInterval).C select { case <-ctx.Done(): return @@ -751,14 +751,14 @@ func (d *DataManager) checkpointClean(ctx context.Context) error { return nil } -func (d *DataManager) walCleanerLoop(ctx context.Context) { +func (d *DataManager) etcdWalCleanerLoop(ctx context.Context) { for { - d.log.Debugf("walcleaner") - if err := d.walCleaner(ctx); err != nil { - d.log.Errorf("walcleaner error: %v", err) + d.log.Debugf("etcdwalcleaner") + if err := d.etcdWalCleaner(ctx); err != nil { + d.log.Errorf("etcdwalcleaner error: %v", err) } - sleepCh := time.NewTimer(2 * time.Second).C + sleepCh := time.NewTimer(DefaultEtcdWalCleanInterval).C select { case <-ctx.Done(): return @@ -767,10 +767,10 @@ func (d *DataManager) walCleanerLoop(ctx context.Context) { } } -// walCleaner will clean already checkpointed wals from etcd +// etcdWalCleaner will clean already checkpointed wals from etcd // it must always keep at least one wal that is needed for resync operations // from clients -func (d *DataManager) walCleaner(ctx context.Context) error { +func (d *DataManager) etcdWalCleaner(ctx context.Context) error { session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err @@ -826,13 +826,125 @@ func (d *DataManager) walCleaner(ctx context.Context) error { return nil } +func (d *DataManager) storageWalCleanerLoop(ctx context.Context) { + for { + d.log.Debugf("storagewalcleaner") + if err := d.storageWalCleaner(ctx); err != nil { + d.log.Errorf("storagewalcleaner error: %v", err) + } + + sleepCh := time.NewTimer(DefaultStorageWalCleanInterval).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } + } +} + +// storageWalCleaner will clean unneeded wals from the storage +func (d *DataManager) storageWalCleaner(ctx context.Context) error { + session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := concurrency.NewMutex(session, etcdStorageWalCleanerLockKey) + + // TODO(sgotti) find a way to use a trylock so we'll just return if already + // locked. Currently multiple task updaters will enqueue and start when another + // finishes (unuseful and consume resources) + if err := m.Lock(ctx); err != nil { + return err + } + defer func() { _ = m.Unlock(ctx) }() + + firstDataStatus, err := d.GetFirstDataStatus() + if err != nil { + return err + } + firstWalSequence := firstDataStatus.WalSequence + + // get the first wal in etcd (in any state) and use it's wal sequence if + // it's lesser than the first data status wal sequence + resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0) + if err != nil { + return err + } + if len(resp.Kvs) == 0 { + return errors.Errorf("no wals in etcd") + } + var walData WalData + if err := json.Unmarshal(resp.Kvs[0].Value, &walData); err != nil { + return err + } + if walData.WalSequence < firstWalSequence { + firstWalSequence = walData.WalSequence + } + + doneCh := make(chan struct{}) + defer close(doneCh) + + for object := range d.ost.List(d.storageWalStatusDir()+"/", "", true, doneCh) { + if object.Err != nil { + return err + } + name := path.Base(object.Path) + ext := path.Ext(name) + walSequence := strings.TrimSuffix(name, ext) + + // handle committed status file and related data file + if ext == ".committed" { + if walSequence >= firstWalSequence { + break + } + + header, err := d.ReadWal(walSequence) + if err != nil { + return err + } + + // first remove wal data file + walStatusFilePath := d.storageWalDataFile(header.WalDataFileID) + d.log.Infof("removing %q", walStatusFilePath) + if err := d.ost.DeleteObject(walStatusFilePath); err != nil { + if err != ostypes.ErrNotExist { + return err + } + } + + // then remove wal status files + d.log.Infof("removing %q", object.Path) + if err := d.ost.DeleteObject(object.Path); err != nil { + if err != ostypes.ErrNotExist { + return err + } + } + } + + // handle old checkpointed status file + // TODO(sgotti) remove this in future versions since .checkpointed files are not created anymore + if ext == ".checkpointed" { + d.log.Infof("removing %q", object.Path) + if err := d.ost.DeleteObject(object.Path); err != nil { + if err != ostypes.ErrNotExist { + return err + } + } + } + } + + return nil +} + func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) { for { if err := d.compactChangeGroups(ctx); err != nil { d.log.Errorf("err: %+v", err) } - sleepCh := time.NewTimer(1 * time.Second).C + sleepCh := time.NewTimer(DefaultCompactChangeGroupsInterval).C select { case <-ctx.Done(): return @@ -917,7 +1029,7 @@ func (d *DataManager) etcdPingerLoop(ctx context.Context) { d.log.Errorf("err: %+v", err) } - sleepCh := time.NewTimer(1 * time.Second).C + sleepCh := time.NewTimer(DefaultEtcdPingerInterval).C select { case <-ctx.Done(): return From 5af07d085280293a84adea2fb75290312d0b047f Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Fri, 8 Nov 2019 16:25:53 +0100 Subject: [PATCH 2/2] objectstorage: use a single package remove all the subpackages and just use a single package --- internal/common/common.go | 6 +-- internal/datamanager/data.go | 20 ++++----- internal/datamanager/datamanager_test.go | 34 ++++++++------- internal/datamanager/wal.go | 12 +++--- internal/objectstorage/{common => }/atomic.go | 12 +++--- internal/objectstorage/objectstorage.go | 27 +++++++++--- internal/objectstorage/objectstorage_test.go | 20 ++++----- internal/objectstorage/{posix => }/posix.go | 35 ++++++++-------- .../objectstorage/{posix => }/posix_test.go | 6 +-- .../{posixflat => }/posixflat.go | 37 ++++++++--------- .../{posixflat => }/posixflat_test.go | 6 +-- internal/objectstorage/{s3 => }/s3.go | 28 ++++++------- internal/objectstorage/types/types.go | 41 ------------------- .../services/configstore/readdb/readdb.go | 5 +-- internal/services/runservice/api/api.go | 3 +- internal/services/runservice/api/executor.go | 9 ++-- internal/services/runservice/readdb/readdb.go | 5 +-- internal/services/runservice/scheduler.go | 8 ++-- internal/services/runservice/store/store.go | 4 +- 19 files changed, 137 insertions(+), 181 deletions(-) rename internal/objectstorage/{common => }/atomic.go (86%) rename internal/objectstorage/{posix => }/posix.go (80%) rename internal/objectstorage/{posix => }/posix_test.go (94%) rename internal/objectstorage/{posixflat => }/posixflat.go (88%) rename internal/objectstorage/{posixflat => }/posixflat_test.go (97%) rename internal/objectstorage/{s3 => }/s3.go (85%) delete mode 100644 internal/objectstorage/types/types.go diff --git a/internal/common/common.go b/internal/common/common.go index cb6a5d0..26f62cc 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -23,8 +23,6 @@ import ( "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - "agola.io/agola/internal/objectstorage/posix" - "agola.io/agola/internal/objectstorage/s3" "agola.io/agola/internal/services/config" "go.uber.org/zap" errors "golang.org/x/xerrors" @@ -83,7 +81,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error switch c.Type { case config.ObjectStorageTypePosix: - ost, err = posix.New(c.Path) + ost, err = objectstorage.NewPosix(c.Path) if err != nil { return nil, errors.Errorf("failed to create posix object storage: %w", err) } @@ -102,7 +100,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error return nil, errors.Errorf("wrong s3 endpoint scheme %q (must be http or https)", u.Scheme) } } - ost, err = s3.New(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) + ost, err = objectstorage.NewS3(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) if err != nil { return nil, errors.Errorf("failed to create s3 object storage: %w", err) } diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index b510eed..b557e39 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -26,7 +26,7 @@ import ( "sort" "strings" - ostypes "agola.io/agola/internal/objectstorage/types" + "agola.io/agola/internal/objectstorage" "agola.io/agola/internal/sequence" uuid "github.com/satori/go.uuid" @@ -165,7 +165,7 @@ func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) er } curDataStatus, err := d.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return err } @@ -321,10 +321,10 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s if actionGroup.DataStatusFile != nil { // TODO(sgotti) instead of reading all entries in memory decode it's contents one by one when needed oldDataf, err := d.ost.ReadObject(d.DataFilePath(dataType, actionGroup.DataStatusFile.ID)) - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return nil, err } - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { dec := json.NewDecoder(oldDataf) for { var de *DataEntry @@ -481,7 +481,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { var matchingDataFileID string // get the matching data file for the action entry ID if len(curFiles[dataType]) == 0 { - return nil, ostypes.ErrNotExist + return nil, objectstorage.ErrNotExist } matchingDataFileID = curFiles[dataType][0].ID @@ -507,7 +507,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { pos, ok := dataFileIndex.Index[id] if !ok { - return nil, ostypes.ErrNotExist + return nil, objectstorage.ErrNotExist } dataf, err := d.ost.ReadObject(d.DataFilePath(dataType, matchingDataFileID)) @@ -560,7 +560,7 @@ func (d *DataManager) GetFirstDataStatusSequences(n int) ([]*sequence.Sequence, } if len(dataStatusSequences) == 0 { - return nil, ostypes.ErrNotExist + return nil, objectstorage.ErrNotExist } return dataStatusSequences, nil @@ -601,7 +601,7 @@ func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, e }) if len(dataStatusSequences) == 0 { - return nil, ostypes.ErrNotExist + return nil, objectstorage.ErrNotExist } return dataStatusSequences, nil @@ -862,7 +862,7 @@ func (d *DataManager) cleanOldCheckpoints(ctx context.Context, dataStatusSequenc if _, ok := dataStatusPathsMap[object.Path]; !ok { d.log.Infof("removing %q", object.Path) if err := d.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } @@ -930,7 +930,7 @@ func (d *DataManager) cleanOldCheckpoints(ctx context.Context, dataStatusSequenc if _, ok := files[pne]; !ok { d.log.Infof("removing %q", object.Path) if err := d.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 703f998..470a223 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -31,8 +31,6 @@ import ( slog "agola.io/agola/internal/log" "agola.io/agola/internal/objectstorage" - "agola.io/agola/internal/objectstorage/posix" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/testutil" "github.com/google/go-cmp/cmp" errors "golang.org/x/xerrors" @@ -85,7 +83,7 @@ func TestEtcdReset(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -195,7 +193,7 @@ func TestEtcdResetWalsGap(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -324,7 +322,7 @@ func TestConcurrentUpdate(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -415,7 +413,7 @@ func TestEtcdWalCleaner(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -488,7 +486,7 @@ func TestReadObject(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -560,8 +558,8 @@ func TestReadObject(t *testing.T) { // should not exists _, _, err = dm.ReadObject("datatype01", "object1", nil) - if err != ostypes.ErrNotExist { - t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err) + if err != objectstorage.ErrNotExist { + t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) } // should exist _, _, err = dm.ReadObject("datatype01", "object19", nil) @@ -584,8 +582,8 @@ func TestReadObject(t *testing.T) { // should not exists _, _, err = dm.ReadObject("datatype01", "object1", nil) - if err != ostypes.ErrNotExist { - t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err) + if err != objectstorage.ErrNotExist { + t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) } // should exist _, _, err = dm.ReadObject("datatype01", "object19", nil) @@ -781,7 +779,7 @@ func testCheckpoint(t *testing.T, basePath string) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -997,7 +995,7 @@ func TestRead(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1111,7 +1109,7 @@ func testClean(t *testing.T, basePath string) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1229,7 +1227,7 @@ func testCleanConcurrentCheckpoint(t *testing.T, basePath string) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1358,7 +1356,7 @@ func testStorageWalCleaner(t *testing.T, basePath string) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1498,7 +1496,7 @@ func TestExportImport(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1607,7 +1605,7 @@ func TestExportImport(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err = posix.New(ostDir) + ost, err = objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 4aaabdc..3f67086 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -26,7 +26,7 @@ import ( "time" "agola.io/agola/internal/etcd" - ostypes "agola.io/agola/internal/objectstorage/types" + "agola.io/agola/internal/objectstorage" "agola.io/agola/internal/sequence" uuid "github.com/satori/go.uuid" @@ -132,7 +132,7 @@ func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.Read func (d *DataManager) HasOSTWal(walseq string) (bool, error) { _, err := d.ost.Stat(d.storageWalStatusFile(walseq) + ".committed") - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return false, nil } if err != nil { @@ -909,7 +909,7 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error { walStatusFilePath := d.storageWalDataFile(header.WalDataFileID) d.log.Infof("removing %q", walStatusFilePath) if err := d.ost.DeleteObject(walStatusFilePath); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } @@ -917,7 +917,7 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error { // then remove wal status files d.log.Infof("removing %q", object.Path) if err := d.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } @@ -928,7 +928,7 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error { if ext == ".checkpointed" { d.log.Infof("removing %q", object.Path) if err := d.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } @@ -1149,7 +1149,7 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro firstWal = dataStatus.WalSequence } else { dataStatus, err = d.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return err } // set the first wal to import in etcd if there's a snapshot. In this way we'll diff --git a/internal/objectstorage/common/atomic.go b/internal/objectstorage/atomic.go similarity index 86% rename from internal/objectstorage/common/atomic.go rename to internal/objectstorage/atomic.go index 4b14bec..b0a5c32 100644 --- a/internal/objectstorage/common/atomic.go +++ b/internal/objectstorage/atomic.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package common +package objectstorage import ( "io" @@ -22,12 +22,12 @@ import ( "strings" ) -// WriteFileAtomicFunc atomically writes a file, it achieves this by creating a +// writeFileAtomicFunc atomically writes a file, it achieves this by creating a // temporary file and then moving it. writeFunc is the func that will write // data to the file. // TODO(sgotti) remove left over tmp files if process crashes before calling // os.Remove -func WriteFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error { +func writeFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error { f, err := ioutil.TempFile(tmpDir, "tmpfile") if err != nil { return err @@ -75,10 +75,12 @@ func WriteFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bo return nil } -func WriteFileAtomic(filename, baseDir, tmpDir string, perm os.FileMode, persist bool, data []byte) error { - return WriteFileAtomicFunc(filename, baseDir, tmpDir, perm, persist, +/* +func writeFileAtomic(filename, baseDir, tmpDir string, perm os.FileMode, persist bool, data []byte) error { + return writeFileAtomicFunc(filename, baseDir, tmpDir, perm, persist, func(f io.Writer) error { _, err := f.Write(data) return err }) } +*/ diff --git a/internal/objectstorage/objectstorage.go b/internal/objectstorage/objectstorage.go index 86c4126..08d7f71 100644 --- a/internal/objectstorage/objectstorage.go +++ b/internal/objectstorage/objectstorage.go @@ -16,13 +16,14 @@ package objectstorage import ( "io" + "time" - "agola.io/agola/internal/objectstorage/types" + errors "golang.org/x/xerrors" ) type Storage interface { - Stat(filepath string) (*types.ObjectInfo, error) - ReadObject(filepath string) (types.ReadSeekCloser, error) + Stat(filepath string) (*ObjectInfo, error) + ReadObject(filepath string) (ReadSeekCloser, error) // WriteObject atomically writes an object. If size is greater or equal to // zero then only size bytes will be read from data and wrote. If size is // less than zero data will be wrote until EOF. When persist is true the @@ -30,7 +31,23 @@ type Storage interface { // storage. WriteObject(filepath string, data io.Reader, size int64, persist bool) error DeleteObject(filepath string) error - List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo + List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo +} + +var ErrNotExist = errors.New("does not exist") + +type ReadSeekCloser interface { + io.Reader + io.Seeker + io.Closer +} + +type ObjectInfo struct { + Path string + LastModified time.Time + Size int64 + + Err error } // ObjStorage wraps a Storage providing additional helper functions @@ -47,7 +64,7 @@ func (s *ObjStorage) Delimiter() string { return s.delimiter } -func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan types.ObjectInfo { +func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan ObjectInfo { delimiter := s.delimiter if recursive { delimiter = "" diff --git a/internal/objectstorage/objectstorage_test.go b/internal/objectstorage/objectstorage_test.go index ce04a60..7e6a07f 100644 --- a/internal/objectstorage/objectstorage_test.go +++ b/internal/objectstorage/objectstorage_test.go @@ -24,21 +24,17 @@ import ( "reflect" "strings" "testing" - - "agola.io/agola/internal/objectstorage/posix" - "agola.io/agola/internal/objectstorage/posixflat" - "agola.io/agola/internal/objectstorage/s3" ) -func setupPosix(t *testing.T, dir string) (*posix.PosixStorage, error) { - return posix.New(path.Join(dir, "posix")) +func setupPosix(t *testing.T, dir string) (*PosixStorage, error) { + return NewPosix(path.Join(dir, "posix")) } -func setupPosixFlat(t *testing.T, dir string) (*posixflat.PosixFlatStorage, error) { - return posixflat.New(path.Join(dir, "posixflat")) +func setupPosixFlat(t *testing.T, dir string) (*PosixFlatStorage, error) { + return NewPosixFlat(path.Join(dir, "posixflat")) } -func setupS3(t *testing.T, dir string) (*s3.S3Storage, error) { +func setupS3(t *testing.T, dir string) (*S3Storage, error) { minioEndpoint := os.Getenv("MINIO_ENDPOINT") minioAccessKey := os.Getenv("MINIO_ACCESSKEY") minioSecretKey := os.Getenv("MINIO_SECRETKEY") @@ -47,7 +43,7 @@ func setupS3(t *testing.T, dir string) (*s3.S3Storage, error) { return nil, nil } - return s3.New(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false) + return NewS3(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false) } func TestList(t *testing.T) { @@ -272,7 +268,7 @@ func TestList(t *testing.T) { for sname, s := range tt.s { t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) { switch s := s.(type) { - case *s3.S3Storage: + case *S3Storage: if s == nil { t.SkipNow() } @@ -336,7 +332,7 @@ func TestWriteObject(t *testing.T) { for sname, s := range map[string]Storage{"posix": ps, "posixflat": pfs, "minio": s3s} { t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) { switch s := s.(type) { - case *s3.S3Storage: + case *S3Storage: if s == nil { t.SkipNow() } diff --git a/internal/objectstorage/posix/posix.go b/internal/objectstorage/posix.go similarity index 80% rename from internal/objectstorage/posix/posix.go rename to internal/objectstorage/posix.go index 3364a03..75e1ff6 100644 --- a/internal/objectstorage/posix/posix.go +++ b/internal/objectstorage/posix.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package posix +package objectstorage import ( "io" @@ -21,9 +21,6 @@ import ( "path/filepath" "strings" - "agola.io/agola/internal/objectstorage/common" - "agola.io/agola/internal/objectstorage/types" - errors "golang.org/x/xerrors" ) @@ -37,7 +34,7 @@ type PosixStorage struct { tmpDir string } -func New(baseDir string) (*PosixStorage, error) { +func NewPosix(baseDir string) (*PosixStorage, error) { if err := os.MkdirAll(baseDir, 0770); err != nil { return nil, err } @@ -59,7 +56,7 @@ func (s *PosixStorage) fsPath(p string) (string, error) { return filepath.Join(s.dataDir, p), nil } -func (s *PosixStorage) Stat(p string) (*types.ObjectInfo, error) { +func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -68,15 +65,15 @@ func (s *PosixStorage) Stat(p string) (*types.ObjectInfo, error) { fi, err := os.Stat(fspath) if err != nil { if os.IsNotExist(err) { - return nil, types.ErrNotExist + return nil, ErrNotExist } return nil, err } - return &types.ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil + return &ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil } -func (s *PosixStorage) ReadObject(p string) (types.ReadSeekCloser, error) { +func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -84,7 +81,7 @@ func (s *PosixStorage) ReadObject(p string) (types.ReadSeekCloser, error) { f, err := os.Open(fspath) if err != nil && os.IsNotExist(err) { - return nil, types.ErrNotExist + return nil, ErrNotExist } return f, err } @@ -103,7 +100,7 @@ func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist if size >= 0 { r = io.LimitReader(data, size) } - return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { + return writeFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { _, err := io.Copy(f, r) return err }) @@ -117,7 +114,7 @@ func (s *PosixStorage) DeleteObject(p string) error { if err := os.Remove(fspath); err != nil { if os.IsNotExist(err) { - return types.ErrNotExist + return ErrNotExist } return err } @@ -151,16 +148,16 @@ func (s *PosixStorage) DeleteObject(p string) error { return nil } -func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { - objectCh := make(chan types.ObjectInfo, 1) +func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { + objectCh := make(chan ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} + objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} return objectCh } if startWith != "" && !strings.Contains(startWith, prefix) { - objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} + objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} return objectCh } @@ -182,7 +179,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s startWith = strings.TrimPrefix(startWith, "/") } - go func(objectCh chan<- types.ObjectInfo) { + go func(objectCh chan<- ObjectInfo) { defer close(objectCh) err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error { if err != nil && !os.IsNotExist(err) { @@ -219,7 +216,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s if strings.HasPrefix(p, prefix) && p > startWith { select { // Send object content. - case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}: + case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}: // If receives done from the caller, return here. case <-doneCh: return io.EOF @@ -229,7 +226,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s return nil }) if err != nil && err != io.EOF { - objectCh <- types.ObjectInfo{ + objectCh <- ObjectInfo{ Err: err, } return diff --git a/internal/objectstorage/posix/posix_test.go b/internal/objectstorage/posix_test.go similarity index 94% rename from internal/objectstorage/posix/posix_test.go rename to internal/objectstorage/posix_test.go index aca1a2e..e5f3a39 100644 --- a/internal/objectstorage/posix/posix_test.go +++ b/internal/objectstorage/posix_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package posix +package objectstorage import ( "bytes" @@ -22,7 +22,7 @@ import ( "testing" ) -func TestDeleteObject(t *testing.T) { +func TestPosixDeleteObject(t *testing.T) { objects := []string{"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"} dir, err := ioutil.TempDir("", "objectstorage") @@ -31,7 +31,7 @@ func TestDeleteObject(t *testing.T) { } //defer os.RemoveAll(dir) - ls, err := New(dir) + ls, err := NewPosix(dir) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/objectstorage/posixflat/posixflat.go b/internal/objectstorage/posixflat.go similarity index 88% rename from internal/objectstorage/posixflat/posixflat.go rename to internal/objectstorage/posixflat.go index cdcdd6e..85dbad5 100644 --- a/internal/objectstorage/posixflat/posixflat.go +++ b/internal/objectstorage/posixflat.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package posixflat +package objectstorage import ( "io" @@ -23,15 +23,10 @@ import ( "strings" "unicode/utf8" - "agola.io/agola/internal/objectstorage/common" - "agola.io/agola/internal/objectstorage/types" - errors "golang.org/x/xerrors" ) const ( - dataDirName = "data" - tmpDirName = "tmp" splitLength = 8 ) @@ -210,7 +205,7 @@ type PosixFlatStorage struct { tmpDir string } -func New(baseDir string) (*PosixFlatStorage, error) { +func NewPosixFlat(baseDir string) (*PosixFlatStorage, error) { if err := os.MkdirAll(baseDir, 0770); err != nil { return nil, err } @@ -235,7 +230,7 @@ func (s *PosixFlatStorage) fsPath(p string) (string, error) { return filepath.Join(s.dataDir, escape(p)), nil } -func (s *PosixFlatStorage) Stat(p string) (*types.ObjectInfo, error) { +func (s *PosixFlatStorage) Stat(p string) (*ObjectInfo, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -244,15 +239,15 @@ func (s *PosixFlatStorage) Stat(p string) (*types.ObjectInfo, error) { fi, err := os.Stat(fspath) if err != nil { if os.IsNotExist(err) { - return nil, types.ErrNotExist + return nil, ErrNotExist } return nil, err } - return &types.ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil + return &ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil } -func (s *PosixFlatStorage) ReadObject(p string) (types.ReadSeekCloser, error) { +func (s *PosixFlatStorage) ReadObject(p string) (ReadSeekCloser, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -260,7 +255,7 @@ func (s *PosixFlatStorage) ReadObject(p string) (types.ReadSeekCloser, error) { f, err := os.Open(fspath) if err != nil && os.IsNotExist(err) { - return nil, types.ErrNotExist + return nil, ErrNotExist } return f, err } @@ -279,7 +274,7 @@ func (s *PosixFlatStorage) WriteObject(p string, data io.Reader, size int64, per if size >= 0 { r = io.LimitReader(data, size) } - return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { + return writeFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { _, err := io.Copy(f, r) return err }) @@ -293,7 +288,7 @@ func (s *PosixFlatStorage) DeleteObject(p string) error { if err := os.Remove(fspath); err != nil { if os.IsNotExist(err) { - return types.ErrNotExist + return ErrNotExist } return err } @@ -327,16 +322,16 @@ func (s *PosixFlatStorage) DeleteObject(p string) error { return nil } -func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { - objectCh := make(chan types.ObjectInfo, 1) +func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { + objectCh := make(chan ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} + objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} return objectCh } if startWith != "" && !strings.Contains(startWith, prefix) { - objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} + objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} return objectCh } @@ -358,7 +353,7 @@ func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-ch startWith = strings.TrimPrefix(startWith, "/") } - go func(objectCh chan<- types.ObjectInfo) { + go func(objectCh chan<- ObjectInfo) { var prevp string defer close(objectCh) err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error { @@ -416,7 +411,7 @@ func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-ch if p > prevp { select { // Send object content. - case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}: + case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}: // If receives done from the caller, return here. case <-doneCh: return io.EOF @@ -428,7 +423,7 @@ func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-ch return nil }) if err != nil && err != io.EOF { - objectCh <- types.ObjectInfo{ + objectCh <- ObjectInfo{ Err: err, } return diff --git a/internal/objectstorage/posixflat/posixflat_test.go b/internal/objectstorage/posixflat_test.go similarity index 97% rename from internal/objectstorage/posixflat/posixflat_test.go rename to internal/objectstorage/posixflat_test.go index 4fc4328..2741243 100644 --- a/internal/objectstorage/posixflat/posixflat_test.go +++ b/internal/objectstorage/posixflat_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package posixflat +package objectstorage import ( "bytes" @@ -77,7 +77,7 @@ func TestEscapeUnescape(t *testing.T) { } } -func TestDeleteObject(t *testing.T) { +func TestPosixFlatDeleteObject(t *testing.T) { objects := []string{"/", "//", "☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"} dir, err := ioutil.TempDir("", "objectstorage") @@ -86,7 +86,7 @@ func TestDeleteObject(t *testing.T) { } //defer os.RemoveAll(dir) - ls, err := New(dir) + ls, err := NewPosixFlat(dir) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/objectstorage/s3/s3.go b/internal/objectstorage/s3.go similarity index 85% rename from internal/objectstorage/s3/s3.go rename to internal/objectstorage/s3.go index 06b28bd..da33d1c 100644 --- a/internal/objectstorage/s3/s3.go +++ b/internal/objectstorage/s3.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package s3 +package objectstorage import ( "io" @@ -21,8 +21,6 @@ import ( "os" "strings" - "agola.io/agola/internal/objectstorage/types" - minio "github.com/minio/minio-go" errors "golang.org/x/xerrors" ) @@ -34,7 +32,7 @@ type S3Storage struct { minioCore *minio.Core } -func New(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) { +func NewS3(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) { minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure) if err != nil { return nil, err @@ -62,24 +60,24 @@ func New(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure }, nil } -func (s *S3Storage) Stat(p string) (*types.ObjectInfo, error) { +func (s *S3Storage) Stat(p string) (*ObjectInfo, error) { oi, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{}) if err != nil { merr := minio.ToErrorResponse(err) if merr.StatusCode == http.StatusNotFound { - return nil, types.ErrNotExist + return nil, ErrNotExist } return nil, merr } - return &types.ObjectInfo{Path: p, LastModified: oi.LastModified, Size: oi.Size}, nil + return &ObjectInfo{Path: p, LastModified: oi.LastModified, Size: oi.Size}, nil } -func (s *S3Storage) ReadObject(filepath string) (types.ReadSeekCloser, error) { +func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) { if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil { merr := minio.ToErrorResponse(err) if merr.StatusCode == http.StatusNotFound { - return nil, types.ErrNotExist + return nil, ErrNotExist } return nil, merr } @@ -120,11 +118,11 @@ func (s *S3Storage) DeleteObject(filepath string) error { return s.minioClient.RemoveObject(s.bucket, filepath) } -func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { - objectCh := make(chan types.ObjectInfo, 1) +func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { + objectCh := make(chan ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- types.ObjectInfo{ + objectCh <- ObjectInfo{ Err: errors.Errorf("wrong delimiter %q", delimiter), } return objectCh @@ -139,7 +137,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru } // Initiate list objects goroutine here. - go func(objectCh chan<- types.ObjectInfo) { + go func(objectCh chan<- ObjectInfo) { defer close(objectCh) // Save continuationToken for next request. var continuationToken string @@ -147,7 +145,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru // Get list of objects a maximum of 1000 per request. result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith) if err != nil { - objectCh <- types.ObjectInfo{ + objectCh <- ObjectInfo{ Err: err, } return @@ -157,7 +155,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru for _, object := range result.Contents { select { // Send object content. - case objectCh <- types.ObjectInfo{Path: object.Key, LastModified: object.LastModified, Size: object.Size}: + case objectCh <- ObjectInfo{Path: object.Key, LastModified: object.LastModified, Size: object.Size}: // If receives done from the caller, return here. case <-doneCh: return diff --git a/internal/objectstorage/types/types.go b/internal/objectstorage/types/types.go deleted file mode 100644 index 7911bbb..0000000 --- a/internal/objectstorage/types/types.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2019 Sorint.lab -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied -// See the License for the specific language governing permissions and -// limitations under the License. - -package types - -import ( - "errors" - "io" - "time" -) - -// TODO(sgotti) -// define common errors (like notFound) so the implementations will return them -// instead of their own errors - -var ErrNotExist = errors.New("does not exist") - -type ReadSeekCloser interface { - io.Reader - io.Seeker - io.Closer -} - -type ObjectInfo struct { - Path string - LastModified time.Time - Size int64 - - Err error -} diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index ca32a5a..707a702 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -28,7 +28,6 @@ import ( "agola.io/agola/internal/db" "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/sequence" "agola.io/agola/internal/util" "agola.io/agola/services/configstore/types" @@ -130,10 +129,10 @@ func (r *ReadDB) ResetDB(ctx context.Context) error { func (r *ReadDB) SyncFromDump(ctx context.Context) (string, error) { dumpIndex, err := r.dm.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return "", err } - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return "", nil } for dataType, files := range dumpIndex.Files { diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 27bb246..fd2ae22 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -26,7 +26,6 @@ import ( "agola.io/agola/internal/db" "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/services/runservice/action" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/readdb" @@ -231,7 +230,7 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se } f, err := h.ost.ReadObject(logPath) if err != nil { - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return common.NewErrNotExist(err), true } return err, true diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go index c1c7960..2c78963 100644 --- a/internal/services/runservice/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -25,7 +25,6 @@ import ( "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/services/runservice/action" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/store" @@ -249,7 +248,7 @@ func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error archivePath := store.OSTRunTaskArchivePath(rtID, step) f, err := h.ost.ReadObject(archivePath) if err != nil { - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return common.NewErrNotExist(err) } return err @@ -326,7 +325,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, defer close(doneCh) // get the latest modified object - var lastObject *ostypes.ObjectInfo + var lastObject *objectstorage.ObjectInfo for object := range ost.List(store.OSTCacheDir()+"/"+key, "", false, doneCh) { if object.Err != nil { return "", object.Err @@ -345,7 +344,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, } _, err := ost.Stat(cachePath) - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return "", nil } if err != nil { @@ -358,7 +357,7 @@ func (h *CacheHandler) readCache(key string, w io.Writer) error { cachePath := store.OSTCachePath(key) f, err := h.ost.ReadObject(cachePath) if err != nil { - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return common.NewErrNotExist(err) } return err diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index 029af56..f971d63 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -32,7 +32,6 @@ import ( "agola.io/agola/internal/db" "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/sequence" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/store" @@ -668,10 +667,10 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { func (r *ReadDB) SyncFromDump(ctx context.Context) (string, error) { dumpIndex, err := r.dm.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return "", err } - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return "", nil } for dataType, files := range dumpIndex.Files { diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 46c7e97..cfd5387 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -26,7 +26,7 @@ import ( "agola.io/agola/internal/datamanager" "agola.io/agola/internal/etcd" slog "agola.io/agola/internal/log" - ostypes "agola.io/agola/internal/objectstorage/types" + "agola.io/agola/internal/objectstorage" "agola.io/agola/internal/runconfig" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/store" @@ -865,7 +865,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { func (s *Runservice) OSTFileExists(path string) (bool, error) { _, err := s.ost.Stat(path) - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return false, err } return err == nil, nil @@ -1359,7 +1359,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time. } if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) { if err := s.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { log.Warnf("failed to delete cache object %q: %v", object.Path, err) } } @@ -1411,7 +1411,7 @@ func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterv } if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) { if err := s.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { log.Warnf("failed to delete workspace object %q: %v", object.Path, err) } } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 207c8d6..7a86552 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -24,7 +24,7 @@ import ( "agola.io/agola/internal/datamanager" "agola.io/agola/internal/etcd" - ostypes "agola.io/agola/internal/objectstorage/types" + "agola.io/agola/internal/objectstorage" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/util" "agola.io/agola/services/runservice/types" @@ -504,7 +504,7 @@ func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, dm *datamanager.DataMan } if r == nil { r, err = OSTGetRun(dm, runID) - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return nil, err } }