From 34cfdfeb3bb6890cfb5878c021357d46badac314 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 2 May 2019 09:47:38 +0200 Subject: [PATCH] objectstorage: add size option to WriteObject On S3, limit the max object size to 1GiB when the size is not provided (-1); otherwise the minio client will calculate a very big part size, since it tries to use the maximum object size (5TiB), and will allocate a very big buffer in RAM. Also leave the previous hack, which first wrote the file locally to calculate its size and then put it, commented out for future reference. --- internal/datamanager/data.go | 6 +++--- internal/datamanager/wal.go | 6 +++--- internal/objectstorage/objectstorage.go | 2 +- internal/objectstorage/objectstorage_test.go | 2 +- internal/objectstorage/posix.go | 2 +- internal/objectstorage/posix_test.go | 2 +- internal/objectstorage/s3.go | 15 ++++++++++++--- .../services/runservice/scheduler/api/executor.go | 2 +- .../services/runservice/scheduler/scheduler.go | 4 ++-- 9 files changed, 25 insertions(+), 16 deletions(-) diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index 7c790c6..10de284 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -89,7 +89,7 @@ func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error { if err != nil { return err } - if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), true); err != nil { + if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), -1, true); err != nil { return err } @@ -203,7 +203,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty pos += len(dataEntryj) } - if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, true); err != nil { + if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, -1, true); err != nil { return err } @@ -211,7 +211,7 @@ func (d *DataManager) writeDataType(ctx
context.Context, wals []*WalData, dataty if err != nil { return err } - if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), true); err != nil { + if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), -1, true); err != nil { return err } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 1290cba..0adfba2 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -485,7 +485,7 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti return nil, err } } - if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), true); err != nil { + if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), -1, true); err != nil { return nil, err } d.log.Debugf("wrote wal file: %s", walDataFilePath) @@ -629,7 +629,7 @@ func (d *DataManager) sync(ctx context.Context) error { } walFileCommittedPath := walFilePath + ".committed" - if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), true); err != nil { + if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), -1, true); err != nil { return err } @@ -659,7 +659,7 @@ func (d *DataManager) sync(ctx context.Context) error { walFilePath := d.storageWalStatusFile(walData.WalSequence) d.log.Debugf("checkpointing committed wal to storage") walFileCheckpointedPath := walFilePath + ".checkpointed" - if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), true); err != nil { + if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), -1, true); err != nil { return err } } diff --git a/internal/objectstorage/objectstorage.go b/internal/objectstorage/objectstorage.go index eb5c996..1555a1d 100644 --- a/internal/objectstorage/objectstorage.go +++ b/internal/objectstorage/objectstorage.go @@ -35,7 +35,7 @@ type ReadSeekCloser interface { type Storage interface 
{ Stat(filepath string) (*ObjectInfo, error) ReadObject(filepath string) (ReadSeekCloser, error) - WriteObject(filepath string, data io.Reader, persist bool) error + WriteObject(filepath string, data io.Reader, size int64, persist bool) error DeleteObject(filepath string) error List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo } diff --git a/internal/objectstorage/objectstorage_test.go b/internal/objectstorage/objectstorage_test.go index 667d7e2..c008da2 100644 --- a/internal/objectstorage/objectstorage_test.go +++ b/internal/objectstorage/objectstorage_test.go @@ -260,7 +260,7 @@ func TestList(t *testing.T) { os := NewObjStorage(s, "/") // populate for _, p := range tt.objects { - if err := os.WriteObject(p, strings.NewReader(""), true); err != nil { + if err := os.WriteObject(p, strings.NewReader(""), 0, true); err != nil { t.Fatalf("%s %d err: %v", sname, i, err) } } diff --git a/internal/objectstorage/posix.go b/internal/objectstorage/posix.go index 5a3e5ff..75a687b 100644 --- a/internal/objectstorage/posix.go +++ b/internal/objectstorage/posix.go @@ -264,7 +264,7 @@ func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { return f, err } -func (s *PosixStorage) WriteObject(p string, data io.Reader, persist bool) error { +func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error { fspath, err := s.fsPath(p) if err != nil { return err diff --git a/internal/objectstorage/posix_test.go b/internal/objectstorage/posix_test.go index 7787532..35f1255 100644 --- a/internal/objectstorage/posix_test.go +++ b/internal/objectstorage/posix_test.go @@ -92,7 +92,7 @@ func TestDeleteObject(t *testing.T) { } for _, obj := range objects { - if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), true); err != nil { + if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), 0, true); err != nil { t.Fatalf("unexpected err: %v", err) } if err := ls.DeleteObject(obj); err != nil { diff --git 
a/internal/objectstorage/s3.go b/internal/objectstorage/s3.go index 303acf8..b94ba23 100644 --- a/internal/objectstorage/s3.go +++ b/internal/objectstorage/s3.go @@ -84,16 +84,25 @@ func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) { return s.minioClient.GetObject(s.bucket, filepath, minio.GetObjectOptions{}) } -func (s *S3Storage) WriteObject(filepath string, data io.Reader, persist bool) error { +func (s *S3Storage) WriteObject(filepath string, data io.Reader, size int64, persist bool) error { + // if size is not specified, limit max object size to defaultMaxObjectSize so + // minio client will not calculate a very big part size using tons of ram. + // An alternative is to write the file locally so we can calculate the size and + // then put it. See commented out code below. + if size >= 0 { + _, err := s.minioClient.PutObject(s.bucket, filepath, data, size, minio.PutObjectOptions{ContentType: "application/octet-stream"}) + return err + } + // hack to know the real file size or minio will do this in memory with big memory usage since s3 doesn't support real streaming of unknown sizes - // TODO(sgotti) wait for minio client to expose an api to provide the max part size so we can remove this + // TODO(sgotti) wait for minio client to expose an api to provide the max object size so we can remove this tmpfile, err := ioutil.TempFile(os.TempDir(), "s3") if err != nil { return err } defer tmpfile.Close() defer os.Remove(tmpfile.Name()) - size, err := io.Copy(tmpfile, data) + size, err = io.Copy(tmpfile, data) if err != nil { return err } diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/scheduler/api/executor.go index deab9ab..43d41f3 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/scheduler/api/executor.go @@ -411,7 +411,7 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } cachePath := store.OSTCachePath(key) 
- if err := h.ost.WriteObject(cachePath, r.Body, false); err != nil { + if err := h.ost.WriteObject(cachePath, r.Body, -1, false); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 0ad45b5..d8dfa43 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -931,7 +931,7 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, return errors.Errorf("received http status: %d", r.StatusCode) } - return s.ost.WriteObject(logPath, r.Body, false) + return s.ost.WriteObject(logPath, r.Body, -1, false) } func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { @@ -1073,7 +1073,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum return errors.Errorf("received http status: %d", r.StatusCode) } - return s.ost.WriteObject(path, r.Body, false) + return s.ost.WriteObject(path, r.Body, -1, false) } func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {