diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index d100843..7c790c6 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -89,7 +89,7 @@ func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error { if err != nil { return err } - if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj)); err != nil { + if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), true); err != nil { return err } @@ -203,7 +203,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty pos += len(dataEntryj) } - if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf); err != nil { + if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, true); err != nil { return err } @@ -211,7 +211,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty if err != nil { return err } - if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj)); err != nil { + if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), true); err != nil { return err } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 8dff9d6..1290cba 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -485,7 +485,7 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti return nil, err } } - if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil { + if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), true); err != nil { return nil, err } d.log.Debugf("wrote wal file: %s", walDataFilePath) @@ -629,7 +629,7 @@ func (d *DataManager) sync(ctx context.Context) error { } walFileCommittedPath := walFilePath + ".committed" - if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil { + if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), true); err != nil { return err } @@ -659,7 +659,7 @@ func (d *DataManager) sync(ctx context.Context) error { walFilePath := d.storageWalStatusFile(walData.WalSequence) d.log.Debugf("checkpointing committed wal to storage") walFileCheckpointedPath := walFilePath + ".checkpointed" - if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil { + if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), true); err != nil { return err } } diff --git a/internal/objectstorage/objectstorage.go b/internal/objectstorage/objectstorage.go index 69ebd5b..eb5c996 100644 --- a/internal/objectstorage/objectstorage.go +++ b/internal/objectstorage/objectstorage.go @@ -35,7 +35,7 @@ type ReadSeekCloser interface { type Storage interface { Stat(filepath string) (*ObjectInfo, error) ReadObject(filepath string) (ReadSeekCloser, error) - WriteObject(filepath string, data io.Reader) error + WriteObject(filepath string, data io.Reader, persist bool) error DeleteObject(filepath string) error List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo } diff --git a/internal/objectstorage/objectstorage_test.go b/internal/objectstorage/objectstorage_test.go index 8590cf1..667d7e2 100644 --- a/internal/objectstorage/objectstorage_test.go +++ b/internal/objectstorage/objectstorage_test.go @@ -260,7 +260,7 @@ func TestList(t *testing.T) { os := NewObjStorage(s, "/") // populate for _, p := range tt.objects { - if err := os.WriteObject(p, strings.NewReader("")); err != nil { + if err := os.WriteObject(p, strings.NewReader(""), true); err != nil { t.Fatalf("%s %d err: %v", sname, i, err) } } diff --git a/internal/objectstorage/posix.go b/internal/objectstorage/posix.go index aa51be1..5a3e5ff 100644 --- a/internal/objectstorage/posix.go +++ b/internal/objectstorage/posix.go @@ -264,7 +264,7 @@ func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { return f, err } -func (s *PosixStorage) WriteObject(p string, data io.Reader) error { +func (s *PosixStorage) WriteObject(p string, data io.Reader, persist bool) error { fspath, err := s.fsPath(p) if err != nil { return err @@ -273,7 +273,7 @@ func (s *PosixStorage) WriteObject(p string, data io.Reader) error { if err := os.MkdirAll(path.Dir(fspath), 0770); err != nil { return err } - return s.WriteFileAtomicFunc(fspath, 0660, func(f io.Writer) error { + return s.WriteFileAtomicFunc(fspath, 0660, persist, func(f io.Writer) error { _, err := io.Copy(f, data) return err }) @@ -437,13 +437,13 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s // data to the file. // TODO(sgotti) remove left over tmp files if process crashes before calling // os.Remove -func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, writeFunc func(f io.Writer) error) error { +func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error { f, err := ioutil.TempFile(s.tmpDir, "tmpfile") if err != nil { return err } err = writeFunc(f) - if err == nil { + if persist && err == nil { err = f.Sync() } if closeErr := f.Close(); err == nil { @@ -460,6 +460,9 @@ func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, writeFunc return err } + if !persist { + return nil + } // sync parent dirs pdir := filepath.Dir(p) for { @@ -482,8 +485,8 @@ func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, writeFunc return nil } -func (s *PosixStorage) WriteFileAtomic(filename string, perm os.FileMode, data []byte) error { - return s.WriteFileAtomicFunc(filename, perm, +func (s *PosixStorage) WriteFileAtomic(filename string, perm os.FileMode, persist bool, data []byte) error { + return s.WriteFileAtomicFunc(filename, perm, persist, func(f io.Writer) error { _, err := f.Write(data) return err diff --git a/internal/objectstorage/posix_test.go b/internal/objectstorage/posix_test.go index fe256e4..7787532 100644 --- a/internal/objectstorage/posix_test.go +++ b/internal/objectstorage/posix_test.go @@ -92,7 +92,7 @@ func TestDeleteObject(t *testing.T) { } for _, obj := range objects { - if err := ls.WriteObject(obj, bytes.NewReader([]byte{})); err != nil { + if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), true); err != nil { t.Fatalf("unexpected err: %v", err) } if err := ls.DeleteObject(obj); err != nil { diff --git a/internal/objectstorage/s3.go b/internal/objectstorage/s3.go index 1be8f81..303acf8 100644 --- a/internal/objectstorage/s3.go +++ b/internal/objectstorage/s3.go @@ -84,7 +84,7 @@ func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) { return s.minioClient.GetObject(s.bucket, filepath, minio.GetObjectOptions{}) } -func (s *S3Storage) WriteObject(filepath string, data io.Reader) error { +func (s *S3Storage) WriteObject(filepath string, data io.Reader, persist bool) error { // hack to know the real file size or minio will do this in memory with big memory usage since s3 doesn't support real streaming of unknown sizes // TODO(sgotti) wait for minio client to expose an api to provide the max part size so we can remove this tmpfile, err := ioutil.TempFile(os.TempDir(), "s3") diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/scheduler/api/executor.go index 233ace4..deab9ab 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/scheduler/api/executor.go @@ -411,7 +411,7 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } cachePath := store.OSTCachePath(key) - if err := h.ost.WriteObject(cachePath, r.Body); err != nil { + if err := h.ost.WriteObject(cachePath, r.Body, false); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index c46f220..0ad45b5 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -931,7 +931,7 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, return errors.Errorf("received http status: %d", r.StatusCode) } - return s.ost.WriteObject(logPath, r.Body) + return s.ost.WriteObject(logPath, r.Body, false) } func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { @@ -1073,7 +1073,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum return errors.Errorf("received http status: %d", r.StatusCode) } - return s.ost.WriteObject(path, r.Body) + return s.ost.WriteObject(path, r.Body, false) } func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {