objectstorage: use a single package
remove all the subpackages and just use a single package
This commit is contained in:
parent
35e1ec0e15
commit
5af07d0852
|
@ -23,8 +23,6 @@ import (
|
||||||
|
|
||||||
"agola.io/agola/internal/etcd"
|
"agola.io/agola/internal/etcd"
|
||||||
"agola.io/agola/internal/objectstorage"
|
"agola.io/agola/internal/objectstorage"
|
||||||
"agola.io/agola/internal/objectstorage/posix"
|
|
||||||
"agola.io/agola/internal/objectstorage/s3"
|
|
||||||
"agola.io/agola/internal/services/config"
|
"agola.io/agola/internal/services/config"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
errors "golang.org/x/xerrors"
|
errors "golang.org/x/xerrors"
|
||||||
|
@ -83,7 +81,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error
|
||||||
|
|
||||||
switch c.Type {
|
switch c.Type {
|
||||||
case config.ObjectStorageTypePosix:
|
case config.ObjectStorageTypePosix:
|
||||||
ost, err = posix.New(c.Path)
|
ost, err = objectstorage.NewPosix(c.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Errorf("failed to create posix object storage: %w", err)
|
return nil, errors.Errorf("failed to create posix object storage: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -102,7 +100,7 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error
|
||||||
return nil, errors.Errorf("wrong s3 endpoint scheme %q (must be http or https)", u.Scheme)
|
return nil, errors.Errorf("wrong s3 endpoint scheme %q (must be http or https)", u.Scheme)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ost, err = s3.New(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure)
|
ost, err = objectstorage.NewS3(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Errorf("failed to create s3 object storage: %w", err)
|
return nil, errors.Errorf("failed to create s3 object storage: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
"agola.io/agola/internal/objectstorage"
|
||||||
"agola.io/agola/internal/sequence"
|
"agola.io/agola/internal/sequence"
|
||||||
|
|
||||||
uuid "github.com/satori/go.uuid"
|
uuid "github.com/satori/go.uuid"
|
||||||
|
@ -165,7 +165,7 @@ func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) er
|
||||||
}
|
}
|
||||||
|
|
||||||
curDataStatus, err := d.GetLastDataStatus()
|
curDataStatus, err := d.GetLastDataStatus()
|
||||||
if err != nil && err != ostypes.ErrNotExist {
|
if err != nil && err != objectstorage.ErrNotExist {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -321,10 +321,10 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
|
||||||
if actionGroup.DataStatusFile != nil {
|
if actionGroup.DataStatusFile != nil {
|
||||||
// TODO(sgotti) instead of reading all entries in memory decode it's contents one by one when needed
|
// TODO(sgotti) instead of reading all entries in memory decode it's contents one by one when needed
|
||||||
oldDataf, err := d.ost.ReadObject(d.DataFilePath(dataType, actionGroup.DataStatusFile.ID))
|
oldDataf, err := d.ost.ReadObject(d.DataFilePath(dataType, actionGroup.DataStatusFile.ID))
|
||||||
if err != nil && err != ostypes.ErrNotExist {
|
if err != nil && err != objectstorage.ErrNotExist {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
dec := json.NewDecoder(oldDataf)
|
dec := json.NewDecoder(oldDataf)
|
||||||
for {
|
for {
|
||||||
var de *DataEntry
|
var de *DataEntry
|
||||||
|
@ -481,7 +481,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
|
||||||
var matchingDataFileID string
|
var matchingDataFileID string
|
||||||
// get the matching data file for the action entry ID
|
// get the matching data file for the action entry ID
|
||||||
if len(curFiles[dataType]) == 0 {
|
if len(curFiles[dataType]) == 0 {
|
||||||
return nil, ostypes.ErrNotExist
|
return nil, objectstorage.ErrNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
matchingDataFileID = curFiles[dataType][0].ID
|
matchingDataFileID = curFiles[dataType][0].ID
|
||||||
|
@ -507,7 +507,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
|
||||||
|
|
||||||
pos, ok := dataFileIndex.Index[id]
|
pos, ok := dataFileIndex.Index[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, ostypes.ErrNotExist
|
return nil, objectstorage.ErrNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
dataf, err := d.ost.ReadObject(d.DataFilePath(dataType, matchingDataFileID))
|
dataf, err := d.ost.ReadObject(d.DataFilePath(dataType, matchingDataFileID))
|
||||||
|
@ -560,7 +560,7 @@ func (d *DataManager) GetFirstDataStatusSequences(n int) ([]*sequence.Sequence,
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(dataStatusSequences) == 0 {
|
if len(dataStatusSequences) == 0 {
|
||||||
return nil, ostypes.ErrNotExist
|
return nil, objectstorage.ErrNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
return dataStatusSequences, nil
|
return dataStatusSequences, nil
|
||||||
|
@ -601,7 +601,7 @@ func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, e
|
||||||
})
|
})
|
||||||
|
|
||||||
if len(dataStatusSequences) == 0 {
|
if len(dataStatusSequences) == 0 {
|
||||||
return nil, ostypes.ErrNotExist
|
return nil, objectstorage.ErrNotExist
|
||||||
}
|
}
|
||||||
|
|
||||||
return dataStatusSequences, nil
|
return dataStatusSequences, nil
|
||||||
|
@ -862,7 +862,7 @@ func (d *DataManager) cleanOldCheckpoints(ctx context.Context, dataStatusSequenc
|
||||||
if _, ok := dataStatusPathsMap[object.Path]; !ok {
|
if _, ok := dataStatusPathsMap[object.Path]; !ok {
|
||||||
d.log.Infof("removing %q", object.Path)
|
d.log.Infof("removing %q", object.Path)
|
||||||
if err := d.ost.DeleteObject(object.Path); err != nil {
|
if err := d.ost.DeleteObject(object.Path); err != nil {
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -930,7 +930,7 @@ func (d *DataManager) cleanOldCheckpoints(ctx context.Context, dataStatusSequenc
|
||||||
if _, ok := files[pne]; !ok {
|
if _, ok := files[pne]; !ok {
|
||||||
d.log.Infof("removing %q", object.Path)
|
d.log.Infof("removing %q", object.Path)
|
||||||
if err := d.ost.DeleteObject(object.Path); err != nil {
|
if err := d.ost.DeleteObject(object.Path); err != nil {
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,8 +31,6 @@ import (
|
||||||
|
|
||||||
slog "agola.io/agola/internal/log"
|
slog "agola.io/agola/internal/log"
|
||||||
"agola.io/agola/internal/objectstorage"
|
"agola.io/agola/internal/objectstorage"
|
||||||
"agola.io/agola/internal/objectstorage/posix"
|
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
|
||||||
"agola.io/agola/internal/testutil"
|
"agola.io/agola/internal/testutil"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
errors "golang.org/x/xerrors"
|
errors "golang.org/x/xerrors"
|
||||||
|
@ -85,7 +83,7 @@ func TestEtcdReset(t *testing.T) {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -195,7 +193,7 @@ func TestEtcdResetWalsGap(t *testing.T) {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -324,7 +322,7 @@ func TestConcurrentUpdate(t *testing.T) {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -415,7 +413,7 @@ func TestEtcdWalCleaner(t *testing.T) {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -488,7 +486,7 @@ func TestReadObject(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -560,8 +558,8 @@ func TestReadObject(t *testing.T) {
|
||||||
|
|
||||||
// should not exists
|
// should not exists
|
||||||
_, _, err = dm.ReadObject("datatype01", "object1", nil)
|
_, _, err = dm.ReadObject("datatype01", "object1", nil)
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err)
|
t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err)
|
||||||
}
|
}
|
||||||
// should exist
|
// should exist
|
||||||
_, _, err = dm.ReadObject("datatype01", "object19", nil)
|
_, _, err = dm.ReadObject("datatype01", "object19", nil)
|
||||||
|
@ -584,8 +582,8 @@ func TestReadObject(t *testing.T) {
|
||||||
|
|
||||||
// should not exists
|
// should not exists
|
||||||
_, _, err = dm.ReadObject("datatype01", "object1", nil)
|
_, _, err = dm.ReadObject("datatype01", "object1", nil)
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
t.Fatalf("expected err %v, got: %v", ostypes.ErrNotExist, err)
|
t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err)
|
||||||
}
|
}
|
||||||
// should exist
|
// should exist
|
||||||
_, _, err = dm.ReadObject("datatype01", "object19", nil)
|
_, _, err = dm.ReadObject("datatype01", "object19", nil)
|
||||||
|
@ -781,7 +779,7 @@ func testCheckpoint(t *testing.T, basePath string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -997,7 +995,7 @@ func TestRead(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1111,7 +1109,7 @@ func testClean(t *testing.T, basePath string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1229,7 +1227,7 @@ func testCleanConcurrentCheckpoint(t *testing.T, basePath string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1358,7 +1356,7 @@ func testStorageWalCleaner(t *testing.T, basePath string) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1498,7 +1496,7 @@ func TestExportImport(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
ost, err := posix.New(ostDir)
|
ost, err := objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1607,7 +1605,7 @@ func TestExportImport(t *testing.T) {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ost, err = posix.New(ostDir)
|
ost, err = objectstorage.NewPosix(ostDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"agola.io/agola/internal/etcd"
|
"agola.io/agola/internal/etcd"
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
"agola.io/agola/internal/objectstorage"
|
||||||
"agola.io/agola/internal/sequence"
|
"agola.io/agola/internal/sequence"
|
||||||
|
|
||||||
uuid "github.com/satori/go.uuid"
|
uuid "github.com/satori/go.uuid"
|
||||||
|
@ -132,7 +132,7 @@ func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.Read
|
||||||
|
|
||||||
func (d *DataManager) HasOSTWal(walseq string) (bool, error) {
|
func (d *DataManager) HasOSTWal(walseq string) (bool, error) {
|
||||||
_, err := d.ost.Stat(d.storageWalStatusFile(walseq) + ".committed")
|
_, err := d.ost.Stat(d.storageWalStatusFile(walseq) + ".committed")
|
||||||
if err == ostypes.ErrNotExist {
|
if err == objectstorage.ErrNotExist {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -909,7 +909,7 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error {
|
||||||
walStatusFilePath := d.storageWalDataFile(header.WalDataFileID)
|
walStatusFilePath := d.storageWalDataFile(header.WalDataFileID)
|
||||||
d.log.Infof("removing %q", walStatusFilePath)
|
d.log.Infof("removing %q", walStatusFilePath)
|
||||||
if err := d.ost.DeleteObject(walStatusFilePath); err != nil {
|
if err := d.ost.DeleteObject(walStatusFilePath); err != nil {
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -917,7 +917,7 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error {
|
||||||
// then remove wal status files
|
// then remove wal status files
|
||||||
d.log.Infof("removing %q", object.Path)
|
d.log.Infof("removing %q", object.Path)
|
||||||
if err := d.ost.DeleteObject(object.Path); err != nil {
|
if err := d.ost.DeleteObject(object.Path); err != nil {
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -928,7 +928,7 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error {
|
||||||
if ext == ".checkpointed" {
|
if ext == ".checkpointed" {
|
||||||
d.log.Infof("removing %q", object.Path)
|
d.log.Infof("removing %q", object.Path)
|
||||||
if err := d.ost.DeleteObject(object.Path); err != nil {
|
if err := d.ost.DeleteObject(object.Path); err != nil {
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1149,7 +1149,7 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro
|
||||||
firstWal = dataStatus.WalSequence
|
firstWal = dataStatus.WalSequence
|
||||||
} else {
|
} else {
|
||||||
dataStatus, err = d.GetLastDataStatus()
|
dataStatus, err = d.GetLastDataStatus()
|
||||||
if err != nil && err != ostypes.ErrNotExist {
|
if err != nil && err != objectstorage.ErrNotExist {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// set the first wal to import in etcd if there's a snapshot. In this way we'll
|
// set the first wal to import in etcd if there's a snapshot. In this way we'll
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package common
|
package objectstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
@ -22,12 +22,12 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WriteFileAtomicFunc atomically writes a file, it achieves this by creating a
|
// writeFileAtomicFunc atomically writes a file, it achieves this by creating a
|
||||||
// temporary file and then moving it. writeFunc is the func that will write
|
// temporary file and then moving it. writeFunc is the func that will write
|
||||||
// data to the file.
|
// data to the file.
|
||||||
// TODO(sgotti) remove left over tmp files if process crashes before calling
|
// TODO(sgotti) remove left over tmp files if process crashes before calling
|
||||||
// os.Remove
|
// os.Remove
|
||||||
func WriteFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error {
|
func writeFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error {
|
||||||
f, err := ioutil.TempFile(tmpDir, "tmpfile")
|
f, err := ioutil.TempFile(tmpDir, "tmpfile")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -75,10 +75,12 @@ func WriteFileAtomicFunc(p, baseDir, tmpDir string, perm os.FileMode, persist bo
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func WriteFileAtomic(filename, baseDir, tmpDir string, perm os.FileMode, persist bool, data []byte) error {
|
/*
|
||||||
return WriteFileAtomicFunc(filename, baseDir, tmpDir, perm, persist,
|
func writeFileAtomic(filename, baseDir, tmpDir string, perm os.FileMode, persist bool, data []byte) error {
|
||||||
|
return writeFileAtomicFunc(filename, baseDir, tmpDir, perm, persist,
|
||||||
func(f io.Writer) error {
|
func(f io.Writer) error {
|
||||||
_, err := f.Write(data)
|
_, err := f.Write(data)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
*/
|
|
@ -16,13 +16,14 @@ package objectstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
"agola.io/agola/internal/objectstorage/types"
|
errors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Storage interface {
|
type Storage interface {
|
||||||
Stat(filepath string) (*types.ObjectInfo, error)
|
Stat(filepath string) (*ObjectInfo, error)
|
||||||
ReadObject(filepath string) (types.ReadSeekCloser, error)
|
ReadObject(filepath string) (ReadSeekCloser, error)
|
||||||
// WriteObject atomically writes an object. If size is greater or equal to
|
// WriteObject atomically writes an object. If size is greater or equal to
|
||||||
// zero then only size bytes will be read from data and wrote. If size is
|
// zero then only size bytes will be read from data and wrote. If size is
|
||||||
// less than zero data will be wrote until EOF. When persist is true the
|
// less than zero data will be wrote until EOF. When persist is true the
|
||||||
|
@ -30,7 +31,23 @@ type Storage interface {
|
||||||
// storage.
|
// storage.
|
||||||
WriteObject(filepath string, data io.Reader, size int64, persist bool) error
|
WriteObject(filepath string, data io.Reader, size int64, persist bool) error
|
||||||
DeleteObject(filepath string) error
|
DeleteObject(filepath string) error
|
||||||
List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo
|
List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
var ErrNotExist = errors.New("does not exist")
|
||||||
|
|
||||||
|
type ReadSeekCloser interface {
|
||||||
|
io.Reader
|
||||||
|
io.Seeker
|
||||||
|
io.Closer
|
||||||
|
}
|
||||||
|
|
||||||
|
type ObjectInfo struct {
|
||||||
|
Path string
|
||||||
|
LastModified time.Time
|
||||||
|
Size int64
|
||||||
|
|
||||||
|
Err error
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjStorage wraps a Storage providing additional helper functions
|
// ObjStorage wraps a Storage providing additional helper functions
|
||||||
|
@ -47,7 +64,7 @@ func (s *ObjStorage) Delimiter() string {
|
||||||
return s.delimiter
|
return s.delimiter
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan types.ObjectInfo {
|
func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan ObjectInfo {
|
||||||
delimiter := s.delimiter
|
delimiter := s.delimiter
|
||||||
if recursive {
|
if recursive {
|
||||||
delimiter = ""
|
delimiter = ""
|
||||||
|
|
|
@ -24,21 +24,17 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"agola.io/agola/internal/objectstorage/posix"
|
|
||||||
"agola.io/agola/internal/objectstorage/posixflat"
|
|
||||||
"agola.io/agola/internal/objectstorage/s3"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func setupPosix(t *testing.T, dir string) (*posix.PosixStorage, error) {
|
func setupPosix(t *testing.T, dir string) (*PosixStorage, error) {
|
||||||
return posix.New(path.Join(dir, "posix"))
|
return NewPosix(path.Join(dir, "posix"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupPosixFlat(t *testing.T, dir string) (*posixflat.PosixFlatStorage, error) {
|
func setupPosixFlat(t *testing.T, dir string) (*PosixFlatStorage, error) {
|
||||||
return posixflat.New(path.Join(dir, "posixflat"))
|
return NewPosixFlat(path.Join(dir, "posixflat"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupS3(t *testing.T, dir string) (*s3.S3Storage, error) {
|
func setupS3(t *testing.T, dir string) (*S3Storage, error) {
|
||||||
minioEndpoint := os.Getenv("MINIO_ENDPOINT")
|
minioEndpoint := os.Getenv("MINIO_ENDPOINT")
|
||||||
minioAccessKey := os.Getenv("MINIO_ACCESSKEY")
|
minioAccessKey := os.Getenv("MINIO_ACCESSKEY")
|
||||||
minioSecretKey := os.Getenv("MINIO_SECRETKEY")
|
minioSecretKey := os.Getenv("MINIO_SECRETKEY")
|
||||||
|
@ -47,7 +43,7 @@ func setupS3(t *testing.T, dir string) (*s3.S3Storage, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return s3.New(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false)
|
return NewS3(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
|
@ -272,7 +268,7 @@ func TestList(t *testing.T) {
|
||||||
for sname, s := range tt.s {
|
for sname, s := range tt.s {
|
||||||
t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) {
|
t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) {
|
||||||
switch s := s.(type) {
|
switch s := s.(type) {
|
||||||
case *s3.S3Storage:
|
case *S3Storage:
|
||||||
if s == nil {
|
if s == nil {
|
||||||
t.SkipNow()
|
t.SkipNow()
|
||||||
}
|
}
|
||||||
|
@ -336,7 +332,7 @@ func TestWriteObject(t *testing.T) {
|
||||||
for sname, s := range map[string]Storage{"posix": ps, "posixflat": pfs, "minio": s3s} {
|
for sname, s := range map[string]Storage{"posix": ps, "posixflat": pfs, "minio": s3s} {
|
||||||
t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) {
|
t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) {
|
||||||
switch s := s.(type) {
|
switch s := s.(type) {
|
||||||
case *s3.S3Storage:
|
case *S3Storage:
|
||||||
if s == nil {
|
if s == nil {
|
||||||
t.SkipNow()
|
t.SkipNow()
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package posix
|
package objectstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
@ -21,9 +21,6 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"agola.io/agola/internal/objectstorage/common"
|
|
||||||
"agola.io/agola/internal/objectstorage/types"
|
|
||||||
|
|
||||||
errors "golang.org/x/xerrors"
|
errors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -37,7 +34,7 @@ type PosixStorage struct {
|
||||||
tmpDir string
|
tmpDir string
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(baseDir string) (*PosixStorage, error) {
|
func NewPosix(baseDir string) (*PosixStorage, error) {
|
||||||
if err := os.MkdirAll(baseDir, 0770); err != nil {
|
if err := os.MkdirAll(baseDir, 0770); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -59,7 +56,7 @@ func (s *PosixStorage) fsPath(p string) (string, error) {
|
||||||
return filepath.Join(s.dataDir, p), nil
|
return filepath.Join(s.dataDir, p), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PosixStorage) Stat(p string) (*types.ObjectInfo, error) {
|
func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) {
|
||||||
fspath, err := s.fsPath(p)
|
fspath, err := s.fsPath(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -68,15 +65,15 @@ func (s *PosixStorage) Stat(p string) (*types.ObjectInfo, error) {
|
||||||
fi, err := os.Stat(fspath)
|
fi, err := os.Stat(fspath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil, types.ErrNotExist
|
return nil, ErrNotExist
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &types.ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil
|
return &ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PosixStorage) ReadObject(p string) (types.ReadSeekCloser, error) {
|
func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) {
|
||||||
fspath, err := s.fsPath(p)
|
fspath, err := s.fsPath(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -84,7 +81,7 @@ func (s *PosixStorage) ReadObject(p string) (types.ReadSeekCloser, error) {
|
||||||
|
|
||||||
f, err := os.Open(fspath)
|
f, err := os.Open(fspath)
|
||||||
if err != nil && os.IsNotExist(err) {
|
if err != nil && os.IsNotExist(err) {
|
||||||
return nil, types.ErrNotExist
|
return nil, ErrNotExist
|
||||||
}
|
}
|
||||||
return f, err
|
return f, err
|
||||||
}
|
}
|
||||||
|
@ -103,7 +100,7 @@ func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist
|
||||||
if size >= 0 {
|
if size >= 0 {
|
||||||
r = io.LimitReader(data, size)
|
r = io.LimitReader(data, size)
|
||||||
}
|
}
|
||||||
return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error {
|
return writeFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error {
|
||||||
_, err := io.Copy(f, r)
|
_, err := io.Copy(f, r)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
@ -117,7 +114,7 @@ func (s *PosixStorage) DeleteObject(p string) error {
|
||||||
|
|
||||||
if err := os.Remove(fspath); err != nil {
|
if err := os.Remove(fspath); err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return types.ErrNotExist
|
return ErrNotExist
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -151,16 +148,16 @@ func (s *PosixStorage) DeleteObject(p string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo {
|
func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo {
|
||||||
objectCh := make(chan types.ObjectInfo, 1)
|
objectCh := make(chan ObjectInfo, 1)
|
||||||
|
|
||||||
if len(delimiter) > 1 {
|
if len(delimiter) > 1 {
|
||||||
objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)}
|
objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)}
|
||||||
return objectCh
|
return objectCh
|
||||||
}
|
}
|
||||||
|
|
||||||
if startWith != "" && !strings.Contains(startWith, prefix) {
|
if startWith != "" && !strings.Contains(startWith, prefix) {
|
||||||
objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)}
|
objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)}
|
||||||
return objectCh
|
return objectCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,7 +179,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s
|
||||||
startWith = strings.TrimPrefix(startWith, "/")
|
startWith = strings.TrimPrefix(startWith, "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(objectCh chan<- types.ObjectInfo) {
|
go func(objectCh chan<- ObjectInfo) {
|
||||||
defer close(objectCh)
|
defer close(objectCh)
|
||||||
err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error {
|
err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error {
|
||||||
if err != nil && !os.IsNotExist(err) {
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
@ -219,7 +216,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s
|
||||||
if strings.HasPrefix(p, prefix) && p > startWith {
|
if strings.HasPrefix(p, prefix) && p > startWith {
|
||||||
select {
|
select {
|
||||||
// Send object content.
|
// Send object content.
|
||||||
case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}:
|
case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}:
|
||||||
// If receives done from the caller, return here.
|
// If receives done from the caller, return here.
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
return io.EOF
|
return io.EOF
|
||||||
|
@ -229,7 +226,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
objectCh <- types.ObjectInfo{
|
objectCh <- ObjectInfo{
|
||||||
Err: err,
|
Err: err,
|
||||||
}
|
}
|
||||||
return
|
return
|
|
@ -12,7 +12,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package posix
|
package objectstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
@ -22,7 +22,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDeleteObject(t *testing.T) {
|
func TestPosixDeleteObject(t *testing.T) {
|
||||||
objects := []string{"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"}
|
objects := []string{"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"}
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "objectstorage")
|
dir, err := ioutil.TempDir("", "objectstorage")
|
||||||
|
@ -31,7 +31,7 @@ func TestDeleteObject(t *testing.T) {
|
||||||
}
|
}
|
||||||
//defer os.RemoveAll(dir)
|
//defer os.RemoveAll(dir)
|
||||||
|
|
||||||
ls, err := New(dir)
|
ls, err := NewPosix(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
|
@ -12,7 +12,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package posixflat
|
package objectstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
@ -23,15 +23,10 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
"agola.io/agola/internal/objectstorage/common"
|
|
||||||
"agola.io/agola/internal/objectstorage/types"
|
|
||||||
|
|
||||||
errors "golang.org/x/xerrors"
|
errors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
dataDirName = "data"
|
|
||||||
tmpDirName = "tmp"
|
|
||||||
splitLength = 8
|
splitLength = 8
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -210,7 +205,7 @@ type PosixFlatStorage struct {
|
||||||
tmpDir string
|
tmpDir string
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(baseDir string) (*PosixFlatStorage, error) {
|
func NewPosixFlat(baseDir string) (*PosixFlatStorage, error) {
|
||||||
if err := os.MkdirAll(baseDir, 0770); err != nil {
|
if err := os.MkdirAll(baseDir, 0770); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -235,7 +230,7 @@ func (s *PosixFlatStorage) fsPath(p string) (string, error) {
|
||||||
return filepath.Join(s.dataDir, escape(p)), nil
|
return filepath.Join(s.dataDir, escape(p)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PosixFlatStorage) Stat(p string) (*types.ObjectInfo, error) {
|
func (s *PosixFlatStorage) Stat(p string) (*ObjectInfo, error) {
|
||||||
fspath, err := s.fsPath(p)
|
fspath, err := s.fsPath(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -244,15 +239,15 @@ func (s *PosixFlatStorage) Stat(p string) (*types.ObjectInfo, error) {
|
||||||
fi, err := os.Stat(fspath)
|
fi, err := os.Stat(fspath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil, types.ErrNotExist
|
return nil, ErrNotExist
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &types.ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil
|
return &ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PosixFlatStorage) ReadObject(p string) (types.ReadSeekCloser, error) {
|
func (s *PosixFlatStorage) ReadObject(p string) (ReadSeekCloser, error) {
|
||||||
fspath, err := s.fsPath(p)
|
fspath, err := s.fsPath(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -260,7 +255,7 @@ func (s *PosixFlatStorage) ReadObject(p string) (types.ReadSeekCloser, error) {
|
||||||
|
|
||||||
f, err := os.Open(fspath)
|
f, err := os.Open(fspath)
|
||||||
if err != nil && os.IsNotExist(err) {
|
if err != nil && os.IsNotExist(err) {
|
||||||
return nil, types.ErrNotExist
|
return nil, ErrNotExist
|
||||||
}
|
}
|
||||||
return f, err
|
return f, err
|
||||||
}
|
}
|
||||||
|
@ -279,7 +274,7 @@ func (s *PosixFlatStorage) WriteObject(p string, data io.Reader, size int64, per
|
||||||
if size >= 0 {
|
if size >= 0 {
|
||||||
r = io.LimitReader(data, size)
|
r = io.LimitReader(data, size)
|
||||||
}
|
}
|
||||||
return common.WriteFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error {
|
return writeFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error {
|
||||||
_, err := io.Copy(f, r)
|
_, err := io.Copy(f, r)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
@ -293,7 +288,7 @@ func (s *PosixFlatStorage) DeleteObject(p string) error {
|
||||||
|
|
||||||
if err := os.Remove(fspath); err != nil {
|
if err := os.Remove(fspath); err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return types.ErrNotExist
|
return ErrNotExist
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -327,16 +322,16 @@ func (s *PosixFlatStorage) DeleteObject(p string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo {
|
func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo {
|
||||||
objectCh := make(chan types.ObjectInfo, 1)
|
objectCh := make(chan ObjectInfo, 1)
|
||||||
|
|
||||||
if len(delimiter) > 1 {
|
if len(delimiter) > 1 {
|
||||||
objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)}
|
objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)}
|
||||||
return objectCh
|
return objectCh
|
||||||
}
|
}
|
||||||
|
|
||||||
if startWith != "" && !strings.Contains(startWith, prefix) {
|
if startWith != "" && !strings.Contains(startWith, prefix) {
|
||||||
objectCh <- types.ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)}
|
objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)}
|
||||||
return objectCh
|
return objectCh
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -358,7 +353,7 @@ func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-ch
|
||||||
startWith = strings.TrimPrefix(startWith, "/")
|
startWith = strings.TrimPrefix(startWith, "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(objectCh chan<- types.ObjectInfo) {
|
go func(objectCh chan<- ObjectInfo) {
|
||||||
var prevp string
|
var prevp string
|
||||||
defer close(objectCh)
|
defer close(objectCh)
|
||||||
err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error {
|
err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error {
|
||||||
|
@ -416,7 +411,7 @@ func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-ch
|
||||||
if p > prevp {
|
if p > prevp {
|
||||||
select {
|
select {
|
||||||
// Send object content.
|
// Send object content.
|
||||||
case objectCh <- types.ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}:
|
case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}:
|
||||||
// If receives done from the caller, return here.
|
// If receives done from the caller, return here.
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
return io.EOF
|
return io.EOF
|
||||||
|
@ -428,7 +423,7 @@ func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-ch
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
objectCh <- types.ObjectInfo{
|
objectCh <- ObjectInfo{
|
||||||
Err: err,
|
Err: err,
|
||||||
}
|
}
|
||||||
return
|
return
|
|
@ -12,7 +12,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package posixflat
|
package objectstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
@ -77,7 +77,7 @@ func TestEscapeUnescape(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeleteObject(t *testing.T) {
|
func TestPosixFlatDeleteObject(t *testing.T) {
|
||||||
objects := []string{"/", "//", "☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"}
|
objects := []string{"/", "//", "☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"}
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "objectstorage")
|
dir, err := ioutil.TempDir("", "objectstorage")
|
||||||
|
@ -86,7 +86,7 @@ func TestDeleteObject(t *testing.T) {
|
||||||
}
|
}
|
||||||
//defer os.RemoveAll(dir)
|
//defer os.RemoveAll(dir)
|
||||||
|
|
||||||
ls, err := New(dir)
|
ls, err := NewPosixFlat(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
|
@ -12,7 +12,7 @@
|
||||||
// See the License for the specific language governing permissions and
|
// See the License for the specific language governing permissions and
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
|
|
||||||
package s3
|
package objectstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
@ -21,8 +21,6 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"agola.io/agola/internal/objectstorage/types"
|
|
||||||
|
|
||||||
minio "github.com/minio/minio-go"
|
minio "github.com/minio/minio-go"
|
||||||
errors "golang.org/x/xerrors"
|
errors "golang.org/x/xerrors"
|
||||||
)
|
)
|
||||||
|
@ -34,7 +32,7 @@ type S3Storage struct {
|
||||||
minioCore *minio.Core
|
minioCore *minio.Core
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) {
|
func NewS3(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) {
|
||||||
minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure)
|
minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -62,24 +60,24 @@ func New(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S3Storage) Stat(p string) (*types.ObjectInfo, error) {
|
func (s *S3Storage) Stat(p string) (*ObjectInfo, error) {
|
||||||
oi, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{})
|
oi, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
merr := minio.ToErrorResponse(err)
|
merr := minio.ToErrorResponse(err)
|
||||||
if merr.StatusCode == http.StatusNotFound {
|
if merr.StatusCode == http.StatusNotFound {
|
||||||
return nil, types.ErrNotExist
|
return nil, ErrNotExist
|
||||||
}
|
}
|
||||||
return nil, merr
|
return nil, merr
|
||||||
}
|
}
|
||||||
|
|
||||||
return &types.ObjectInfo{Path: p, LastModified: oi.LastModified, Size: oi.Size}, nil
|
return &ObjectInfo{Path: p, LastModified: oi.LastModified, Size: oi.Size}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S3Storage) ReadObject(filepath string) (types.ReadSeekCloser, error) {
|
func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) {
|
||||||
if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil {
|
if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil {
|
||||||
merr := minio.ToErrorResponse(err)
|
merr := minio.ToErrorResponse(err)
|
||||||
if merr.StatusCode == http.StatusNotFound {
|
if merr.StatusCode == http.StatusNotFound {
|
||||||
return nil, types.ErrNotExist
|
return nil, ErrNotExist
|
||||||
}
|
}
|
||||||
return nil, merr
|
return nil, merr
|
||||||
}
|
}
|
||||||
|
@ -120,11 +118,11 @@ func (s *S3Storage) DeleteObject(filepath string) error {
|
||||||
return s.minioClient.RemoveObject(s.bucket, filepath)
|
return s.minioClient.RemoveObject(s.bucket, filepath)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan types.ObjectInfo {
|
func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo {
|
||||||
objectCh := make(chan types.ObjectInfo, 1)
|
objectCh := make(chan ObjectInfo, 1)
|
||||||
|
|
||||||
if len(delimiter) > 1 {
|
if len(delimiter) > 1 {
|
||||||
objectCh <- types.ObjectInfo{
|
objectCh <- ObjectInfo{
|
||||||
Err: errors.Errorf("wrong delimiter %q", delimiter),
|
Err: errors.Errorf("wrong delimiter %q", delimiter),
|
||||||
}
|
}
|
||||||
return objectCh
|
return objectCh
|
||||||
|
@ -139,7 +137,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initiate list objects goroutine here.
|
// Initiate list objects goroutine here.
|
||||||
go func(objectCh chan<- types.ObjectInfo) {
|
go func(objectCh chan<- ObjectInfo) {
|
||||||
defer close(objectCh)
|
defer close(objectCh)
|
||||||
// Save continuationToken for next request.
|
// Save continuationToken for next request.
|
||||||
var continuationToken string
|
var continuationToken string
|
||||||
|
@ -147,7 +145,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru
|
||||||
// Get list of objects a maximum of 1000 per request.
|
// Get list of objects a maximum of 1000 per request.
|
||||||
result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith)
|
result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
objectCh <- types.ObjectInfo{
|
objectCh <- ObjectInfo{
|
||||||
Err: err,
|
Err: err,
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -157,7 +155,7 @@ func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan stru
|
||||||
for _, object := range result.Contents {
|
for _, object := range result.Contents {
|
||||||
select {
|
select {
|
||||||
// Send object content.
|
// Send object content.
|
||||||
case objectCh <- types.ObjectInfo{Path: object.Key, LastModified: object.LastModified, Size: object.Size}:
|
case objectCh <- ObjectInfo{Path: object.Key, LastModified: object.LastModified, Size: object.Size}:
|
||||||
// If receives done from the caller, return here.
|
// If receives done from the caller, return here.
|
||||||
case <-doneCh:
|
case <-doneCh:
|
||||||
return
|
return
|
|
@ -1,41 +0,0 @@
|
||||||
// Copyright 2019 Sorint.lab
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package types
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TODO(sgotti)
|
|
||||||
// define common errors (like notFound) so the implementations will return them
|
|
||||||
// instead of their own errors
|
|
||||||
|
|
||||||
var ErrNotExist = errors.New("does not exist")
|
|
||||||
|
|
||||||
type ReadSeekCloser interface {
|
|
||||||
io.Reader
|
|
||||||
io.Seeker
|
|
||||||
io.Closer
|
|
||||||
}
|
|
||||||
|
|
||||||
type ObjectInfo struct {
|
|
||||||
Path string
|
|
||||||
LastModified time.Time
|
|
||||||
Size int64
|
|
||||||
|
|
||||||
Err error
|
|
||||||
}
|
|
|
@ -28,7 +28,6 @@ import (
|
||||||
"agola.io/agola/internal/db"
|
"agola.io/agola/internal/db"
|
||||||
"agola.io/agola/internal/etcd"
|
"agola.io/agola/internal/etcd"
|
||||||
"agola.io/agola/internal/objectstorage"
|
"agola.io/agola/internal/objectstorage"
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
|
||||||
"agola.io/agola/internal/sequence"
|
"agola.io/agola/internal/sequence"
|
||||||
"agola.io/agola/internal/util"
|
"agola.io/agola/internal/util"
|
||||||
"agola.io/agola/services/configstore/types"
|
"agola.io/agola/services/configstore/types"
|
||||||
|
@ -130,10 +129,10 @@ func (r *ReadDB) ResetDB(ctx context.Context) error {
|
||||||
|
|
||||||
func (r *ReadDB) SyncFromDump(ctx context.Context) (string, error) {
|
func (r *ReadDB) SyncFromDump(ctx context.Context) (string, error) {
|
||||||
dumpIndex, err := r.dm.GetLastDataStatus()
|
dumpIndex, err := r.dm.GetLastDataStatus()
|
||||||
if err != nil && err != ostypes.ErrNotExist {
|
if err != nil && err != objectstorage.ErrNotExist {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
if err == ostypes.ErrNotExist {
|
if err == objectstorage.ErrNotExist {
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
for dataType, files := range dumpIndex.Files {
|
for dataType, files := range dumpIndex.Files {
|
||||||
|
|
|
@ -26,7 +26,6 @@ import (
|
||||||
"agola.io/agola/internal/db"
|
"agola.io/agola/internal/db"
|
||||||
"agola.io/agola/internal/etcd"
|
"agola.io/agola/internal/etcd"
|
||||||
"agola.io/agola/internal/objectstorage"
|
"agola.io/agola/internal/objectstorage"
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
|
||||||
"agola.io/agola/internal/services/runservice/action"
|
"agola.io/agola/internal/services/runservice/action"
|
||||||
"agola.io/agola/internal/services/runservice/common"
|
"agola.io/agola/internal/services/runservice/common"
|
||||||
"agola.io/agola/internal/services/runservice/readdb"
|
"agola.io/agola/internal/services/runservice/readdb"
|
||||||
|
@ -231,7 +230,7 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se
|
||||||
}
|
}
|
||||||
f, err := h.ost.ReadObject(logPath)
|
f, err := h.ost.ReadObject(logPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == ostypes.ErrNotExist {
|
if err == objectstorage.ErrNotExist {
|
||||||
return common.NewErrNotExist(err), true
|
return common.NewErrNotExist(err), true
|
||||||
}
|
}
|
||||||
return err, true
|
return err, true
|
||||||
|
|
|
@ -25,7 +25,6 @@ import (
|
||||||
|
|
||||||
"agola.io/agola/internal/etcd"
|
"agola.io/agola/internal/etcd"
|
||||||
"agola.io/agola/internal/objectstorage"
|
"agola.io/agola/internal/objectstorage"
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
|
||||||
"agola.io/agola/internal/services/runservice/action"
|
"agola.io/agola/internal/services/runservice/action"
|
||||||
"agola.io/agola/internal/services/runservice/common"
|
"agola.io/agola/internal/services/runservice/common"
|
||||||
"agola.io/agola/internal/services/runservice/store"
|
"agola.io/agola/internal/services/runservice/store"
|
||||||
|
@ -249,7 +248,7 @@ func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error
|
||||||
archivePath := store.OSTRunTaskArchivePath(rtID, step)
|
archivePath := store.OSTRunTaskArchivePath(rtID, step)
|
||||||
f, err := h.ost.ReadObject(archivePath)
|
f, err := h.ost.ReadObject(archivePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == ostypes.ErrNotExist {
|
if err == objectstorage.ErrNotExist {
|
||||||
return common.NewErrNotExist(err)
|
return common.NewErrNotExist(err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
@ -326,7 +325,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string,
|
||||||
defer close(doneCh)
|
defer close(doneCh)
|
||||||
|
|
||||||
// get the latest modified object
|
// get the latest modified object
|
||||||
var lastObject *ostypes.ObjectInfo
|
var lastObject *objectstorage.ObjectInfo
|
||||||
for object := range ost.List(store.OSTCacheDir()+"/"+key, "", false, doneCh) {
|
for object := range ost.List(store.OSTCacheDir()+"/"+key, "", false, doneCh) {
|
||||||
if object.Err != nil {
|
if object.Err != nil {
|
||||||
return "", object.Err
|
return "", object.Err
|
||||||
|
@ -345,7 +344,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string,
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := ost.Stat(cachePath)
|
_, err := ost.Stat(cachePath)
|
||||||
if err == ostypes.ErrNotExist {
|
if err == objectstorage.ErrNotExist {
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -358,7 +357,7 @@ func (h *CacheHandler) readCache(key string, w io.Writer) error {
|
||||||
cachePath := store.OSTCachePath(key)
|
cachePath := store.OSTCachePath(key)
|
||||||
f, err := h.ost.ReadObject(cachePath)
|
f, err := h.ost.ReadObject(cachePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == ostypes.ErrNotExist {
|
if err == objectstorage.ErrNotExist {
|
||||||
return common.NewErrNotExist(err)
|
return common.NewErrNotExist(err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -32,7 +32,6 @@ import (
|
||||||
"agola.io/agola/internal/db"
|
"agola.io/agola/internal/db"
|
||||||
"agola.io/agola/internal/etcd"
|
"agola.io/agola/internal/etcd"
|
||||||
"agola.io/agola/internal/objectstorage"
|
"agola.io/agola/internal/objectstorage"
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
|
||||||
"agola.io/agola/internal/sequence"
|
"agola.io/agola/internal/sequence"
|
||||||
"agola.io/agola/internal/services/runservice/common"
|
"agola.io/agola/internal/services/runservice/common"
|
||||||
"agola.io/agola/internal/services/runservice/store"
|
"agola.io/agola/internal/services/runservice/store"
|
||||||
|
@ -668,10 +667,10 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
|
||||||
|
|
||||||
func (r *ReadDB) SyncFromDump(ctx context.Context) (string, error) {
|
func (r *ReadDB) SyncFromDump(ctx context.Context) (string, error) {
|
||||||
dumpIndex, err := r.dm.GetLastDataStatus()
|
dumpIndex, err := r.dm.GetLastDataStatus()
|
||||||
if err != nil && err != ostypes.ErrNotExist {
|
if err != nil && err != objectstorage.ErrNotExist {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
if err == ostypes.ErrNotExist {
|
if err == objectstorage.ErrNotExist {
|
||||||
return "", nil
|
return "", nil
|
||||||
}
|
}
|
||||||
for dataType, files := range dumpIndex.Files {
|
for dataType, files := range dumpIndex.Files {
|
||||||
|
|
|
@ -26,7 +26,7 @@ import (
|
||||||
"agola.io/agola/internal/datamanager"
|
"agola.io/agola/internal/datamanager"
|
||||||
"agola.io/agola/internal/etcd"
|
"agola.io/agola/internal/etcd"
|
||||||
slog "agola.io/agola/internal/log"
|
slog "agola.io/agola/internal/log"
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
"agola.io/agola/internal/objectstorage"
|
||||||
"agola.io/agola/internal/runconfig"
|
"agola.io/agola/internal/runconfig"
|
||||||
"agola.io/agola/internal/services/runservice/common"
|
"agola.io/agola/internal/services/runservice/common"
|
||||||
"agola.io/agola/internal/services/runservice/store"
|
"agola.io/agola/internal/services/runservice/store"
|
||||||
|
@ -865,7 +865,7 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error {
|
||||||
|
|
||||||
func (s *Runservice) OSTFileExists(path string) (bool, error) {
|
func (s *Runservice) OSTFileExists(path string) (bool, error) {
|
||||||
_, err := s.ost.Stat(path)
|
_, err := s.ost.Stat(path)
|
||||||
if err != nil && err != ostypes.ErrNotExist {
|
if err != nil && err != objectstorage.ErrNotExist {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return err == nil, nil
|
return err == nil, nil
|
||||||
|
@ -1359,7 +1359,7 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.
|
||||||
}
|
}
|
||||||
if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) {
|
if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) {
|
||||||
if err := s.ost.DeleteObject(object.Path); err != nil {
|
if err := s.ost.DeleteObject(object.Path); err != nil {
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
log.Warnf("failed to delete cache object %q: %v", object.Path, err)
|
log.Warnf("failed to delete cache object %q: %v", object.Path, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1411,7 +1411,7 @@ func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterv
|
||||||
}
|
}
|
||||||
if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) {
|
if object.LastModified.Add(workspaceExpireInterval).Before(time.Now()) {
|
||||||
if err := s.ost.DeleteObject(object.Path); err != nil {
|
if err := s.ost.DeleteObject(object.Path); err != nil {
|
||||||
if err != ostypes.ErrNotExist {
|
if err != objectstorage.ErrNotExist {
|
||||||
log.Warnf("failed to delete workspace object %q: %v", object.Path, err)
|
log.Warnf("failed to delete workspace object %q: %v", object.Path, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@ import (
|
||||||
|
|
||||||
"agola.io/agola/internal/datamanager"
|
"agola.io/agola/internal/datamanager"
|
||||||
"agola.io/agola/internal/etcd"
|
"agola.io/agola/internal/etcd"
|
||||||
ostypes "agola.io/agola/internal/objectstorage/types"
|
"agola.io/agola/internal/objectstorage"
|
||||||
"agola.io/agola/internal/services/runservice/common"
|
"agola.io/agola/internal/services/runservice/common"
|
||||||
"agola.io/agola/internal/util"
|
"agola.io/agola/internal/util"
|
||||||
"agola.io/agola/services/runservice/types"
|
"agola.io/agola/services/runservice/types"
|
||||||
|
@ -504,7 +504,7 @@ func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, dm *datamanager.DataMan
|
||||||
}
|
}
|
||||||
if r == nil {
|
if r == nil {
|
||||||
r, err = OSTGetRun(dm, runID)
|
r, err = OSTGetRun(dm, runID)
|
||||||
if err != nil && err != ostypes.ErrNotExist {
|
if err != nil && err != objectstorage.ErrNotExist {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue