diff --git a/go.mod b/go.mod index 06d0c6b..e4b29f3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,9 @@ module github.com/sorintlab/agola require ( + github.com/go-ini/ini v1.42.0 // indirect + github.com/minio/minio-go v6.0.14+incompatible + github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/pkg/errors v0.8.0 go.etcd.io/etcd v0.0.0-20181128220305-dedae6eb7c25 go.uber.org/zap v1.9.1 diff --git a/go.sum b/go.sum index 41a809c..6aac979 100644 --- a/go.sum +++ b/go.sum @@ -20,6 +20,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-ini/ini v1.42.0 h1:TWr1wGj35+UiWHlBA8er89seFXxzwFn11spilrrj+38= +github.com/go-ini/ini v1.42.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/gogo/protobuf v1.0.0 h1:2jyBKDKU/8v3v2xVR2PtiWQviFUyiaGk2rpfyFT8rTM= github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= @@ -55,6 +57,10 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc= github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o= +github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir 
v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= diff --git a/internal/objectstorage/objectstorage.go b/internal/objectstorage/objectstorage.go new file mode 100644 index 0000000..a9dbce4 --- /dev/null +++ b/internal/objectstorage/objectstorage.go @@ -0,0 +1,63 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package objectstorage + +import ( + "errors" + "io" +) + +// TODO(sgotti) +// define common errors (like notFound) so the implementations will return them +// instead of their own errors + +var ErrNotExist = errors.New("does not exist") + +type Storage interface { + Stat(filepath string) (*ObjectInfo, error) + ReadObject(filepath string) (io.ReadCloser, error) + WriteObject(filepath string, data io.Reader) error + DeleteObject(filepath string) error + List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo +} + +type ObjectInfo struct { + Path string + + Err error +} + +// ObjStorage wraps a Storage providing additional helper functions +type ObjStorage struct { + Storage + delimiter string +} + +func NewObjStorage(s Storage, delimiter string) *ObjStorage { + return &ObjStorage{Storage: s, delimiter: delimiter} +} + +func (s *ObjStorage) Delimiter() string { + return s.delimiter +} + +func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan ObjectInfo { + delimiter := s.delimiter + if recursive { + delimiter = "" + } + + return s.Storage.List(prefix, startWith, delimiter, doneCh) +} diff --git a/internal/objectstorage/objectstorage_test.go b/internal/objectstorage/objectstorage_test.go new file mode 100644 index 0000000..8590cf1 --- /dev/null +++ b/internal/objectstorage/objectstorage_test.go @@ -0,0 +1,286 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package objectstorage + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "reflect" + "strings" + "testing" +) + +func TestList(t *testing.T) { + dir, err := ioutil.TempDir("", "objectstorage") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + ls, err := NewPosixStorage(dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + var s3 *S3Storage + minioEndpoint := os.Getenv("MINIO_ENDPOINT") + minioAccessKey := os.Getenv("MINIO_ACCESSKEY") + minioSecretKey := os.Getenv("MINIO_SECRETKEY") + if minioEndpoint == "" { + t.Logf("missing MINIO_ENDPOINT env, skipping tests with minio storage") + } else { + var err error + s3, err = NewS3Storage(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false) + if err != nil { + t.Fatalf("err: %v", err) + } + } + + type listop struct { + prefix string + start string + recursive bool + expected []string + } + tests := []struct { + s map[string]Storage + objects []string + ops []listop + }{ + { + map[string]Storage{"local": ls}, + []string{ + // Minio (as of 20190201) IMHO is not real S3 since it tries to map to a + // file system and not a flat namespace like S3. 
For this reason this test + // won't work with minio because it creates a file called "path/of" and so + // it's not possible to create a file "path/of/a" because it needs "of" to + // be a directory + + // All of the below tests will fail on Minio due to the above reason and also the multiple '/' + // so we aren't testing these with it + + //"path/of", + //"path/of/a/file02", + //"path/of/a/file03", + //"path/of/a/file04", + //"path/of/a/file05", + + // These are multiple of 8 chars on purpose to test the filemarker behavior to + // distinguish between a file or a directory when the files ends at the path + // separator point + "s3/is/not/a/file///system/file01", + "s3/is/not/a/file///system/file02", + "s3/is/not/a/file///system/file03", + "s3/is/not/a/file///system/file04", + "s3/is/not/a/file///system/file041", + "s3/is/not/a/file///system/file042", + "s3/is/not/a/file///system/file042/", + "s3/is/not/a/file///system/file042/a", + "s3/is/not/a/file///system/file04/a", + "s3/is/not/a/file///system/file04/b", + "s3/is/not/a/file///system/file05", + "s3/is/not/a/file///system/file01a", + "s3/is/not/a/file///system/file01b", + "s3/is/not/a/file///system/file01/", + "s3/is/not/a/file///system/file01/a", + "s3/is/not/a/file///system/file01/b", + }, + []listop{ + { + prefix: "/s3/", + start: "/s3/is/not/a/file///system/file02", + recursive: true, + expected: []string{ + "s3/is/not/a/file///system/file03", + "s3/is/not/a/file///system/file04", + "s3/is/not/a/file///system/file04/a", + "s3/is/not/a/file///system/file04/b", + "s3/is/not/a/file///system/file041", + "s3/is/not/a/file///system/file042", + "s3/is/not/a/file///system/file042/", + "s3/is/not/a/file///system/file042/a", + "s3/is/not/a/file///system/file05", + }, + }, + { + prefix: "s3", + start: "s3/is/not/a/file///system/file02", + recursive: true, + expected: []string{ + "s3/is/not/a/file///system/file03", + "s3/is/not/a/file///system/file04", + "s3/is/not/a/file///system/file04/a", + 
"s3/is/not/a/file///system/file04/b", + "s3/is/not/a/file///system/file041", + "s3/is/not/a/file///system/file042", + "s3/is/not/a/file///system/file042/", + "s3/is/not/a/file///system/file042/a", + "s3/is/not/a/file///system/file05", + }, + }, + { + prefix: "s3/is/not/a/file///system/", + recursive: false, + expected: []string{ + "s3/is/not/a/file///system/file01", + "s3/is/not/a/file///system/file01a", + "s3/is/not/a/file///system/file01b", + "s3/is/not/a/file///system/file02", + "s3/is/not/a/file///system/file03", + "s3/is/not/a/file///system/file04", + "s3/is/not/a/file///system/file041", + "s3/is/not/a/file///system/file042", + "s3/is/not/a/file///system/file05", + }, + }, + { + prefix: "s3/is/not/a/file///system/", + recursive: true, + expected: []string{ + "s3/is/not/a/file///system/file01", + "s3/is/not/a/file///system/file01/", + "s3/is/not/a/file///system/file01/a", + "s3/is/not/a/file///system/file01/b", + "s3/is/not/a/file///system/file01a", + "s3/is/not/a/file///system/file01b", + "s3/is/not/a/file///system/file02", + "s3/is/not/a/file///system/file03", + "s3/is/not/a/file///system/file04", + "s3/is/not/a/file///system/file04/a", + "s3/is/not/a/file///system/file04/b", + "s3/is/not/a/file///system/file041", + "s3/is/not/a/file///system/file042", + "s3/is/not/a/file///system/file042/", + "s3/is/not/a/file///system/file042/a", + "s3/is/not/a/file///system/file05", + }, + }, + }, + }, + { + map[string]Storage{"local": ls, "minio": s3}, + []string{ + // These are multiple of 8 chars on purpose to test the filemarker behavior to + // distinguish between a file or a directory when the files ends at the path + // separator point + "s3/is/not/a/file/sy/st/em/file01", + "s3/is/not/a/file/sy/st/em/file02", + "s3/is/not/a/file/sy/st/em/file03", + "s3/is/not/a/file/sy/st/em/file05", + "s3/is/not/a/file/sy/st/em/file01a", + "s3/is/not/a/file/sy/st/em/file01b", + "s3/is/not/a/file/sy/st/em/file04/a", + "s3/is/not/a/file/sy/st/em/file04/b", + 
"s3/is/not/a/file/sy/st/em/file041", + "s3/is/not/a/file/sy/st/em/file042/a", + }, + []listop{ + { + prefix: "/s3/", + start: "/s3/is/not/a/file/sy/st/em/file02", + recursive: true, + expected: []string{ + "s3/is/not/a/file/sy/st/em/file03", + "s3/is/not/a/file/sy/st/em/file04/a", + "s3/is/not/a/file/sy/st/em/file04/b", + "s3/is/not/a/file/sy/st/em/file041", + "s3/is/not/a/file/sy/st/em/file042/a", + "s3/is/not/a/file/sy/st/em/file05", + }, + }, + { + prefix: "s3", + start: "s3/is/not/a/file/sy/st/em/file02", + recursive: true, + expected: []string{ + "s3/is/not/a/file/sy/st/em/file03", + "s3/is/not/a/file/sy/st/em/file04/a", + "s3/is/not/a/file/sy/st/em/file04/b", + "s3/is/not/a/file/sy/st/em/file041", + "s3/is/not/a/file/sy/st/em/file042/a", + "s3/is/not/a/file/sy/st/em/file05", + }, + }, + { + prefix: "s3/is/not/a/file/sy/st/em/", + recursive: false, + expected: []string{ + "s3/is/not/a/file/sy/st/em/file01", + "s3/is/not/a/file/sy/st/em/file01a", + "s3/is/not/a/file/sy/st/em/file01b", + "s3/is/not/a/file/sy/st/em/file02", + "s3/is/not/a/file/sy/st/em/file03", + "s3/is/not/a/file/sy/st/em/file041", + "s3/is/not/a/file/sy/st/em/file05", + }, + }, + { + prefix: "s3/is/not/a/file/sy/st/em/", + recursive: true, + expected: []string{ + "s3/is/not/a/file/sy/st/em/file01", + "s3/is/not/a/file/sy/st/em/file01a", + "s3/is/not/a/file/sy/st/em/file01b", + "s3/is/not/a/file/sy/st/em/file02", + "s3/is/not/a/file/sy/st/em/file03", + "s3/is/not/a/file/sy/st/em/file04/a", + "s3/is/not/a/file/sy/st/em/file04/b", + "s3/is/not/a/file/sy/st/em/file041", + "s3/is/not/a/file/sy/st/em/file042/a", + "s3/is/not/a/file/sy/st/em/file05", + }, + }, + }, + }, + } + + for i, tt := range tests { + for sname, s := range tt.s { + t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) { + switch s := s.(type) { + case *S3Storage: + if s == nil { + t.SkipNow() + } + } + os := NewObjStorage(s, "/") + // populate + for _, p := range tt.objects { + if err := os.WriteObject(p, 
strings.NewReader("")); err != nil { + t.Fatalf("%s %d err: %v", sname, i, err) + } + } + + doneCh := make(chan struct{}) + defer close(doneCh) + for j, op := range tt.ops { + paths := []string{} + for object := range os.List(op.prefix, op.start, op.recursive, doneCh) { + if object.Err != nil { + t.Fatalf("%s %d-%d err: %v", sname, i, j, object.Err) + return + } + paths = append(paths, object.Path) + } + if !reflect.DeepEqual(op.expected, paths) { + t.Errorf("%s %d-%d expected paths %v got %v", sname, i, j, op.expected, paths) + } + } + }) + } + } +} diff --git a/internal/objectstorage/posix.go b/internal/objectstorage/posix.go new file mode 100644 index 0000000..243c809 --- /dev/null +++ b/internal/objectstorage/posix.go @@ -0,0 +1,490 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package objectstorage + +import ( + "io" + "io/ioutil" + "os" + "path" + "path/filepath" + "strconv" + "strings" + "unicode/utf8" + + "github.com/pkg/errors" +) + +const ( + dataDirName = "data" + tmpDirName = "tmp" + splitLength = 8 +) + +func shouldEscape(c rune) bool { + return c == '/' || c == '%' +} + +// escape does percent encoding to '/' and adds a slash every 8 (of the original +// string) chars +func escape(s string) string { + sepCount, hexCount := 0, 0 + nc := 0 + for _, c := range s { + nc++ + if shouldEscape(c) { + hexCount++ + } + if nc%splitLength == 0 { + sepCount++ + } + } + + if sepCount == 0 && hexCount == 0 { + return s + } + + hasFileMarker := nc%splitLength == 0 + l := len(s) + sepCount + 2*hexCount + // if the string length is a multiple of 8 then we have to add a file marker + // ".f" to not override a possible directory in our fs representation + if hasFileMarker { + l++ + } + + t := make([]byte, l) + j := 0 + nc = 0 + for _, c := range s { + nc++ + switch { + case shouldEscape(c): + t[j] = '%' + t[j+1] = "0123456789ABCDEF"[c>>4] + t[j+2] = "0123456789ABCDEF"[c&15] + j += 3 + default: + s := string(c) + for i := 0; i < len(s); i++ { + t[j] = s[i] + j++ + } + } + if nc%splitLength == 0 { + t[j] = '/' + j++ + } + } + + // add file marker + if hasFileMarker { + t[j-1] = '.' 
+ t[j] = 'f' + } + + return string(t) +} + +func ishex(c byte) bool { + switch { + case '0' <= c && c <= '9': + return true + case 'a' <= c && c <= 'f': + return true + case 'A' <= c && c <= 'F': + return true + } + return false +} + +func unhex(c byte) byte { + switch { + case '0' <= c && c <= '9': + return c - '0' + case 'a' <= c && c <= 'f': + return c - 'a' + 10 + case 'A' <= c && c <= 'F': + return c - 'A' + 10 + } + return 0 +} + +type EscapeError string + +func (e EscapeError) Error() string { + return "invalid URL escape " + strconv.Quote(string(e)) +} + +func unescape(s string) (string, bool, error) { + // number of percent encoded + n := 0 + // number of slashes + ns := 0 + // number of char in the unescaped string + nc := 0 + + for i := 0; i < len(s); { + r, width := utf8.DecodeRuneInString(s[i:]) + if r == utf8.RuneError { + return "", false, errors.Errorf("bad UTF-8 string") + } + switch r { + case '%': + n++ + if i+2 >= len(s) || !ishex(s[i+1]) || !ishex(s[i+2]) { + s = s[i:] + if len(s) > 3 { + s = s[:3] + } + return "", false, EscapeError(s) + } + i += 3 + nc++ + case '/': + ns++ + if nc%splitLength != 0 { + return "", false, EscapeError(s) + } + i++ + default: + i += width + nc++ + } + } + + // check and remove trailing file marker + hasFileMarker := false + if nc > splitLength && nc%splitLength == 2 && s[len(s)-2:] == ".f" { + hasFileMarker = true + s = s[:len(s)-2] + nc -= 2 + } + + if n == 0 && ns == 0 { + return s, hasFileMarker, nil + } + + // destination string is + // the length of the escaped one (with the ending file marker already removed) - number of percent * 2 - number of slashes + t := make([]byte, len(s)-n*2-ns) + j := 0 + for i := 0; i < len(s); { + r, width := utf8.DecodeRuneInString(s[i:]) + if r == utf8.RuneError { + return "", false, errors.Errorf("bad UTF-8 string") + } + switch r { + case '%': + t[j] = unhex(s[i+1])<<4 | unhex(s[i+2]) + j++ + i += 3 + case '/': + // skip "/" + i++ + default: + for k := 0; k < width; k++ { + 
t[j] = s[i] + j++ + i++ + } + } + } + return string(t), hasFileMarker, nil +} + +type PosixStorage struct { + dataDir string + tmpDir string +} + +func NewPosixStorage(baseDir string) (*PosixStorage, error) { + if err := os.MkdirAll(baseDir, 0770); err != nil { + return nil, err + } + dataDir := filepath.Join(baseDir, dataDirName) + tmpDir := filepath.Join(baseDir, tmpDirName) + if err := os.MkdirAll(dataDir, 0770); err != nil { + return nil, errors.Wrapf(err, "failed to create data dir") + } + if err := os.MkdirAll(tmpDir, 0770); err != nil { + return nil, errors.Wrapf(err, "failed to create tmp dir") + } + return &PosixStorage{ + dataDir: dataDir, + tmpDir: tmpDir, + }, nil +} + +func (s *PosixStorage) fsPath(p string) (string, error) { + if p == "" { + return "", errors.Errorf("empty key name") + } + return filepath.Join(s.dataDir, escape(p)), nil +} + +func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) { + fspath, err := s.fsPath(p) + if err != nil { + return nil, err + } + + if _, err := os.Stat(fspath); err != nil { + if os.IsNotExist(err) { + return nil, ErrNotExist + } + return nil, err + } + + return &ObjectInfo{Path: p}, nil +} + +func (s *PosixStorage) ReadObject(p string) (io.ReadCloser, error) { + fspath, err := s.fsPath(p) + if err != nil { + return nil, err + } + + f, err := os.Open(fspath) + if err != nil && os.IsNotExist(err) { + return nil, ErrNotExist + } + return f, err +} + +func (s *PosixStorage) WriteObject(p string, data io.Reader) error { + fspath, err := s.fsPath(p) + if err != nil { + return err + } + + if err := os.MkdirAll(path.Dir(fspath), 0770); err != nil { + return err + } + return s.WriteFileAtomicFunc(fspath, 0660, func(f io.Writer) error { + _, err := io.Copy(f, data) + return err + }) +} + +func (s *PosixStorage) DeleteObject(p string) error { + fspath, err := s.fsPath(p) + if err != nil { + return err + } + + if err := os.Remove(fspath); err != nil { + if os.IsNotExist(err) { + return ErrNotExist + } + return err + } + 
+ // try to remove parent empty dirs + // TODO(sgotti) if this fails we ignore errors and the dirs will be left as + // empty, clean them asynchronously + pdir := filepath.Dir(fspath) + for { + if pdir == s.dataDir || !strings.HasPrefix(pdir, s.dataDir) { + break + } + f, err := os.Open(pdir) + if err != nil { + return nil + } + + _, err = f.Readdirnames(1) + if err == io.EOF { + f.Close() + if err := os.Remove(pdir); err != nil { + return nil + } + } else { + f.Close() + break + } + + pdir = filepath.Dir(pdir) + } + return nil +} + +func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { + objectCh := make(chan ObjectInfo, 1) + + if len(delimiter) > 1 { + objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)} + return objectCh + } + + if startWith != "" && !strings.Contains(startWith, prefix) { + objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)} + return objectCh + } + + recursive := delimiter == "" + + // remove leading slash from prefix + if strings.HasPrefix(prefix, "/") { + prefix = strings.TrimPrefix(prefix, "/") + } + + fprefix := filepath.Join(s.dataDir, escape(prefix)) + root := filepath.Dir(fprefix) + if len(root) < len(s.dataDir) { + root = s.dataDir + } + + // remove leading slash + if strings.HasPrefix(startWith, "/") { + startWith = strings.TrimPrefix(startWith, "/") + } + + go func(objectCh chan<- ObjectInfo) { + var prevp string + defer close(objectCh) + err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error { + if err != nil && !os.IsNotExist(err) { + return err + } + if os.IsNotExist(err) { + return nil + } + p := ep + + // get the path with / separator + p = filepath.ToSlash(p) + + p, err = filepath.Rel(s.dataDir, p) + if err != nil { + return err + } + p, _, err = unescape(p) + if err != nil { + return err + } + if !recursive && len(p) > len(prefix) { + rel := strings.TrimPrefix(p, prefix) + skip 
:= strings.Contains(rel, delimiter) + + if info.IsDir() && skip { + return filepath.SkipDir + } + if skip { + return nil + } + } + + // don't list dirs if there's not a file with the same name (with filemarker) + // it's not an issue if the file in the meantime has been removed, it won't + // just be listed + hasFile := true + _, err = os.Stat(ep + ".f") + if err != nil && !os.IsNotExist(err) { + return err + } + if os.IsNotExist(err) { + hasFile = false + } + if info.IsDir() && !hasFile { + return nil + } + + if strings.HasPrefix(p, prefix) && p > startWith { + // skip keys smaller than the previously returned one. This happens when we + // receive a file with a file marker that we already returned previously + // when we received a dir with the same name + // it's not an issue if the dir has been removed since we already returned the file + if p > prevp { + select { + // Send object content. + case objectCh <- ObjectInfo{Path: p}: + // If receives done from the caller, return here. + case <-doneCh: + return io.EOF + } + } + prevp = p + } + + return nil + }) + if err != nil && err != io.EOF { + objectCh <- ObjectInfo{ + Err: err, + } + return + } + }(objectCh) + + return objectCh +} + +// WriteFileAtomicFunc atomically writes a file, it achieves this by creating a +// temporary file and then moving it. writeFunc is the func that will write +// data to the file. 
+// TODO(sgotti) remove left over tmp files if process crashes before calling +// os.Remove +func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, writeFunc func(f io.Writer) error) error { + f, err := ioutil.TempFile(s.tmpDir, "tmpfile") + if err != nil { + return err + } + err = writeFunc(f) + if err == nil { + err = f.Sync() + } + if closeErr := f.Close(); err == nil { + err = closeErr + } + if permErr := os.Chmod(f.Name(), perm); err == nil { + err = permErr + } + if err == nil { + err = os.Rename(f.Name(), p) + } + if err != nil { + os.Remove(f.Name()) + return err + } + + // sync parent dirs + pdir := filepath.Dir(p) + for { + if !strings.HasPrefix(pdir, s.dataDir) { + break + } + f, err := os.Open(pdir) + if err != nil { + f.Close() + return nil + } + if err := f.Sync(); err != nil { + f.Close() + return nil + } + f.Close() + + pdir = filepath.Dir(pdir) + } + return nil +} + +func (s *PosixStorage) WriteFileAtomic(filename string, perm os.FileMode, data []byte) error { + return s.WriteFileAtomicFunc(filename, perm, + func(f io.Writer) error { + _, err := f.Write(data) + return err + }) +} diff --git a/internal/objectstorage/posix_test.go b/internal/objectstorage/posix_test.go new file mode 100644 index 0000000..fe256e4 --- /dev/null +++ b/internal/objectstorage/posix_test.go @@ -0,0 +1,115 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package objectstorage + +import ( + "bytes" + "io/ioutil" + "os" + "path/filepath" + "testing" +) + +func TestEscapeUnescape(t *testing.T) { + tests := []struct { + in string + expected string + err error + }{ + {"", "", nil}, + {"/", "%2F", nil}, + {"//", "%2F%2F", nil}, + {"☺", "☺", nil}, + {"☺☺☺☺☺☺☺☺", "☺☺☺☺☺☺☺☺.f", nil}, + {"☺☺☺☺☺☺☺☺", "☺☺☺☺☺☺☺☺.f", nil}, + {"☺☺☺☺☺☺☺☺☺☺☺☺☺☺☺☺", "☺☺☺☺☺☺☺☺/☺☺☺☺☺☺☺☺.f", nil}, + {"☺☺☺☺a☺☺☺☺☺☺☺☺☺☺☺", "☺☺☺☺a☺☺☺/☺☺☺☺☺☺☺☺.f", nil}, + {"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "☺☺☺☺a☺☺☺/☺☺☺b☺☺☺☺.f", nil}, + {"⌘", "⌘", nil}, + {"⌘⌘⌘⌘⌘⌘⌘⌘⌘⌘⌘", "⌘⌘⌘⌘⌘⌘⌘⌘/⌘⌘⌘", nil}, + + // These are 16 chars on purpose to test the filemarker behavior + {"s3/is/not/a/file", "s3%2Fis%2Fno/t%2Fa%2Ffile.f", nil}, + {"s3/is/nota/file/", "s3%2Fis%2Fno/ta%2Ffile%2F.f", nil}, + {"s3/is/nota/files", "s3%2Fis%2Fno/ta%2Ffiles.f", nil}, + {"s3/is/nota/fil.f", "s3%2Fis%2Fno/ta%2Ffil.f.f", nil}, + + {"s3/is/nota/fil.fa", "s3%2Fis%2Fno/ta%2Ffil.f/a", nil}, + {"/s3/is/nota/fil.fa/", "%2Fs3%2Fis%2Fn/ota%2Ffil./fa%2F", nil}, + {"s3/is/not/a/file///system/fi%l%%e01", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffi%25l%25%25/e01", nil}, + {"s3/is/not/a/file///system/file01", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffile01.f", nil}, + {"s3/is/not/a/file///system/file01/", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffile01/%2F", nil}, + {"s3/is/not/a/file///system/file01/a", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffile01/%2Fa", nil}, + } + + for i, tt := range tests { + out := escape(tt.in) + if out != tt.expected { + t.Errorf("%d: escape: expected %q got %q", i, tt.expected, out) + } + + unescaped, _, err := unescape(out) + if err != nil { + if tt.err == nil { + t.Errorf("%d: unescape: expected no error got %v", i, err) + } else if tt.err != err { + t.Errorf("%d: unescape: expected error %v got %v", i, tt.err, err) + } + } else { + if tt.err != nil { + t.Errorf("%d: unescape: expected error %v got no error", i, tt.err) + } else if unescaped != tt.in { + t.Errorf("%d: unescape: 
expected %q got %q", i, tt.in, unescaped) + } + } + } +} + +func TestDeleteObject(t *testing.T) { + objects := []string{"/", "//", "☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"} + + dir, err := ioutil.TempDir("", "objectstorage") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + //defer os.RemoveAll(dir) + + ls, err := NewPosixStorage(dir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + for _, obj := range objects { + if err := ls.WriteObject(obj, bytes.NewReader([]byte{})); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := ls.DeleteObject(obj); err != nil { + t.Fatalf("unexpected err: %v", err) + } + } + + // no files and directories should be left + bd, err := os.Open(filepath.Join(dir, dataDirName)) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + files, err := bd.Readdirnames(0) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if len(files) > 0 { + t.Fatalf("expected 0 files got %d files", len(files)) + } +} diff --git a/internal/objectstorage/s3.go b/internal/objectstorage/s3.go new file mode 100644 index 0000000..6ec4455 --- /dev/null +++ b/internal/objectstorage/s3.go @@ -0,0 +1,167 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package objectstorage + +import ( + "io" + "io/ioutil" + "net/http" + "os" + "strings" + + minio "github.com/minio/minio-go" + "github.com/pkg/errors" +) + +type S3Storage struct { + bucket string + minioClient *minio.Client + // minio core client user for low level api + minioCore *minio.Core +} + +func NewS3Storage(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) { + minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure) + if err != nil { + return nil, err + } + + minioCore, err := minio.NewCore(endpoint, accessKeyID, secretAccessKey, secure) + if err != nil { + return nil, err + } + + exists, err := minioClient.BucketExists(bucket) + if err != nil { + return nil, errors.Wrapf(err, "cannot check if bucket %q in location %q exits", bucket, location) + } + if !exists { + if err := minioClient.MakeBucket(bucket, location); err != nil { + return nil, errors.Wrapf(err, "cannot create bucket %q in location %q", bucket, location) + } + } + + return &S3Storage{ + bucket: bucket, + minioClient: minioClient, + minioCore: minioCore, + }, nil +} + +func (s *S3Storage) Stat(p string) (*ObjectInfo, error) { + if _, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{}); err != nil { + merr := minio.ToErrorResponse(err) + if merr.StatusCode == http.StatusNotFound { + return nil, ErrNotExist + } + return nil, merr + } + + return &ObjectInfo{Path: p}, nil +} + +func (s *S3Storage) ReadObject(filepath string) (io.ReadCloser, error) { + if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil { + merr := minio.ToErrorResponse(err) + if merr.StatusCode == http.StatusNotFound { + return nil, ErrNotExist + } + return nil, merr + } + return s.minioClient.GetObject(s.bucket, filepath, minio.GetObjectOptions{}) +} + +func (s *S3Storage) WriteObject(filepath string, data io.Reader) error { + // hack to know the real file size or minio will do this in memory 
with big memory usage since s3 doesn't support real streaming of unknown sizes + // TODO(sgotti) wait for minio client to expose an api to provide the max part size so we can remove this + tmpfile, err := ioutil.TempFile(os.TempDir(), "s3") + if err != nil { + return err + } + defer tmpfile.Close() + defer os.Remove(tmpfile.Name()) + size, err := io.Copy(tmpfile, data) + if err != nil { + return err + } + if _, err := tmpfile.Seek(0, 0); err != nil { + return err + } + _, err = s.minioClient.PutObject(s.bucket, filepath, tmpfile, size, minio.PutObjectOptions{ContentType: "application/octet-stream"}) + return err +} + +func (s *S3Storage) DeleteObject(filepath string) error { + return s.minioClient.RemoveObject(s.bucket, filepath) +} + +func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo { + objectCh := make(chan ObjectInfo, 1) + + if len(delimiter) > 1 { + objectCh <- ObjectInfo{ + Err: errors.Errorf("wrong delimiter %q", delimiter), + } + return objectCh + } + + // remove leading slash + if strings.HasPrefix(prefix, "/") { + prefix = strings.TrimPrefix(prefix, "/") + } + if strings.HasPrefix(startWith, "/") { + startWith = strings.TrimPrefix(startWith, "/") + } + + // Initiate list objects goroutine here. + go func(objectCh chan<- ObjectInfo) { + defer close(objectCh) + // Save continuationToken for next request. + var continuationToken string + for { + // Get list of objects a maximum of 1000 per request. + result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith) + if err != nil { + objectCh <- ObjectInfo{ + Err: err, + } + return + } + + // If contents are available loop through and send over channel. + for _, object := range result.Contents { + select { + // Send object content. + case objectCh <- ObjectInfo{Path: object.Key}: + // If receives done from the caller, return here. 
+ case <-doneCh: + return + } + } + + // If continuation token present, save it for next request. + if result.NextContinuationToken != "" { + continuationToken = result.NextContinuationToken + } + + // Listing ends result is not truncated, return right here. + if !result.IsTruncated { + return + } + } + }(objectCh) + + return objectCh +}