Add initial objectstorage implementation

This commit is contained in:
Simone Gotti 2019-02-21 16:00:48 +01:00
parent fb022c5992
commit 7aae1d3e1b
7 changed files with 1130 additions and 0 deletions

3
go.mod
View File

@ -1,6 +1,9 @@
module github.com/sorintlab/agola
require (
github.com/go-ini/ini v1.42.0 // indirect
github.com/minio/minio-go v6.0.14+incompatible
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pkg/errors v0.8.0
go.etcd.io/etcd v0.0.0-20181128220305-dedae6eb7c25
go.uber.org/zap v1.9.1

6
go.sum
View File

@ -20,6 +20,8 @@ github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-ini/ini v1.42.0 h1:TWr1wGj35+UiWHlBA8er89seFXxzwFn11spilrrj+38=
github.com/go-ini/ini v1.42.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/gogo/protobuf v1.0.0 h1:2jyBKDKU/8v3v2xVR2PtiWQviFUyiaGk2rpfyFT8rTM=
github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
@ -55,6 +57,10 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc=
github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o=
github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=

View File

@ -0,0 +1,63 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package objectstorage
import (
"errors"
"io"
)
// TODO(sgotti)
// define common errors (like notFound) so the implementations will return them
// instead of their own errors

// ErrNotExist is returned by implementations when the requested object does
// not exist.
var ErrNotExist = errors.New("does not exist")

// Storage is the interface that every object storage implementation must
// satisfy.
type Storage interface {
	// Stat returns the ObjectInfo for the object at filepath, or ErrNotExist.
	Stat(filepath string) (*ObjectInfo, error)
	// ReadObject returns a reader for the object contents; the caller must
	// close it. Returns ErrNotExist when the object is missing.
	ReadObject(filepath string) (io.ReadCloser, error)
	// WriteObject stores the contents read from data at filepath.
	WriteObject(filepath string, data io.Reader) error
	// DeleteObject removes the object at filepath, returning ErrNotExist when
	// it's missing.
	DeleteObject(filepath string) error
	// List streams the objects whose path starts with prefix, beginning after
	// startWith. An empty delimiter means a recursive listing. Closing is
	// signalled by the implementation closing the returned channel; the caller
	// can abort by closing doneCh.
	List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo
}

// ObjectInfo describes a single listed object.
type ObjectInfo struct {
	Path string
	// Err is set instead of Path when a listing operation failed while
	// producing this entry.
	Err error
}

// ObjStorage wraps a Storage providing additional helper functions
type ObjStorage struct {
	Storage
	// delimiter is the path delimiter used for non recursive listings
	delimiter string
}
// NewObjStorage returns an ObjStorage wrapping s, using delimiter as the path
// delimiter for non recursive listings.
func NewObjStorage(s Storage, delimiter string) *ObjStorage {
	objs := &ObjStorage{
		Storage:   s,
		delimiter: delimiter,
	}
	return objs
}
// Delimiter returns the path delimiter configured at creation time.
func (s *ObjStorage) Delimiter() string {
	delimiter := s.delimiter
	return delimiter
}
// List lists the objects under prefix starting after startWith. When
// recursive is false the configured delimiter is passed down so the listing
// stops at the first delimiter following the prefix.
func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan ObjectInfo {
	// a recursive listing is requested from the wrapped storage with an empty
	// delimiter
	delimiter := ""
	if !recursive {
		delimiter = s.delimiter
	}
	return s.Storage.List(prefix, startWith, delimiter, doneCh)
}

View File

@ -0,0 +1,286 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package objectstorage
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
)
// TestList exercises the List implementation of the posix storage and, when
// the MINIO_* environment variables are set, of the s3 (minio) storage.
func TestList(t *testing.T) {
	dir, err := ioutil.TempDir("", "objectstorage")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)
	ls, err := NewPosixStorage(dir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	// the s3 storage is only tested when a minio endpoint is provided via the
	// environment; otherwise s3 stays nil and its subtests are skipped below
	var s3 *S3Storage
	minioEndpoint := os.Getenv("MINIO_ENDPOINT")
	minioAccessKey := os.Getenv("MINIO_ACCESSKEY")
	minioSecretKey := os.Getenv("MINIO_SECRETKEY")
	if minioEndpoint == "" {
		t.Logf("missing MINIO_ENDPOINT env, skipping tests with minio storage")
	} else {
		var err error
		s3, err = NewS3Storage(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
	}
	// listop describes a single List call and its expected result
	type listop struct {
		prefix    string
		start     string
		recursive bool
		expected  []string
	}
	tests := []struct {
		s       map[string]Storage
		objects []string
		ops     []listop
	}{
		{
			map[string]Storage{"local": ls},
			[]string{
				// Minio (as of 20190201) IMHO is not real S3 since it tries to map to a
				// file system and not a flat namespace like S3. For this reason this test
				// won't work with minio because it creates a file called "path/of" and so
				// it's not possible to create a file "path/of/a" because it needs "of" to
				// be a directory
				// All of the below tests will fail on Minio due to the above reason and also the multiple '/'
				// so we aren't testing these with it
				//"path/of",
				//"path/of/a/file02",
				//"path/of/a/file03",
				//"path/of/a/file04",
				//"path/of/a/file05",

				// These are multiple of 8 chars on purpose to test the filemarker behavior to
				// distinguish between a file or a directory when the files ends at the path
				// separator point
				"s3/is/not/a/file///system/file01",
				"s3/is/not/a/file///system/file02",
				"s3/is/not/a/file///system/file03",
				"s3/is/not/a/file///system/file04",
				"s3/is/not/a/file///system/file041",
				"s3/is/not/a/file///system/file042",
				"s3/is/not/a/file///system/file042/",
				"s3/is/not/a/file///system/file042/a",
				"s3/is/not/a/file///system/file04/a",
				"s3/is/not/a/file///system/file04/b",
				"s3/is/not/a/file///system/file05",
				"s3/is/not/a/file///system/file01a",
				"s3/is/not/a/file///system/file01b",
				"s3/is/not/a/file///system/file01/",
				"s3/is/not/a/file///system/file01/a",
				"s3/is/not/a/file///system/file01/b",
			},
			[]listop{
				{
					prefix:    "/s3/",
					start:     "/s3/is/not/a/file///system/file02",
					recursive: true,
					expected: []string{
						"s3/is/not/a/file///system/file03",
						"s3/is/not/a/file///system/file04",
						"s3/is/not/a/file///system/file04/a",
						"s3/is/not/a/file///system/file04/b",
						"s3/is/not/a/file///system/file041",
						"s3/is/not/a/file///system/file042",
						"s3/is/not/a/file///system/file042/",
						"s3/is/not/a/file///system/file042/a",
						"s3/is/not/a/file///system/file05",
					},
				},
				{
					prefix:    "s3",
					start:     "s3/is/not/a/file///system/file02",
					recursive: true,
					expected: []string{
						"s3/is/not/a/file///system/file03",
						"s3/is/not/a/file///system/file04",
						"s3/is/not/a/file///system/file04/a",
						"s3/is/not/a/file///system/file04/b",
						"s3/is/not/a/file///system/file041",
						"s3/is/not/a/file///system/file042",
						"s3/is/not/a/file///system/file042/",
						"s3/is/not/a/file///system/file042/a",
						"s3/is/not/a/file///system/file05",
					},
				},
				{
					prefix:    "s3/is/not/a/file///system/",
					recursive: false,
					expected: []string{
						"s3/is/not/a/file///system/file01",
						"s3/is/not/a/file///system/file01a",
						"s3/is/not/a/file///system/file01b",
						"s3/is/not/a/file///system/file02",
						"s3/is/not/a/file///system/file03",
						"s3/is/not/a/file///system/file04",
						"s3/is/not/a/file///system/file041",
						"s3/is/not/a/file///system/file042",
						"s3/is/not/a/file///system/file05",
					},
				},
				{
					prefix:    "s3/is/not/a/file///system/",
					recursive: true,
					expected: []string{
						"s3/is/not/a/file///system/file01",
						"s3/is/not/a/file///system/file01/",
						"s3/is/not/a/file///system/file01/a",
						"s3/is/not/a/file///system/file01/b",
						"s3/is/not/a/file///system/file01a",
						"s3/is/not/a/file///system/file01b",
						"s3/is/not/a/file///system/file02",
						"s3/is/not/a/file///system/file03",
						"s3/is/not/a/file///system/file04",
						"s3/is/not/a/file///system/file04/a",
						"s3/is/not/a/file///system/file04/b",
						"s3/is/not/a/file///system/file041",
						"s3/is/not/a/file///system/file042",
						"s3/is/not/a/file///system/file042/",
						"s3/is/not/a/file///system/file042/a",
						"s3/is/not/a/file///system/file05",
					},
				},
			},
		},
		{
			map[string]Storage{"local": ls, "minio": s3},
			[]string{
				// These are multiple of 8 chars on purpose to test the filemarker behavior to
				// distinguish between a file or a directory when the files ends at the path
				// separator point
				"s3/is/not/a/file/sy/st/em/file01",
				"s3/is/not/a/file/sy/st/em/file02",
				"s3/is/not/a/file/sy/st/em/file03",
				"s3/is/not/a/file/sy/st/em/file05",
				"s3/is/not/a/file/sy/st/em/file01a",
				"s3/is/not/a/file/sy/st/em/file01b",
				"s3/is/not/a/file/sy/st/em/file04/a",
				"s3/is/not/a/file/sy/st/em/file04/b",
				"s3/is/not/a/file/sy/st/em/file041",
				"s3/is/not/a/file/sy/st/em/file042/a",
			},
			[]listop{
				{
					prefix:    "/s3/",
					start:     "/s3/is/not/a/file/sy/st/em/file02",
					recursive: true,
					expected: []string{
						"s3/is/not/a/file/sy/st/em/file03",
						"s3/is/not/a/file/sy/st/em/file04/a",
						"s3/is/not/a/file/sy/st/em/file04/b",
						"s3/is/not/a/file/sy/st/em/file041",
						"s3/is/not/a/file/sy/st/em/file042/a",
						"s3/is/not/a/file/sy/st/em/file05",
					},
				},
				{
					prefix:    "s3",
					start:     "s3/is/not/a/file/sy/st/em/file02",
					recursive: true,
					expected: []string{
						"s3/is/not/a/file/sy/st/em/file03",
						"s3/is/not/a/file/sy/st/em/file04/a",
						"s3/is/not/a/file/sy/st/em/file04/b",
						"s3/is/not/a/file/sy/st/em/file041",
						"s3/is/not/a/file/sy/st/em/file042/a",
						"s3/is/not/a/file/sy/st/em/file05",
					},
				},
				{
					prefix:    "s3/is/not/a/file/sy/st/em/",
					recursive: false,
					expected: []string{
						"s3/is/not/a/file/sy/st/em/file01",
						"s3/is/not/a/file/sy/st/em/file01a",
						"s3/is/not/a/file/sy/st/em/file01b",
						"s3/is/not/a/file/sy/st/em/file02",
						"s3/is/not/a/file/sy/st/em/file03",
						"s3/is/not/a/file/sy/st/em/file041",
						"s3/is/not/a/file/sy/st/em/file05",
					},
				},
				{
					prefix:    "s3/is/not/a/file/sy/st/em/",
					recursive: true,
					expected: []string{
						"s3/is/not/a/file/sy/st/em/file01",
						"s3/is/not/a/file/sy/st/em/file01a",
						"s3/is/not/a/file/sy/st/em/file01b",
						"s3/is/not/a/file/sy/st/em/file02",
						"s3/is/not/a/file/sy/st/em/file03",
						"s3/is/not/a/file/sy/st/em/file04/a",
						"s3/is/not/a/file/sy/st/em/file04/b",
						"s3/is/not/a/file/sy/st/em/file041",
						"s3/is/not/a/file/sy/st/em/file042/a",
						"s3/is/not/a/file/sy/st/em/file05",
					},
				},
			},
		},
	}
	for i, tt := range tests {
		for sname, s := range tt.s {
			t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) {
				// s3 is nil when no minio endpoint was provided
				switch s := s.(type) {
				case *S3Storage:
					if s == nil {
						t.SkipNow()
					}
				}
				// don't name this "os": it would shadow the os package
				objs := NewObjStorage(s, "/")
				// populate
				for _, p := range tt.objects {
					if err := objs.WriteObject(p, strings.NewReader("")); err != nil {
						t.Fatalf("%s %d err: %v", sname, i, err)
					}
				}
				doneCh := make(chan struct{})
				defer close(doneCh)
				for j, op := range tt.ops {
					paths := []string{}
					for object := range objs.List(op.prefix, op.start, op.recursive, doneCh) {
						if object.Err != nil {
							t.Fatalf("%s %d-%d err: %v", sname, i, j, object.Err)
						}
						paths = append(paths, object.Path)
					}
					if !reflect.DeepEqual(op.expected, paths) {
						t.Errorf("%s %d-%d expected paths %v got %v", sname, i, j, op.expected, paths)
					}
				}
			})
		}
	}
}

View File

@ -0,0 +1,490 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package objectstorage
import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"unicode/utf8"
"github.com/pkg/errors"
)
const (
	// dataDirName and tmpDirName are the subdirectories of the base dir
	// holding respectively the stored objects and the temporary files used
	// for atomic writes
	dataDirName = "data"
	tmpDirName  = "tmp"
	// splitLength is the number of chars (of the original, unescaped string)
	// after which a '/' separator is inserted in the escaped on-disk
	// representation
	splitLength = 8
)
// shouldEscape reports whether c must be percent encoded in the on-disk
// representation: the path separator '/' and the escape char '%' itself.
func shouldEscape(c rune) bool {
	switch c {
	case '/', '%':
		return true
	default:
		return false
	}
}
// escape does percent encoding to '/' and adds a slash every 8 (of the original
// string) chars
func escape(s string) string {
	// first pass: count how many separators and percent escapes the escaped
	// string will need, so it can be built in a single allocation
	sepCount, hexCount := 0, 0
	nc := 0
	for _, c := range s {
		nc++
		if shouldEscape(c) {
			hexCount++
		}
		if nc%splitLength == 0 {
			sepCount++
		}
	}
	// nothing to do: the escaped form is the string itself
	if sepCount == 0 && hexCount == 0 {
		return s
	}
	hasFileMarker := nc%splitLength == 0
	// each escaped char grows from 1 to 3 bytes ('%XX'), each separator adds 1
	l := len(s) + sepCount + 2*hexCount
	// if the string length is a multiple of 8 then we have to add a file marker
	// ".f" to not override a possible directory in our fs representation
	if hasFileMarker {
		l++
	}
	t := make([]byte, l)
	j := 0
	nc = 0
	for _, c := range s {
		nc++
		switch {
		case shouldEscape(c):
			t[j] = '%'
			t[j+1] = "0123456789ABCDEF"[c>>4]
			t[j+2] = "0123456789ABCDEF"[c&15]
			j += 3
		default:
			// copy the (possibly multi byte) utf8 encoding of the rune
			s := string(c)
			for i := 0; i < len(s); i++ {
				t[j] = s[i]
				j++
			}
		}
		if nc%splitLength == 0 {
			t[j] = '/'
			j++
		}
	}
	// add file marker: the loop above ended writing a trailing '/' at j-1;
	// overwrite it with '.' and append 'f' in the extra byte reserved above
	if hasFileMarker {
		t[j-1] = '.'
		t[j] = 'f'
	}
	return string(t)
}
// ishex reports whether c is an ASCII hexadecimal digit.
func ishex(c byte) bool {
	return ('0' <= c && c <= '9') ||
		('a' <= c && c <= 'f') ||
		('A' <= c && c <= 'F')
}
// unhex returns the numeric value of the hexadecimal digit c, or 0 when c is
// not a hexadecimal digit.
func unhex(c byte) byte {
	switch {
	case c >= '0' && c <= '9':
		return c - '0'
	case c >= 'a' && c <= 'f':
		return c - 'a' + 10
	case c >= 'A' && c <= 'F':
		return c - 'A' + 10
	default:
		return 0
	}
}
// EscapeError reports an invalid escape sequence found while unescaping a
// stored object path.
type EscapeError string

// Error implements the error interface.
func (e EscapeError) Error() string {
	quoted := strconv.Quote(string(e))
	return "invalid URL escape " + quoted
}
// unescape reverses escape: it removes the splitLength separators and decodes
// the percent escaped chars, returning the original string and whether the
// escaped form carried a trailing ".f" file marker.
func unescape(s string) (string, bool, error) {
	// number of percent encoded
	n := 0
	// number of slashes
	ns := 0
	// number of char in the unescaped string
	nc := 0
	// first pass: validate the escaped form and count, so the output can be
	// built in a single allocation
	for i := 0; i < len(s); {
		r, width := utf8.DecodeRuneInString(s[i:])
		if r == utf8.RuneError {
			return "", false, errors.Errorf("bad UTF-8 string")
		}
		switch r {
		case '%':
			n++
			// a '%' must be followed by exactly two hex digits
			if i+2 >= len(s) || !ishex(s[i+1]) || !ishex(s[i+2]) {
				s = s[i:]
				if len(s) > 3 {
					s = s[:3]
				}
				return "", false, EscapeError(s)
			}
			i += 3
			nc++
		case '/':
			ns++
			// separators are only valid every splitLength original chars
			if nc%splitLength != 0 {
				return "", false, EscapeError(s)
			}
			i++
		default:
			i += width
			nc++
		}
	}
	// check and remove trailing file marker: the marker adds 2 chars ('.'
	// and 'f') after a multiple of splitLength chars, hence nc%splitLength == 2
	hasFileMarker := false
	if nc > splitLength && nc%splitLength == 2 && s[len(s)-2:] == ".f" {
		hasFileMarker = true
		s = s[:len(s)-2]
		nc -= 2
	}
	if n == 0 && ns == 0 {
		return s, hasFileMarker, nil
	}
	// destination string is
	// the length of the escaped one (with the ending file marker already removed) - number of percent * 2 - number of slashes
	t := make([]byte, len(s)-n*2-ns)
	j := 0
	for i := 0; i < len(s); {
		r, width := utf8.DecodeRuneInString(s[i:])
		if r == utf8.RuneError {
			return "", false, errors.Errorf("bad UTF-8 string")
		}
		switch r {
		case '%':
			// decode the two hex digits into the original byte
			t[j] = unhex(s[i+1])<<4 | unhex(s[i+2])
			j++
			i += 3
		case '/':
			// skip "/"
			i++
		default:
			// copy the (possibly multi byte) rune unchanged
			for k := 0; k < width; k++ {
				t[j] = s[i]
				j++
				i++
			}
		}
	}
	return string(t), hasFileMarker, nil
}
// PosixStorage implements the Storage interface on a posix filesystem.
// Object paths are mapped to files under a data directory using the escape
// function defined in this package.
type PosixStorage struct {
	// dataDir is the directory holding the escaped object files
	dataDir string
	// tmpDir holds the temporary files used for atomic writes
	tmpDir string
}
// NewPosixStorage returns a PosixStorage rooted at baseDir, creating the
// base, data and tmp directories when missing.
func NewPosixStorage(baseDir string) (*PosixStorage, error) {
	if err := os.MkdirAll(baseDir, 0770); err != nil {
		return nil, err
	}
	s := &PosixStorage{
		dataDir: filepath.Join(baseDir, dataDirName),
		tmpDir:  filepath.Join(baseDir, tmpDirName),
	}
	if err := os.MkdirAll(s.dataDir, 0770); err != nil {
		return nil, errors.Wrapf(err, "failed to create data dir")
	}
	if err := os.MkdirAll(s.tmpDir, 0770); err != nil {
		return nil, errors.Wrapf(err, "failed to create tmp dir")
	}
	return s, nil
}
// fsPath maps the object path p to its escaped location on the filesystem.
// An empty object path is invalid.
func (s *PosixStorage) fsPath(p string) (string, error) {
	if len(p) == 0 {
		return "", errors.Errorf("empty key name")
	}
	escaped := escape(p)
	return filepath.Join(s.dataDir, escaped), nil
}
// Stat returns the ObjectInfo for the object at p, or ErrNotExist when no
// such object is stored.
func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) {
	fspath, err := s.fsPath(p)
	if err != nil {
		return nil, err
	}
	_, err = os.Stat(fspath)
	if err == nil {
		return &ObjectInfo{Path: p}, nil
	}
	if os.IsNotExist(err) {
		return nil, ErrNotExist
	}
	return nil, err
}
// ReadObject opens the object at p for reading; the caller must close the
// returned ReadCloser. ErrNotExist is returned when the object is missing.
func (s *PosixStorage) ReadObject(p string) (io.ReadCloser, error) {
	fspath, err := s.fsPath(p)
	if err != nil {
		return nil, err
	}
	f, err := os.Open(fspath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, ErrNotExist
		}
		// f is nil here, so returning it unchanged matches the error path
		return f, err
	}
	return f, nil
}
// WriteObject atomically stores the contents read from data as the object at
// p, creating the needed parent directories.
func (s *PosixStorage) WriteObject(p string, data io.Reader) error {
	fspath, err := s.fsPath(p)
	if err != nil {
		return err
	}
	// NOTE(review): path.Dir is used on an os-specific path here; it works on
	// posix since fspath is slash separated, but filepath.Dir would be the
	// portable choice — confirm before changing
	if err := os.MkdirAll(path.Dir(fspath), 0770); err != nil {
		return err
	}
	copyData := func(f io.Writer) error {
		_, err := io.Copy(f, data)
		return err
	}
	return s.WriteFileAtomicFunc(fspath, 0660, copyData)
}
// DeleteObject removes the object at p, returning ErrNotExist when it's not
// stored, then best-effort removes the empty parent directories left behind
// by the escaped on-disk representation.
func (s *PosixStorage) DeleteObject(p string) error {
	fspath, err := s.fsPath(p)
	if err != nil {
		return err
	}
	if err := os.Remove(fspath); err != nil {
		if os.IsNotExist(err) {
			return ErrNotExist
		}
		return err
	}
	// try to remove parent empty dirs
	// TODO(sgotti) if this fails we ignore errors and the dirs will be left as
	// empty, clean them asynchronously
	pdir := filepath.Dir(fspath)
	for {
		// stop at the data dir root (or if we somehow stepped outside it)
		if pdir == s.dataDir || !strings.HasPrefix(pdir, s.dataDir) {
			break
		}
		f, err := os.Open(pdir)
		if err != nil {
			// cleanup is best effort: report success anyway
			return nil
		}
		// Readdirnames(1) returns io.EOF only when the directory is empty
		_, err = f.Readdirnames(1)
		if err == io.EOF {
			f.Close()
			if err := os.Remove(pdir); err != nil {
				return nil
			}
		} else {
			// non-empty dir (or read error): stop climbing
			f.Close()
			break
		}
		pdir = filepath.Dir(pdir)
	}
	return nil
}
// List walks the data dir streaming the stored object paths that start with
// prefix and sort after startWith. delimiter must be empty (recursive
// listing) or a single char; with a delimiter, paths containing it after the
// prefix are skipped. The returned channel is closed when the listing ends;
// the caller can abort by closing doneCh.
func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo {
	objectCh := make(chan ObjectInfo, 1)

	if len(delimiter) > 1 {
		objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)}
		return objectCh
	}
	if startWith != "" && !strings.Contains(startWith, prefix) {
		objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)}
		return objectCh
	}

	recursive := delimiter == ""

	// remove leading slash from prefix
	if strings.HasPrefix(prefix, "/") {
		prefix = strings.TrimPrefix(prefix, "/")
	}

	// walk from the parent dir of the escaped prefix (never above the data dir)
	fprefix := filepath.Join(s.dataDir, escape(prefix))
	root := filepath.Dir(fprefix)
	if len(root) < len(s.dataDir) {
		root = s.dataDir
	}

	// remove leading slash
	if strings.HasPrefix(startWith, "/") {
		startWith = strings.TrimPrefix(startWith, "/")
	}

	go func(objectCh chan<- ObjectInfo) {
		var prevp string
		defer close(objectCh)
		err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error {
			if err != nil && !os.IsNotExist(err) {
				return err
			}
			// a file disappearing during the walk is not an error
			if os.IsNotExist(err) {
				return nil
			}
			p := ep

			// get the path with / separator
			p = filepath.ToSlash(p)

			// map the fs path back to the original object path
			p, err = filepath.Rel(s.dataDir, p)
			if err != nil {
				return err
			}
			p, _, err = unescape(p)
			if err != nil {
				return err
			}
			if !recursive && len(p) > len(prefix) {
				// skip entries containing the delimiter after the prefix;
				// whole subtrees can be pruned when the entry is a dir
				rel := strings.TrimPrefix(p, prefix)
				skip := strings.Contains(rel, delimiter)

				if info.IsDir() && skip {
					return filepath.SkipDir
				}
				if skip {
					return nil
				}
			}

			// don't list dirs if there's not a file with the same name (with filemarker)
			// it's not an issue if the file in the meantime has been removed, it won't
			// just be listed
			hasFile := true
			_, err = os.Stat(ep + ".f")
			if err != nil && !os.IsNotExist(err) {
				return err
			}
			if os.IsNotExist(err) {
				hasFile = false
			}
			if info.IsDir() && !hasFile {
				return nil
			}

			if strings.HasPrefix(p, prefix) && p > startWith {
				// skip keys smaller than the previously returned one. This happens when we
				// receive a file with a file marker that we already returned previously
				// when we received a dir with the same name
				// it's not an issue if the dir has been removed since we already returned the file
				if p > prevp {
					select {
					// Send object content.
					case objectCh <- ObjectInfo{Path: p}:
					// If receives done from the caller, return here.
					case <-doneCh:
						// io.EOF is used as a sentinel to stop the walk cleanly
						return io.EOF
					}
				}
				prevp = p
			}

			return nil
		})
		if err != nil && err != io.EOF {
			objectCh <- ObjectInfo{
				Err: err,
			}
			return
		}
	}(objectCh)

	return objectCh
}
// WriteFileAtomicFunc atomically writes a file, it achieves this by creating a
// temporary file and then moving it. writeFunc is the func that will write
// data to the file.
// TODO(sgotti) remove left over tmp files if process crashes before calling
// os.Remove
func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, writeFunc func(f io.Writer) error) error {
	f, err := ioutil.TempFile(s.tmpDir, "tmpfile")
	if err != nil {
		return err
	}
	// write, sync, close, chmod and rename in sequence; the first failing
	// step wins and its error is reported after removing the temp file
	err = writeFunc(f)
	if err == nil {
		err = f.Sync()
	}
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	if permErr := os.Chmod(f.Name(), perm); err == nil {
		err = permErr
	}
	if err == nil {
		err = os.Rename(f.Name(), p)
	}
	if err != nil {
		os.Remove(f.Name())
		return err
	}

	// sync parent dirs up to (and including) the data dir so the rename is
	// durable; sync errors are deliberately ignored since the write itself
	// already succeeded
	pdir := filepath.Dir(p)
	for {
		if !strings.HasPrefix(pdir, s.dataDir) {
			break
		}
		d, err := os.Open(pdir)
		if err != nil {
			// don't call Close on the nil *os.File a failed Open returns
			return nil
		}
		if err := d.Sync(); err != nil {
			d.Close()
			return nil
		}
		d.Close()
		pdir = filepath.Dir(pdir)
	}
	return nil
}
// WriteFileAtomic atomically writes data to filename with the given
// permissions, delegating to WriteFileAtomicFunc.
func (s *PosixStorage) WriteFileAtomic(filename string, perm os.FileMode, data []byte) error {
	writeData := func(f io.Writer) error {
		_, err := f.Write(data)
		return err
	}
	return s.WriteFileAtomicFunc(filename, perm, writeData)
}

View File

@ -0,0 +1,115 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package objectstorage
import (
"bytes"
"io/ioutil"
"os"
"path/filepath"
"testing"
)
// TestEscapeUnescape checks that escape produces the expected on-disk
// representation and that unescape is its exact inverse.
func TestEscapeUnescape(t *testing.T) {
	tests := []struct {
		in       string
		expected string
		err      error
	}{
		{"", "", nil},
		{"/", "%2F", nil},
		{"//", "%2F%2F", nil},
		{"☺", "☺", nil},
		{"☺☺☺☺☺☺☺☺", "☺☺☺☺☺☺☺☺.f", nil},
		// NOTE(review): this row duplicates the previous one — probably a
		// copy/paste leftover
		{"☺☺☺☺☺☺☺☺", "☺☺☺☺☺☺☺☺.f", nil},
		{"☺☺☺☺☺☺☺☺☺☺☺☺☺☺☺☺", "☺☺☺☺☺☺☺☺/☺☺☺☺☺☺☺☺.f", nil},
		{"☺☺☺☺a☺☺☺☺☺☺☺☺☺☺☺", "☺☺☺☺a☺☺☺/☺☺☺☺☺☺☺☺.f", nil},
		{"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "☺☺☺☺a☺☺☺/☺☺☺b☺☺☺☺.f", nil},
		{"⌘", "⌘", nil},
		{"⌘⌘⌘⌘⌘⌘⌘⌘⌘⌘⌘", "⌘⌘⌘⌘⌘⌘⌘⌘/⌘⌘⌘", nil},
		// These are 16 chars on purpose to test the filemarker behavior
		{"s3/is/not/a/file", "s3%2Fis%2Fno/t%2Fa%2Ffile.f", nil},
		{"s3/is/nota/file/", "s3%2Fis%2Fno/ta%2Ffile%2F.f", nil},
		{"s3/is/nota/files", "s3%2Fis%2Fno/ta%2Ffiles.f", nil},
		{"s3/is/nota/fil.f", "s3%2Fis%2Fno/ta%2Ffil.f.f", nil},
		{"s3/is/nota/fil.fa", "s3%2Fis%2Fno/ta%2Ffil.f/a", nil},
		{"/s3/is/nota/fil.fa/", "%2Fs3%2Fis%2Fn/ota%2Ffil./fa%2F", nil},
		{"s3/is/not/a/file///system/fi%l%%e01", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffi%25l%25%25/e01", nil},
		{"s3/is/not/a/file///system/file01", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffile01.f", nil},
		{"s3/is/not/a/file///system/file01/", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffile01/%2F", nil},
		{"s3/is/not/a/file///system/file01/a", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffile01/%2Fa", nil},
	}
	for i, tt := range tests {
		// check the escaped representation
		out := escape(tt.in)
		if out != tt.expected {
			t.Errorf("%d: escape: expected %q got %q", i, tt.expected, out)
		}
		// check that unescape round-trips back to the input
		unescaped, _, err := unescape(out)
		if err != nil {
			if tt.err == nil {
				t.Errorf("%d: unescape: expected no error got %v", i, err)
			} else if tt.err != err {
				t.Errorf("%d: unescape: expected error %v got %v", i, tt.err, err)
			}
		} else {
			if tt.err != nil {
				t.Errorf("%d: unescape: expected error %v got no error", i, tt.err)
			} else if unescaped != tt.in {
				t.Errorf("%d: unescape: expected %q got %q", i, tt.in, unescaped)
			}
		}
	}
}
// TestDeleteObject checks that deleting objects also removes the now-empty
// parent directories created by the escaped on-disk representation.
func TestDeleteObject(t *testing.T) {
	objects := []string{"/", "//", "☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"}
	dir, err := ioutil.TempDir("", "objectstorage")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	// remove the temporary dir also when the test fails (was commented out,
	// leaking a temp dir on every run)
	defer os.RemoveAll(dir)
	ls, err := NewPosixStorage(dir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	for _, obj := range objects {
		if err := ls.WriteObject(obj, bytes.NewReader([]byte{})); err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
		if err := ls.DeleteObject(obj); err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
	}
	// no files and directories should be left
	bd, err := os.Open(filepath.Join(dir, dataDirName))
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer bd.Close()
	files, err := bd.Readdirnames(0)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if len(files) > 0 {
		t.Fatalf("expected 0 files got %d files", len(files))
	}
}

View File

@ -0,0 +1,167 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package objectstorage
import (
"io"
"io/ioutil"
"net/http"
"os"
"strings"
minio "github.com/minio/minio-go"
"github.com/pkg/errors"
)
// S3Storage implements the Storage interface on an s3 compatible object
// storage using the minio client.
type S3Storage struct {
	// bucket is the bucket holding all the objects
	bucket      string
	minioClient *minio.Client
	// minio core client used for low level api
	minioCore *minio.Core
}
// NewS3Storage returns an S3Storage backed by the given bucket on the s3
// compatible endpoint, creating the bucket in location when it doesn't
// already exist.
func NewS3Storage(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) {
	minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure)
	if err != nil {
		return nil, err
	}
	// the core client exposes the low level api (ListObjectsV2) not available
	// on the standard client
	minioCore, err := minio.NewCore(endpoint, accessKeyID, secretAccessKey, secure)
	if err != nil {
		return nil, err
	}
	exists, err := minioClient.BucketExists(bucket)
	if err != nil {
		// fixed typo in error message: "exits" -> "exists"
		return nil, errors.Wrapf(err, "cannot check if bucket %q in location %q exists", bucket, location)
	}
	if !exists {
		if err := minioClient.MakeBucket(bucket, location); err != nil {
			return nil, errors.Wrapf(err, "cannot create bucket %q in location %q", bucket, location)
		}
	}
	return &S3Storage{
		bucket:      bucket,
		minioClient: minioClient,
		minioCore:   minioCore,
	}, nil
}
// Stat returns the ObjectInfo for the object at p, or ErrNotExist when the
// object is not in the bucket.
func (s *S3Storage) Stat(p string) (*ObjectInfo, error) {
	_, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{})
	if err == nil {
		return &ObjectInfo{Path: p}, nil
	}
	// map the s3 "not found" response to the package level sentinel error
	merr := minio.ToErrorResponse(err)
	if merr.StatusCode == http.StatusNotFound {
		return nil, ErrNotExist
	}
	return nil, merr
}
// ReadObject returns a reader for the object at filepath; the caller must
// close it. ErrNotExist is returned when the object is not in the bucket.
func (s *S3Storage) ReadObject(filepath string) (io.ReadCloser, error) {
	// stat first so a missing object is reported as ErrNotExist up front
	// instead of as a read error on the returned object
	_, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{})
	if err != nil {
		merr := minio.ToErrorResponse(err)
		if merr.StatusCode == http.StatusNotFound {
			return nil, ErrNotExist
		}
		return nil, merr
	}
	return s.minioClient.GetObject(s.bucket, filepath, minio.GetObjectOptions{})
}
// WriteObject stores the contents read from data as the object at filepath.
//
// hack to know the real file size or minio will do this in memory with big
// memory usage since s3 doesn't support real streaming of unknown sizes
// TODO(sgotti) wait for minio client to expose an api to provide the max part
// size so we can remove this
func (s *S3Storage) WriteObject(filepath string, data io.Reader) error {
	tmpfile, err := ioutil.TempFile(os.TempDir(), "s3")
	if err != nil {
		return err
	}
	// deferred calls run in LIFO order: close the file first, then remove it
	// (the previous order removed the still-open file, which fails on
	// platforms like windows)
	defer os.Remove(tmpfile.Name())
	defer tmpfile.Close()
	size, err := io.Copy(tmpfile, data)
	if err != nil {
		return err
	}
	// rewind so PutObject reads the spooled data from the start
	if _, err := tmpfile.Seek(0, io.SeekStart); err != nil {
		return err
	}
	_, err = s.minioClient.PutObject(s.bucket, filepath, tmpfile, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
	return err
}
// DeleteObject removes the object at filepath from the bucket.
func (s *S3Storage) DeleteObject(filepath string) error {
	return s.minioClient.RemoveObject(s.bucket, filepath)
}
// List streams the objects whose key starts with prefix, beginning after
// startWith, paginating through ListObjectsV2 results. delimiter must be
// empty or a single char. The returned channel is closed when the listing
// ends; the caller can abort by closing doneCh.
//
// NOTE(review): unlike PosixStorage.List this doesn't validate that startWith
// contains prefix, and result.CommonPrefixes are not forwarded when a
// delimiter is set — confirm whether both are intended.
func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo {
	objectCh := make(chan ObjectInfo, 1)
	if len(delimiter) > 1 {
		objectCh <- ObjectInfo{
			Err: errors.Errorf("wrong delimiter %q", delimiter),
		}
		return objectCh
	}

	// remove leading slash
	if strings.HasPrefix(prefix, "/") {
		prefix = strings.TrimPrefix(prefix, "/")
	}
	if strings.HasPrefix(startWith, "/") {
		startWith = strings.TrimPrefix(startWith, "/")
	}

	// Initiate list objects goroutine here.
	go func(objectCh chan<- ObjectInfo) {
		defer close(objectCh)
		// Save continuationToken for next request.
		var continuationToken string
		for {
			// Get list of objects a maximum of 1000 per request.
			result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith)
			if err != nil {
				objectCh <- ObjectInfo{
					Err: err,
				}
				return
			}

			// If contents are available loop through and send over channel.
			for _, object := range result.Contents {
				select {
				// Send object content.
				case objectCh <- ObjectInfo{Path: object.Key}:
				// If receives done from the caller, return here.
				case <-doneCh:
					return
				}
			}

			// If continuation token present, save it for next request.
			if result.NextContinuationToken != "" {
				continuationToken = result.NextContinuationToken
			}

			// Listing ends result is not truncated, return right here.
			if !result.IsTruncated {
				return
			}
		}
	}(objectCh)
	return objectCh
}