diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index 7c790c6..10de284 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -89,7 +89,7 @@ func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error { if err != nil { return err } - if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), true); err != nil { + if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), -1, true); err != nil { return err } @@ -203,7 +203,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty pos += len(dataEntryj) } - if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, true); err != nil { + if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, -1, true); err != nil { return err } @@ -211,7 +211,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty if err != nil { return err } - if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), true); err != nil { + if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), -1, true); err != nil { return err } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 1290cba..0adfba2 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -485,7 +485,7 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti return nil, err } } - if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), true); err != nil { + if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), -1, true); err != nil { return nil, err } d.log.Debugf("wrote wal file: %s", walDataFilePath) @@ -629,7 +629,7 @@ func (d *DataManager) sync(ctx context.Context) error { } walFileCommittedPath := walFilePath + ".committed" - if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), true); err != nil { + if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), -1, true); err != nil { return err } @@ -659,7 +659,7 @@ func (d *DataManager) sync(ctx context.Context) error { walFilePath := d.storageWalStatusFile(walData.WalSequence) d.log.Debugf("checkpointing committed wal to storage") walFileCheckpointedPath := walFilePath + ".checkpointed" - if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), true); err != nil { + if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), -1, true); err != nil { return err } } diff --git a/internal/objectstorage/objectstorage.go b/internal/objectstorage/objectstorage.go index eb5c996..1555a1d 100644 --- a/internal/objectstorage/objectstorage.go +++ b/internal/objectstorage/objectstorage.go @@ -35,7 +35,7 @@ type ReadSeekCloser interface { type Storage interface { Stat(filepath string) (*ObjectInfo, error) ReadObject(filepath string) (ReadSeekCloser, error) - WriteObject(filepath string, data io.Reader, persist bool) error + WriteObject(filepath string, data io.Reader, size int64, persist bool) error DeleteObject(filepath string) error List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo } diff --git a/internal/objectstorage/objectstorage_test.go b/internal/objectstorage/objectstorage_test.go index 667d7e2..c008da2 100644 --- a/internal/objectstorage/objectstorage_test.go +++ b/internal/objectstorage/objectstorage_test.go @@ -260,7 +260,7 @@ func TestList(t *testing.T) { os := NewObjStorage(s, "/") // populate for _, p := range tt.objects { - if err := os.WriteObject(p, strings.NewReader(""), true); err != nil { + if err := os.WriteObject(p, strings.NewReader(""), 0, true); err != nil { t.Fatalf("%s %d err: %v", sname, i, err) } } diff --git a/internal/objectstorage/posix.go b/internal/objectstorage/posix.go index 5a3e5ff..75a687b 100644 --- a/internal/objectstorage/posix.go +++ b/internal/objectstorage/posix.go @@ -264,7 +264,7 @@ func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { return f, err } -func (s *PosixStorage) WriteObject(p string, data io.Reader, persist bool) error { +func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error { fspath, err := s.fsPath(p) if err != nil { return err diff --git a/internal/objectstorage/posix_test.go b/internal/objectstorage/posix_test.go index 7787532..35f1255 100644 --- a/internal/objectstorage/posix_test.go +++ b/internal/objectstorage/posix_test.go @@ -92,7 +92,7 @@ func TestDeleteObject(t *testing.T) { } for _, obj := range objects { - if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), true); err != nil { + if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), 0, true); err != nil { t.Fatalf("unexpected err: %v", err) } if err := ls.DeleteObject(obj); err != nil { diff --git a/internal/objectstorage/s3.go b/internal/objectstorage/s3.go index 303acf8..b94ba23 100644 --- a/internal/objectstorage/s3.go +++ b/internal/objectstorage/s3.go @@ -84,16 +84,25 @@ func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) { return s.minioClient.GetObject(s.bucket, filepath, minio.GetObjectOptions{}) } -func (s *S3Storage) WriteObject(filepath string, data io.Reader, persist bool) error { +func (s *S3Storage) WriteObject(filepath string, data io.Reader, size int64, persist bool) error { + // if size is not specified, limit max object size to defaultMaxObjectSize so + // minio client will not calculate a very big part size using tons of ram. + // An alternative is to write the file locally so we can calculate the size and + // then put it. See commented out code below. + if size >= 0 { + _, err := s.minioClient.PutObject(s.bucket, filepath, data, size, minio.PutObjectOptions{ContentType: "application/octet-stream"}) + return err + } + // hack to know the real file size or minio will do this in memory with big memory usage since s3 doesn't support real streaming of unknown sizes - // TODO(sgotti) wait for minio client to expose an api to provide the max part size so we can remove this + // TODO(sgotti) wait for minio client to expose an api to provide the max object size so we can remove this tmpfile, err := ioutil.TempFile(os.TempDir(), "s3") if err != nil { return err } defer tmpfile.Close() defer os.Remove(tmpfile.Name()) - size, err := io.Copy(tmpfile, data) + size, err = io.Copy(tmpfile, data) if err != nil { return err } diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/scheduler/api/executor.go index deab9ab..43d41f3 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/scheduler/api/executor.go @@ -411,7 +411,7 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } cachePath := store.OSTCachePath(key) - if err := h.ost.WriteObject(cachePath, r.Body, false); err != nil { + if err := h.ost.WriteObject(cachePath, r.Body, -1, false); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 0ad45b5..d8dfa43 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -931,7 +931,7 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, return errors.Errorf("received http status: %d", r.StatusCode) } - return s.ost.WriteObject(logPath, r.Body, false) + return s.ost.WriteObject(logPath, r.Body, -1, false) } func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { @@ -1073,7 +1073,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum return errors.Errorf("received http status: %d", r.StatusCode) } - return s.ost.WriteObject(path, r.Body, false) + return s.ost.WriteObject(path, r.Body, -1, false) } func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {