d2b09d854f
Implement a new error handling library based on pkg/errors. It provides stack saving on wrapping and exports some functions to add stack saving also to external errors. It also implements custom zerolog error formatting without adding too much verbosity by just printing the chain error file:line without a full stack trace of every error. * Add a --detailed-errors option to print errors with their full chain * Wrap all error returns. Use errors.WithStack to wrap without adding a new message and errors.Wrap[f] to add a message. * Add golangci-lint wrapcheck to check that external packages errors are wrapped. This won't check that internal packages errors are wrapped. But we also want to ensure this case, so we'll have to find something else to check these too.
178 lines
5.5 KiB
Go
178 lines
5.5 KiB
Go
// Copyright 2019 Sorint.lab
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package objectstorage
|
|
|
|
import (
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
|
|
"agola.io/agola/internal/errors"
|
|
minio "github.com/minio/minio-go/v6"
|
|
)
|
|
|
|
type S3Storage struct {
|
|
bucket string
|
|
minioClient *minio.Client
|
|
// minio core client user for low level api
|
|
minioCore *minio.Core
|
|
}
|
|
|
|
func NewS3(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) {
|
|
minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
minioCore, err := minio.NewCore(endpoint, accessKeyID, secretAccessKey, secure)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
exists, err := minioClient.BucketExists(bucket)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "cannot check if bucket %q in location %q exits", bucket, location)
|
|
}
|
|
if !exists {
|
|
if err := minioClient.MakeBucket(bucket, location); err != nil {
|
|
return nil, errors.Wrapf(err, "cannot create bucket %q in location %q", bucket, location)
|
|
}
|
|
}
|
|
|
|
return &S3Storage{
|
|
bucket: bucket,
|
|
minioClient: minioClient,
|
|
minioCore: minioCore,
|
|
}, nil
|
|
}
|
|
|
|
func (s *S3Storage) Stat(p string) (*ObjectInfo, error) {
|
|
oi, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{})
|
|
if err != nil {
|
|
merr := minio.ToErrorResponse(err)
|
|
if merr.StatusCode == http.StatusNotFound {
|
|
return nil, NewErrNotExist(errors.Errorf("object %q doesn't exist", p))
|
|
}
|
|
return nil, errors.WithStack(merr)
|
|
}
|
|
|
|
return &ObjectInfo{Path: p, LastModified: oi.LastModified, Size: oi.Size}, nil
|
|
}
|
|
|
|
func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) {
|
|
if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil {
|
|
merr := minio.ToErrorResponse(err)
|
|
if merr.StatusCode == http.StatusNotFound {
|
|
return nil, NewErrNotExist(errors.Errorf("object %q doesn't exist", filepath))
|
|
}
|
|
return nil, errors.WithStack(merr)
|
|
}
|
|
|
|
o, err := s.minioClient.GetObject(s.bucket, filepath, minio.GetObjectOptions{})
|
|
|
|
return o, errors.WithStack(err)
|
|
}
|
|
|
|
func (s *S3Storage) WriteObject(filepath string, data io.Reader, size int64, persist bool) error {
|
|
// if size is not specified, limit max object size to defaultMaxObjectSize so
|
|
// minio client will not calculate a very big part size using tons of ram.
|
|
// An alternative is to write the file locally so we can calculate the size and
|
|
// then put it. See commented out code below.
|
|
if size >= 0 {
|
|
lr := io.LimitReader(data, size)
|
|
_, err := s.minioClient.PutObject(s.bucket, filepath, lr, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
// hack to know the real file size or minio will do this in memory with big memory usage since s3 doesn't support real streaming of unknown sizes
|
|
// TODO(sgotti) wait for minio client to expose an api to provide the max object size so we can remove this
|
|
tmpfile, err := ioutil.TempFile(os.TempDir(), "s3")
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
defer tmpfile.Close()
|
|
defer os.Remove(tmpfile.Name())
|
|
size, err = io.Copy(tmpfile, data)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
if _, err := tmpfile.Seek(0, 0); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
_, err = s.minioClient.PutObject(s.bucket, filepath, tmpfile, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
func (s *S3Storage) DeleteObject(filepath string) error {
|
|
return errors.WithStack(s.minioClient.RemoveObject(s.bucket, filepath))
|
|
}
|
|
|
|
func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo {
|
|
objectCh := make(chan ObjectInfo, 1)
|
|
|
|
if len(delimiter) > 1 {
|
|
objectCh <- ObjectInfo{
|
|
Err: errors.Errorf("wrong delimiter %q", delimiter),
|
|
}
|
|
return objectCh
|
|
}
|
|
|
|
// remove leading slash
|
|
prefix = strings.TrimPrefix(prefix, "/")
|
|
startWith = strings.TrimPrefix(startWith, "/")
|
|
|
|
// Initiate list objects goroutine here.
|
|
go func(objectCh chan<- ObjectInfo) {
|
|
defer close(objectCh)
|
|
// Save continuationToken for next request.
|
|
var continuationToken string
|
|
for {
|
|
// Get list of objects a maximum of 1000 per request.
|
|
result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith)
|
|
if err != nil {
|
|
objectCh <- ObjectInfo{
|
|
Err: err,
|
|
}
|
|
return
|
|
}
|
|
|
|
// If contents are available loop through and send over channel.
|
|
for _, object := range result.Contents {
|
|
select {
|
|
// Send object content.
|
|
case objectCh <- ObjectInfo{Path: object.Key, LastModified: object.LastModified, Size: object.Size}:
|
|
// If receives done from the caller, return here.
|
|
case <-doneCh:
|
|
return
|
|
}
|
|
}
|
|
|
|
// If continuation token present, save it for next request.
|
|
if result.NextContinuationToken != "" {
|
|
continuationToken = result.NextContinuationToken
|
|
}
|
|
|
|
// Listing ends result is not truncated, return right here.
|
|
if !result.IsTruncated {
|
|
return
|
|
}
|
|
}
|
|
}(objectCh)
|
|
|
|
return objectCh
|
|
}
|