objectstorage: add size option to WriteObject

On s3, limit the max object size to 1GiB when the size is not provided (-1);
otherwise the minio client will calculate a very large part size, since it
tries to accommodate the maximum object size (5TiB), and will allocate a very
big buffer in RAM. Also keep, as commented-out code for future reference, the
previous hack that first created the file locally to calculate its size and
then uploaded it.
This commit is contained in:
Simone Gotti 2019-05-02 09:47:38 +02:00
parent e964aa3537
commit 34cfdfeb3b
9 changed files with 25 additions and 16 deletions

View File

@ -89,7 +89,7 @@ func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error {
if err != nil {
return err
}
if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), true); err != nil {
if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), -1, true); err != nil {
return err
}
@ -203,7 +203,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty
pos += len(dataEntryj)
}
if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, true); err != nil {
if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, -1, true); err != nil {
return err
}
@ -211,7 +211,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty
if err != nil {
return err
}
if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), true); err != nil {
if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), -1, true); err != nil {
return err
}

View File

@ -485,7 +485,7 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti
return nil, err
}
}
if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), true); err != nil {
if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), -1, true); err != nil {
return nil, err
}
d.log.Debugf("wrote wal file: %s", walDataFilePath)
@ -629,7 +629,7 @@ func (d *DataManager) sync(ctx context.Context) error {
}
walFileCommittedPath := walFilePath + ".committed"
if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), true); err != nil {
if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), -1, true); err != nil {
return err
}
@ -659,7 +659,7 @@ func (d *DataManager) sync(ctx context.Context) error {
walFilePath := d.storageWalStatusFile(walData.WalSequence)
d.log.Debugf("checkpointing committed wal to storage")
walFileCheckpointedPath := walFilePath + ".checkpointed"
if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), true); err != nil {
if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), -1, true); err != nil {
return err
}
}

View File

@ -35,7 +35,7 @@ type ReadSeekCloser interface {
type Storage interface {
Stat(filepath string) (*ObjectInfo, error)
ReadObject(filepath string) (ReadSeekCloser, error)
WriteObject(filepath string, data io.Reader, persist bool) error
WriteObject(filepath string, data io.Reader, size int64, persist bool) error
DeleteObject(filepath string) error
List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo
}

View File

@ -260,7 +260,7 @@ func TestList(t *testing.T) {
os := NewObjStorage(s, "/")
// populate
for _, p := range tt.objects {
if err := os.WriteObject(p, strings.NewReader(""), true); err != nil {
if err := os.WriteObject(p, strings.NewReader(""), 0, true); err != nil {
t.Fatalf("%s %d err: %v", sname, i, err)
}
}

View File

@ -264,7 +264,7 @@ func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) {
return f, err
}
func (s *PosixStorage) WriteObject(p string, data io.Reader, persist bool) error {
func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error {
fspath, err := s.fsPath(p)
if err != nil {
return err

View File

@ -92,7 +92,7 @@ func TestDeleteObject(t *testing.T) {
}
for _, obj := range objects {
if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), true); err != nil {
if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), 0, true); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := ls.DeleteObject(obj); err != nil {

View File

@ -84,16 +84,25 @@ func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) {
return s.minioClient.GetObject(s.bucket, filepath, minio.GetObjectOptions{})
}
func (s *S3Storage) WriteObject(filepath string, data io.Reader, persist bool) error {
func (s *S3Storage) WriteObject(filepath string, data io.Reader, size int64, persist bool) error {
// if size is not specified, limit max object size to defaultMaxObjectSize so
// minio client will not calculate a very big part size using tons of ram.
// An alternative is to write the file locally so we can calculate the size and
// then put it. See commented out code below.
if size >= 0 {
_, err := s.minioClient.PutObject(s.bucket, filepath, data, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
return err
}
// hack to know the real file size or minio will do this in memory with big memory usage since s3 doesn't support real streaming of unknown sizes
// TODO(sgotti) wait for minio client to expose an api to provide the max part size so we can remove this
// TODO(sgotti) wait for minio client to expose an api to provide the max object size so we can remove this
tmpfile, err := ioutil.TempFile(os.TempDir(), "s3")
if err != nil {
return err
}
defer tmpfile.Close()
defer os.Remove(tmpfile.Name())
size, err := io.Copy(tmpfile, data)
size, err = io.Copy(tmpfile, data)
if err != nil {
return err
}

View File

@ -411,7 +411,7 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
cachePath := store.OSTCachePath(key)
if err := h.ost.WriteObject(cachePath, r.Body, false); err != nil {
if err := h.ost.WriteObject(cachePath, r.Body, -1, false); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

View File

@ -931,7 +931,7 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool,
return errors.Errorf("received http status: %d", r.StatusCode)
}
return s.ost.WriteObject(logPath, r.Body, false)
return s.ost.WriteObject(logPath, r.Body, -1, false)
}
func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error {
@ -1073,7 +1073,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum
return errors.Errorf("received http status: %d", r.StatusCode)
}
return s.ost.WriteObject(path, r.Body, false)
return s.ost.WriteObject(path, r.Body, -1, false)
}
func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {