diff --git a/internal/common/common.go b/internal/common/common.go index cb6a5d0..26f62cc 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -23,8 +23,6 @@ import ( "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - "agola.io/agola/internal/objectstorage/posix" - "agola.io/agola/internal/objectstorage/s3" "agola.io/agola/internal/services/config" "go.uber.org/zap" errors "golang.org/x/xerrors" @@ -83,7 +81,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error switch c.Type { case config.ObjectStorageTypePosix: - ost, err = posix.New(c.Path) + ost, err = objectstorage.NewPosix(c.Path) if err != nil { return nil, errors.Errorf("failed to create posix object storage: %w", err) } @@ -102,7 +100,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error return nil, errors.Errorf("wrong s3 endpoint scheme %q (must be http or https)", u.Scheme) } } - ost, err = s3.New(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) + ost, err = objectstorage.NewS3(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure) if err != nil { return nil, errors.Errorf("failed to create s3 object storage: %w", err) } diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index b510eed..b557e39 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -26,7 +26,7 @@ import ( "sort" "strings" - ostypes "agola.io/agola/internal/objectstorage/types" + "agola.io/agola/internal/objectstorage" "agola.io/agola/internal/sequence" uuid "github.com/satori/go.uuid" @@ -165,7 +165,7 @@ func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) er } curDataStatus, err := d.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return err } @@ -321,10 +321,10 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s if actionGroup.DataStatusFile != nil { // TODO(sgotti) instead of reading all entries in memory decode it's contents one by one when needed oldDataf, err := d.ost.ReadObject(d.DataFilePath(dataType, actionGroup.DataStatusFile.ID)) - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return nil, err } - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { dec := json.NewDecoder(oldDataf) for { var de *DataEntry @@ -481,7 +481,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { var matchingDataFileID string // get the matching data file for the action entry ID if len(curFiles[dataType]) == 0 { - return nil, ostypes.ErrNotExist + return nil, objectstorage.ErrNotExist } matchingDataFileID = curFiles[dataType][0].ID @@ -507,7 +507,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { pos, ok := dataFileIndex.Index[id] if !ok { - return nil, ostypes.ErrNotExist + return nil, objectstorage.ErrNotExist } dataf, err := d.ost.ReadObject(d.DataFilePath(dataType, matchingDataFileID)) @@ -560,7 +560,7 @@ func (d *DataManager) GetFirstDataStatusSequences(n int) ([]*sequence.Sequence, } if len(dataStatusSequences) == 0 { - return nil, ostypes.ErrNotExist + return nil, objectstorage.ErrNotExist } return dataStatusSequences, nil @@ -601,7 +601,7 @@ func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, e }) if len(dataStatusSequences) == 0 { - return nil, ostypes.ErrNotExist + return nil, objectstorage.ErrNotExist } return dataStatusSequences, nil @@ -862,7 +862,7 @@ func (d *DataManager) cleanOldCheckpoints(ctx context.Context, dataStatusSequenc if _, ok := dataStatusPathsMap[object.Path]; !ok { d.log.Infof("removing %q", object.Path) if err := d.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } @@ -930,7 +930,7 @@ func (d *DataManager) cleanOldCheckpoints(ctx context.Context, dataStatusSequenc if _, ok := files[pne]; !ok { d.log.Infof("removing %q", object.Path) if err := d.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 703f998..470a223 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -31,8 +31,6 @@ import ( slog "agola.io/agola/internal/log" "agola.io/agola/internal/objectstorage" - "agola.io/agola/internal/objectstorage/posix" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/testutil" "github.com/google/go-cmp/cmp" errors "golang.org/x/xerrors" @@ -85,7 +83,7 @@ func TestEtcdReset(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -195,7 +193,7 @@ func TestEtcdResetWalsGap(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -324,7 +322,7 @@ func TestConcurrentUpdate(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -415,7 +413,7 @@ func TestEtcdWalCleaner(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -488,7 +486,7 @@ func TestReadObject(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -560,8 +558,8 @@ func TestReadObject(t *testing.T) { // should not exists _, _, err = dm.ReadObject("datatype01", "object1", nil) - if err != ostypes.ErrNotExist { - t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err) + if err != objectstorage.ErrNotExist { + t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) } // should exist _, _, err = dm.ReadObject("datatype01", "object19", nil) @@ -584,8 +582,8 @@ func TestReadObject(t *testing.T) { // should not exists _, _, err = dm.ReadObject("datatype01", "object1", nil) - if err != ostypes.ErrNotExist { - t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err) + if err != objectstorage.ErrNotExist { + t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) } // should exist _, _, err = dm.ReadObject("datatype01", "object19", nil) @@ -781,7 +779,7 @@ func testCheckpoint(t *testing.T, basePath string) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -997,7 +995,7 @@ func TestRead(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1111,7 +1109,7 @@ func testClean(t *testing.T, basePath string) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1229,7 +1227,7 @@ func testCleanConcurrentCheckpoint(t *testing.T, basePath string) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1358,7 +1356,7 @@ func testStorageWalCleaner(t *testing.T, basePath string) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1498,7 +1496,7 @@ func TestExportImport(t *testing.T) { if err != nil { t.Fatalf("unexpected err: %v", err) } - ost, err := posix.New(ostDir) + ost, err := objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -1607,7 +1605,7 @@ func TestExportImport(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - ost, err = posix.New(ostDir) + ost, err = objectstorage.NewPosix(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 4aaabdc..3f67086 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -26,7 +26,7 @@ import ( "time" "agola.io/agola/internal/etcd" - ostypes "agola.io/agola/internal/objectstorage/types" + "agola.io/agola/internal/objectstorage" "agola.io/agola/internal/sequence" uuid "github.com/satori/go.uuid" @@ -132,7 +132,7 @@ func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.Read func (d *DataManager) HasOSTWal(walseq string) (bool, error) { _, err := d.ost.Stat(d.storageWalStatusFile(walseq) + ".committed") - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return false, nil } if err != nil { @@ -909,7 +909,7 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error { walStatusFilePath := d.storageWalDataFile(header.WalDataFileID) d.log.Infof("removing %q", walStatusFilePath) if err := d.ost.DeleteObject(walStatusFilePath); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } @@ -917,7 +917,7 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error { // then remove wal status files d.log.Infof("removing %q", object.Path) if err := d.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } @@ -928,7 +928,7 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error { if ext == ".checkpointed" { d.log.Infof("removing %q", object.Path) if err := d.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { return err } } @@ -1149,7 +1149,7 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro firstWal = dataStatus.WalSequence } else { dataStatus, err = d.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return err } // set the first wal to import in etcd if there's a snapshot. In this way we'll diff --git a/internal/objectstorage/common/atomic.go b/internal/objectstorage/atomic.go similarity index 86% rename from internal/objectstorage/common/atomic.go rename to internal/objectstorage/atomic.go index 4b14bec..b0a5c32 100644 --- a/internal/objectstorage/common/atomic.go +++ b/internal/objectstorage/atomic.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package common +package objectstorage import ( "io" @@ -22,12 +22,12 @@ import ( "strings" ) -// WriteFileAtomicFunc atomically writes a file, it achieves this by creating a +// writeFileAtomicFunc atomically writes a file, it achieves this by creating a // temporary file and then moving it. writeFunc is the func that will write // data to the file. // TODO(sgotti) remove left over tmp files if process crashes before calling // os.Remove -func WriteFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error { +func writeFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error { f, err := ioutil.TempFile(tmpDir, "tmpfile") if err != nil { return err @@ -75,10 +75,12 @@ func WriteFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bo return nil } -func WriteFileAtomic(filename, baseDir, tmpDir string, perm os.FileMode, persist bool, data []byte) error { - return WriteFileAtomicFunc(filename, baseDir, tmpDir, perm, persist, +/* +func writeFileAtomic(filename, baseDir, tmpDir string, perm os.FileMode, persist bool, data []byte) error { + return writeFileAtomicFunc(filename, baseDir, tmpDir, perm, persist, func(f io.Writer) error { _, err := f.Write(data) return err }) } +*/ diff --git a/internal/objectstorage/objectstorage.go b/internal/objectstorage/objectstorage.go index 86c4126..08d7f71 100644 --- a/internal/objectstorage/objectstorage.go +++ b/internal/objectstorage/objectstorage.go @@ -16,13 +16,14 @@ package objectstorage import ( "io" + "time" - "agola.io/agola/internal/objectstorage/types" + errors "golang.org/x/xerrors" ) type Storage interface { - Stat(filepath string) (*types.ObjectInfo, error) - ReadObject(filepath string) (types.ReadSeekCloser, error) + Stat(filepath string) (*ObjectInfo, error) + ReadObject(filepath string) (ReadSeekCloser, error) // WriteObject atomically writes an object. If size is greater or equal to // zero then only size bytes will be read from data and wrote. If size is // less than zero data will be wrote until EOF. When persist is true the @@ -30,7 +31,23 @@ type Storage interface { // storage. WriteObject(filepath string, data io.Reader, size int64, persist bool) error DeleteObject(filepath string) error - List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo + List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo +} + +var ErrNotExist = errors.New("does not exist") + +type ReadSeekCloser interface { + io.Reader + io.Seeker + io.Closer +} + +type ObjectInfo struct { + Path string + LastModified time.Time + Size int64 + + Err error } // ObjStorage wraps a Storage providing additional helper functions @@ -47,7 +64,7 @@ func (s *ObjStorage) Delimiter() string { return s.delimiter } -func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan types.ObjectInfo { +func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan ObjectInfo { delimiter := s.delimiter if recursive { delimiter = "" diff --git a/internal/objectstorage/objectstorage_test.go b/internal/objectstorage/objectstorage_test.go index ce04a60..7e6a07f 100644 --- a/internal/objectstorage/objectstorage_test.go +++ b/internal/objectstorage/objectstorage_test.go @@ -24,21 +24,17 @@ import ( "reflect" "strings" "testing" - - "agola.io/agola/internal/objectstorage/posix" - "agola.io/agola/internal/objectstorage/posixflat" - "agola.io/agola/internal/objectstorage/s3" ) -func setupPosix(t *testing.T, dir string) (*posix.PosixStorage, error) { - return posix.New(path.Join(dir, "posix")) +func setupPosix(t *testing.T, dir string) (*PosixStorage, error) { + return NewPosix(path.Join(dir, "posix")) } -func setupPosixFlat(t *testing.T, dir string) (*posixflat.PosixFlatStorage, error) { - return posixflat.New(path.Join(dir, "posixflat")) +func setupPosixFlat(t *testing.T, dir string) (*PosixFlatStorage, error) { + return NewPosixFlat(path.Join(dir, "posixflat")) } -func setupS3(t *testing.T, dir string) (*s3.S3Storage, error) { +func setupS3(t *testing.T, dir string) (*S3Storage, error) { minioEndpoint := os.Getenv("MINIO_ENDPOINT") minioAccessKey := os.Getenv("MINIO_ACCESSKEY") minioSecretKey := os.Getenv("MINIO_SECRETKEY") @@ -47,7 +43,7 @@ func setupS3(t *testing.T, dir string) (*s3.S3Storage, error) { return nil, nil } - return s3.New(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false) + return NewS3(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false) } func TestList(t *testing.T) { @@ -272,7 +268,7 @@ func TestList(t *testing.T) { for sname, s := range tt.s { t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) { switch s := s.(type) { - case *s3.S3Storage: + case *S3Storage: if s == nil { t.SkipNow() } @@ -336,7 +332,7 @@ func TestWriteObject(t *testing.T) { for sname, s := range map[string]Storage{"posix": ps, "posixflat": pfs, "minio": s3s} { t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) { switch s := s.(type) { - case *s3.S3Storage: + case *S3Storage: if s == nil { t.SkipNow() } diff --git a/internal/objectstorage/posix/posix.go b/internal/objectstorage/posix.go similarity index 80% rename from internal/objectstorage/posix/posix.go rename to internal/objectstorage/posix.go index 3364a03..75e1ff6 100644 --- a/internal/objectstorage/posix/posix.go +++ b/internal/objectstorage/posix.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package posix +package objectstorage import ( "io" @@ -21,9 +21,6 @@ import ( "path/filepath" "strings" - "agola.io/agola/internal/objectstorage/common" - "agola.io/agola/internal/objectstorage/types" - errors "golang.org/x/xerrors" ) @@ -37,7 +34,7 @@ type PosixStorage struct { tmpDir string } -func New(baseDir string) (*PosixStorage, error) { +func NewPosix(baseDir string) (*PosixStorage, error) { if err := os.MkdirAll(baseDir, 0770); err != nil { return nil, err } @@ -59,7 +56,7 @@ func (s *PosixStorage) fsPath(p string) (string, error) { return filepath.Join(s.dataDir, p), nil } -func (s *PosixStorage) Stat(p string) (*types.ObjectInfo, error) { +func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -68,15 +65,15 @@ func (s *PosixStorage) Stat(p string) (*types.ObjectInfo, error) { fi, err := os.Stat(fspath) if err != nil { if os.IsNotExist(err) { - return nil, types.ErrNotExist + return nil, ErrNotExist } return nil, err } - return &types.ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil + return &ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil } -func (s *PosixStorage) ReadObject(p string) (types.ReadSeekCloser, error) { +func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -84,7 +81,7 @@ func (s *PosixStorage) ReadObject(p string) (types.ReadSeekCloser, error) { f, err := os.Open(fspath) if err != nil && os.IsNotExist(err) { - return nil, types.ErrNotExist + return nil, ErrNotExist } return f, err } @@ -103,7 +100,7 @@ func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist if size >= 0 { r = io.LimitReader(data, size) } - return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { + return writeFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { _, err := io.Copy(f, r) return err }) @@ -117,7 +114,7 @@ func (s *PosixStorage) DeleteObject(p string) error { if err := os.Remove(fspath); err != nil { if os.IsNotExist(err) { - return types.ErrNotExist + return ErrNotExist } return err } @@ -151,16 +148,16 @@ func (s *PosixStorage) DeleteObject(p string) error { return nil } -func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { - objectCh := make(chan types.ObjectInfo, 1) +func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { + objectCh := make(chan ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} + objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} return objectCh } if startWith != "" && !strings.Contains(startWith, prefix) { - objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} + objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} return objectCh } @@ -182,7 +179,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s startWith = strings.TrimPrefix(startWith, "/") } - go func(objectCh chan<- types.ObjectInfo) { + go func(objectCh chan<- ObjectInfo) { defer close(objectCh) err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error { if err != nil && !os.IsNotExist(err) { @@ -219,7 +216,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s if strings.HasPrefix(p, prefix) && p > startWith { select { // Send object content. - case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}: + case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}: // If receives done from the caller, return here. case <-doneCh: return io.EOF @@ -229,7 +226,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s return nil }) if err != nil && err != io.EOF { - objectCh <- types.ObjectInfo{ + objectCh <- ObjectInfo{ Err: err, } return diff --git a/internal/objectstorage/posix/posix_test.go b/internal/objectstorage/posix_test.go similarity index 94% rename from internal/objectstorage/posix/posix_test.go rename to internal/objectstorage/posix_test.go index aca1a2e..e5f3a39 100644 --- a/internal/objectstorage/posix/posix_test.go +++ b/internal/objectstorage/posix_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package posix +package objectstorage import ( "bytes" @@ -22,7 +22,7 @@ import ( "testing" ) -func TestDeleteObject(t *testing.T) { +func TestPosixDeleteObject(t *testing.T) { objects := []string{"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"} dir, err := ioutil.TempDir("", "objectstorage") @@ -31,7 +31,7 @@ func TestDeleteObject(t *testing.T) { } //defer os.RemoveAll(dir) - ls, err := New(dir) + ls, err := NewPosix(dir) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/objectstorage/posixflat/posixflat.go b/internal/objectstorage/posixflat.go similarity index 88% rename from internal/objectstorage/posixflat/posixflat.go rename to internal/objectstorage/posixflat.go index cdcdd6e..85dbad5 100644 --- a/internal/objectstorage/posixflat/posixflat.go +++ b/internal/objectstorage/posixflat.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package posixflat +package objectstorage import ( "io" @@ -23,15 +23,10 @@ import ( "strings" "unicode/utf8" - "agola.io/agola/internal/objectstorage/common" - "agola.io/agola/internal/objectstorage/types" - errors "golang.org/x/xerrors" ) const ( - dataDirName = "data" - tmpDirName = "tmp" splitLength = 8 ) @@ -210,7 +205,7 @@ type PosixFlatStorage struct { tmpDir string } -func New(baseDir string) (*PosixFlatStorage, error) { +func NewPosixFlat(baseDir string) (*PosixFlatStorage, error) { if err := os.MkdirAll(baseDir, 0770); err != nil { return nil, err } @@ -235,7 +230,7 @@ func (s *PosixFlatStorage) fsPath(p string) (string, error) { return filepath.Join(s.dataDir, escape(p)), nil } -func (s *PosixFlatStorage) Stat(p string) (*types.ObjectInfo, error) { +func (s *PosixFlatStorage) Stat(p string) (*ObjectInfo, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -244,15 +239,15 @@ func (s *PosixFlatStorage) Stat(p string) (*types.ObjectInfo, error) { fi, err := os.Stat(fspath) if err != nil { if os.IsNotExist(err) { - return nil, types.ErrNotExist + return nil, ErrNotExist } return nil, err } - return &types.ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil + return &ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil } -func (s *PosixFlatStorage) ReadObject(p string) (types.ReadSeekCloser, error) { +func (s *PosixFlatStorage) ReadObject(p string) (ReadSeekCloser, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err @@ -260,7 +255,7 @@ func (s *PosixFlatStorage) ReadObject(p string) (types.ReadSeekCloser, error) { f, err := os.Open(fspath) if err != nil && os.IsNotExist(err) { - return nil, types.ErrNotExist + return nil, ErrNotExist } return f, err } @@ -279,7 +274,7 @@ func (s *PosixFlatStorage) WriteObject(p string, data io.Reader, size int64, per if size >= 0 { r = io.LimitReader(data, size) } - return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { + return writeFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error { _, err := io.Copy(f, r) return err }) @@ -293,7 +288,7 @@ func (s *PosixFlatStorage) DeleteObject(p string) error { if err := os.Remove(fspath); err != nil { if os.IsNotExist(err) { - return types.ErrNotExist + return ErrNotExist } return err } @@ -327,16 +322,16 @@ func (s *PosixFlatStorage) DeleteObject(p string) error { return nil } -func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { - objectCh := make(chan types.ObjectInfo, 1) +func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { + objectCh := make(chan ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} + objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} return objectCh } if startWith != "" && !strings.Contains(startWith, prefix) { - objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} + objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} return objectCh } @@ -358,7 +353,7 @@ func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-ch startWith = strings.TrimPrefix(startWith, "/") } - go func(objectCh chan<- types.ObjectInfo) { + go func(objectCh chan<- ObjectInfo) { var prevp string defer close(objectCh) err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error { @@ -416,7 +411,7 @@ func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-ch if p > prevp { select { // Send object content. - case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}: + case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}: // If receives done from the caller, return here. case <-doneCh: return io.EOF @@ -428,7 +423,7 @@ func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-ch return nil }) if err != nil && err != io.EOF { - objectCh <- types.ObjectInfo{ + objectCh <- ObjectInfo{ Err: err, } return diff --git a/internal/objectstorage/posixflat/posixflat_test.go b/internal/objectstorage/posixflat_test.go similarity index 97% rename from internal/objectstorage/posixflat/posixflat_test.go rename to internal/objectstorage/posixflat_test.go index 4fc4328..2741243 100644 --- a/internal/objectstorage/posixflat/posixflat_test.go +++ b/internal/objectstorage/posixflat_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package posixflat +package objectstorage import ( "bytes" @@ -77,7 +77,7 @@ func TestEscapeUnescape(t *testing.T) { } } -func TestDeleteObject(t *testing.T) { +func TestPosixFlatDeleteObject(t *testing.T) { objects := []string{"/", "//", "☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"} dir, err := ioutil.TempDir("", "objectstorage") @@ -86,7 +86,7 @@ func TestDeleteObject(t *testing.T) { } //defer os.RemoveAll(dir) - ls, err := New(dir) + ls, err := NewPosixFlat(dir) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/objectstorage/s3/s3.go b/internal/objectstorage/s3.go similarity index 85% rename from internal/objectstorage/s3/s3.go rename to internal/objectstorage/s3.go index 06b28bd..da33d1c 100644 --- a/internal/objectstorage/s3/s3.go +++ b/internal/objectstorage/s3.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package s3 +package objectstorage import ( "io" @@ -21,8 +21,6 @@ import ( "os" "strings" - "agola.io/agola/internal/objectstorage/types" - minio "github.com/minio/minio-go" errors "golang.org/x/xerrors" ) @@ -34,7 +32,7 @@ type S3Storage struct { minioCore *minio.Core } -func New(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) { +func NewS3(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) { minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure) if err != nil { return nil, err @@ -62,24 +60,24 @@ func New(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure }, nil } -func (s *S3Storage) Stat(p string) (*types.ObjectInfo, error) { +func (s *S3Storage) Stat(p string) (*ObjectInfo, error) { oi, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{}) if err != nil { merr := minio.ToErrorResponse(err) if merr.StatusCode == http.StatusNotFound { - return nil, types.ErrNotExist + return nil, ErrNotExist } return nil, merr } - return &types.ObjectInfo{Path: p, LastModified: oi.LastModified, Size: oi.Size}, nil + return &ObjectInfo{Path: p, LastModified: oi.LastModified, Size: oi.Size}, nil } -func (s *S3Storage) ReadObject(filepath string) (types.ReadSeekCloser, error) { +func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) { if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil { merr := minio.ToErrorResponse(err) if merr.StatusCode == http.StatusNotFound { - return nil, types.ErrNotExist + return nil, ErrNotExist } return nil, merr } @@ -120,11 +118,11 @@ func (s *S3Storage) DeleteObject(filepath string) error { return s.minioClient.RemoveObject(s.bucket, filepath) } -func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo { - objectCh := make(chan types.ObjectInfo, 1) +func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { + objectCh := make(chan ObjectInfo, 1) if len(delimiter) > 1 { - objectCh <- types.ObjectInfo{ + objectCh <- ObjectInfo{ Err: errors.Errorf("wrong delimiter %q", delimiter), } return objectCh @@ -139,7 +137,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru } // Initiate list objects goroutine here. - go func(objectCh chan<- types.ObjectInfo) { + go func(objectCh chan<- ObjectInfo) { defer close(objectCh) // Save continuationToken for next request. var continuationToken string @@ -147,7 +145,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru // Get list of objects a maximum of 1000 per request. result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith) if err != nil { - objectCh <- types.ObjectInfo{ + objectCh <- ObjectInfo{ Err: err, } return @@ -157,7 +155,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru for _, object := range result.Contents { select { // Send object content. - case objectCh <- types.ObjectInfo{Path: object.Key, LastModified: object.LastModified, Size: object.Size}: + case objectCh <- ObjectInfo{Path: object.Key, LastModified: object.LastModified, Size: object.Size}: // If receives done from the caller, return here. case <-doneCh: return diff --git a/internal/objectstorage/types/types.go b/internal/objectstorage/types/types.go deleted file mode 100644 index 7911bbb..0000000 --- a/internal/objectstorage/types/types.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2019 Sorint.lab -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied -// See the License for the specific language governing permissions and -// limitations under the License. - -package types - -import ( - "errors" - "io" - "time" -) - -// TODO(sgotti) -// define common errors (like notFound) so the implementations will return them -// instead of their own errors - -var ErrNotExist = errors.New("does not exist") - -type ReadSeekCloser interface { - io.Reader - io.Seeker - io.Closer -} - -type ObjectInfo struct { - Path string - LastModified time.Time - Size int64 - - Err error -} diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index ca32a5a..707a702 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -28,7 +28,6 @@ import ( "agola.io/agola/internal/db" "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/sequence" "agola.io/agola/internal/util" "agola.io/agola/services/configstore/types" @@ -130,10 +129,10 @@ func (r *ReadDB) ResetDB(ctx context.Context) error { func (r *ReadDB) SyncFromDump(ctx context.Context) (string, error) { dumpIndex, err := r.dm.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return "", err } - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return "", nil } for dataType, files := range dumpIndex.Files { diff --git a/internal/services/runservice/api/api.go b/internal/services/runservice/api/api.go index 27bb246..fd2ae22 100644 --- a/internal/services/runservice/api/api.go +++ b/internal/services/runservice/api/api.go @@ -26,7 +26,6 @@ import ( "agola.io/agola/internal/db" "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/services/runservice/action" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/readdb" @@ -231,7 +230,7 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se } f, err := h.ost.ReadObject(logPath) if err != nil { - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return common.NewErrNotExist(err), true } return err, true diff --git a/internal/services/runservice/api/executor.go b/internal/services/runservice/api/executor.go index c1c7960..2c78963 100644 --- a/internal/services/runservice/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -25,7 +25,6 @@ import ( "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/services/runservice/action" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/store" @@ -249,7 +248,7 @@ func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error archivePath := store.OSTRunTaskArchivePath(rtID, step) f, err := h.ost.ReadObject(archivePath) if err != nil { - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return common.NewErrNotExist(err) } return err @@ -326,7 +325,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, defer close(doneCh) // get the latest modified object - var lastObject *ostypes.ObjectInfo + var lastObject *objectstorage.ObjectInfo for object := range ost.List(store.OSTCacheDir()+"/"+key, "", false, doneCh) { if object.Err != nil { return "", object.Err @@ -345,7 +344,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, } _, err := ost.Stat(cachePath) - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return "", nil } if err != nil { @@ -358,7 +357,7 @@ func (h *CacheHandler) readCache(key string, w io.Writer) error { cachePath := store.OSTCachePath(key) f, err := h.ost.ReadObject(cachePath) if err != nil { - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return common.NewErrNotExist(err) } return err diff --git a/internal/services/runservice/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go index 029af56..f971d63 100644 --- a/internal/services/runservice/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -32,7 +32,6 @@ import ( "agola.io/agola/internal/db" "agola.io/agola/internal/etcd" "agola.io/agola/internal/objectstorage" - ostypes "agola.io/agola/internal/objectstorage/types" "agola.io/agola/internal/sequence" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/store" @@ -668,10 +667,10 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { func (r *ReadDB) SyncFromDump(ctx context.Context) (string, error) { dumpIndex, err := r.dm.GetLastDataStatus() - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return "", err } - if err == ostypes.ErrNotExist { + if err == objectstorage.ErrNotExist { return "", nil } for dataType, files := range dumpIndex.Files { diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index 46c7e97..cfd5387 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -26,7 +26,7 @@ import ( "agola.io/agola/internal/datamanager" "agola.io/agola/internal/etcd" slog "agola.io/agola/internal/log" - ostypes "agola.io/agola/internal/objectstorage/types" + "agola.io/agola/internal/objectstorage" "agola.io/agola/internal/runconfig" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/services/runservice/store" @@ -865,7 +865,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { func (s *Runservice) OSTFileExists(path string) (bool, error) { _, err := s.ost.Stat(path) - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return false, err } return err == nil, nil @@ -1359,7 +1359,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time. } if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) { if err := s.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { log.Warnf("failed to delete cache object %q: %v", object.Path, err) } } @@ -1411,7 +1411,7 @@ func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterv } if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) { if err := s.ost.DeleteObject(object.Path); err != nil { - if err != ostypes.ErrNotExist { + if err != objectstorage.ErrNotExist { log.Warnf("failed to delete workspace object %q: %v", object.Path, err) } } diff --git a/internal/services/runservice/store/store.go b/internal/services/runservice/store/store.go index 207c8d6..7a86552 100644 --- a/internal/services/runservice/store/store.go +++ b/internal/services/runservice/store/store.go @@ -24,7 +24,7 @@ import ( "agola.io/agola/internal/datamanager" "agola.io/agola/internal/etcd" - ostypes "agola.io/agola/internal/objectstorage/types" + "agola.io/agola/internal/objectstorage" "agola.io/agola/internal/services/runservice/common" "agola.io/agola/internal/util" "agola.io/agola/services/runservice/types" @@ -504,7 +504,7 @@ func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, dm *datamanager.DataMan } if r == nil { r, err = OSTGetRun(dm, runID) - if err != nil && err != ostypes.ErrNotExist { + if err != nil && err != objectstorage.ErrNotExist { return nil, err } }