objectstorage: add persist option to WriteObject

This option is a no-op on s3, but on the posix implementation it is useful
when the file doesn't need to be persistent, since it avoids some fsync
calls.
Simone Gotti 2019-05-01 15:06:47 +02:00
parent 68e6bd5bdf
commit e964aa3537
9 changed files with 22 additions and 19 deletions
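
A minimal caller-side sketch of the new signature (the Storage interface below is abridged from the diff; the writeBoth helper, the ost variable and the object paths are made up for illustration): durable data passes persist=true, rebuildable data passes persist=false.

package example

import (
	"bytes"
	"io"
)

// Storage is abridged from the interface changed in this commit;
// only WriteObject is shown.
type Storage interface {
	WriteObject(filepath string, data io.Reader, persist bool) error
}

// writeBoth is a hypothetical caller showing how the persist flag is chosen.
func writeBoth(ost Storage) error {
	// Data that must survive a crash (WALs, data files): persist = true.
	if err := ost.WriteObject("data/status", bytes.NewReader([]byte("{}")), true); err != nil {
		return err
	}
	// Data that can be regenerated (caches, fetched logs and archives):
	// persist = false, so the posix backend can skip its fsync calls.
	if err := ost.WriteObject("caches/key", bytes.NewReader([]byte("cached")), false); err != nil {
		return err
	}
	return nil
}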

@@ -89,7 +89,7 @@ func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error {
if err != nil {
return err
}
-if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj)); err != nil {
+if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), true); err != nil {
return err
}
@@ -203,7 +203,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty
pos += len(dataEntryj)
}
-if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf); err != nil {
+if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf, true); err != nil {
return err
}
@@ -211,7 +211,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, dataty
if err != nil {
return err
}
-if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj)); err != nil {
+if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj), true); err != nil {
return err
}

@@ -485,7 +485,7 @@ func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Acti
return nil, err
}
}
-if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil {
+if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), true); err != nil {
return nil, err
}
d.log.Debugf("wrote wal file: %s", walDataFilePath)
@@ -629,7 +629,7 @@ func (d *DataManager) sync(ctx context.Context) error {
}
walFileCommittedPath := walFilePath + ".committed"
-if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil {
+if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), true); err != nil {
return err
}
@@ -659,7 +659,7 @@ func (d *DataManager) sync(ctx context.Context) error {
walFilePath := d.storageWalStatusFile(walData.WalSequence)
d.log.Debugf("checkpointing committed wal to storage")
walFileCheckpointedPath := walFilePath + ".checkpointed"
-if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil {
+if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{}), true); err != nil {
return err
}
}

@@ -35,7 +35,7 @@ type ReadSeekCloser interface {
type Storage interface {
Stat(filepath string) (*ObjectInfo, error)
ReadObject(filepath string) (ReadSeekCloser, error)
-WriteObject(filepath string, data io.Reader) error
+WriteObject(filepath string, data io.Reader, persist bool) error
DeleteObject(filepath string) error
List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo
}
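
For a backend where durability is not a local concern (s3 in this commit), persist can simply be ignored. A hypothetical in-memory backend, not part of the repository, sketched only to show that the flag is advisory for such implementations:

package example

import (
	"io"
	"io/ioutil"
)

// MemStorage is a made-up backend; like the s3 implementation it has no
// fsync step, so it accepts persist only to satisfy the interface.
type MemStorage struct {
	objects map[string][]byte
}

func NewMemStorage() *MemStorage {
	return &MemStorage{objects: map[string][]byte{}}
}

// WriteObject ignores persist: there is nothing to sync here.
func (m *MemStorage) WriteObject(filepath string, data io.Reader, persist bool) error {
	b, err := ioutil.ReadAll(data)
	if err != nil {
		return err
	}
	m.objects[filepath] = b
	return nil
}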

@@ -260,7 +260,7 @@ func TestList(t *testing.T) {
os := NewObjStorage(s, "/")
// populate
for _, p := range tt.objects {
-if err := os.WriteObject(p, strings.NewReader("")); err != nil {
+if err := os.WriteObject(p, strings.NewReader(""), true); err != nil {
t.Fatalf("%s %d err: %v", sname, i, err)
}
}

@@ -264,7 +264,7 @@ func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) {
return f, err
}
-func (s *PosixStorage) WriteObject(p string, data io.Reader) error {
+func (s *PosixStorage) WriteObject(p string, data io.Reader, persist bool) error {
fspath, err := s.fsPath(p)
if err != nil {
return err
@@ -273,7 +273,7 @@ func (s *PosixStorage) WriteObject(p string, data io.Reader) error {
if err := os.MkdirAll(path.Dir(fspath), 0770); err != nil {
return err
}
-return s.WriteFileAtomicFunc(fspath, 0660, func(f io.Writer) error {
+return s.WriteFileAtomicFunc(fspath, 0660, persist, func(f io.Writer) error {
_, err := io.Copy(f, data)
return err
})
@@ -437,13 +437,13 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s
// data to the file.
// TODO(sgotti) remove left over tmp files if process crashes before calling
// os.Remove
-func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, writeFunc func(f io.Writer) error) error {
+func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, persist bool, writeFunc func(f io.Writer) error) error {
f, err := ioutil.TempFile(s.tmpDir, "tmpfile")
if err != nil {
return err
}
err = writeFunc(f)
-if err == nil {
+if persist && err == nil {
err = f.Sync()
}
if closeErr := f.Close(); err == nil {
@@ -460,6 +460,9 @@ func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, writeFunc
return err
}
+if !persist {
+return nil
+}
// sync parent dirs
pdir := filepath.Dir(p)
for {
@@ -482,8 +485,8 @@ func (s *PosixStorage) WriteFileAtomicFunc(p string, perm os.FileMode, writeFunc
return nil
}
-func (s *PosixStorage) WriteFileAtomic(filename string, perm os.FileMode, data []byte) error {
-return s.WriteFileAtomicFunc(filename, perm,
+func (s *PosixStorage) WriteFileAtomic(filename string, perm os.FileMode, persist bool, data []byte) error {
+return s.WriteFileAtomicFunc(filename, perm, persist,
func(f io.Writer) error {
_, err := f.Write(data)
return err
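
The posix change follows the usual atomic-write pattern: write to a temp file, fsync it, rename it into place, then fsync the parent directory so the rename itself is durable; with persist == false both syncs are skipped. A standalone, simplified sketch of that pattern (the writeFileAtomic name and the error handling are mine; the real WriteFileAtomicFunc takes a writeFunc callback and uses s.tmpDir):

package example

import (
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
)

// writeFileAtomic writes p atomically; when persist is false the fsync of the
// file and of its parent directory are skipped, trading durability for fewer
// syscalls on data that can be regenerated.
func writeFileAtomic(p string, perm os.FileMode, persist bool, data io.Reader) error {
	f, err := ioutil.TempFile(filepath.Dir(p), "tmpfile")
	if err != nil {
		return err
	}
	tmpName := f.Name()
	// Best-effort cleanup of the temp file; harmless once the rename succeeded.
	defer os.Remove(tmpName)

	if _, err := io.Copy(f, data); err != nil {
		f.Close()
		return err
	}
	if persist {
		// Make the file contents durable before publishing them via rename.
		if err := f.Sync(); err != nil {
			f.Close()
			return err
		}
	}
	if err := f.Close(); err != nil {
		return err
	}
	if err := os.Chmod(tmpName, perm); err != nil {
		return err
	}
	if err := os.Rename(tmpName, p); err != nil {
		return err
	}
	if !persist {
		return nil
	}
	// Sync the parent directory so the new directory entry is durable too.
	dir, err := os.Open(filepath.Dir(p))
	if err != nil {
		return err
	}
	defer dir.Close()
	return dir.Sync()
}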

@@ -92,7 +92,7 @@ func TestDeleteObject(t *testing.T) {
}
for _, obj := range objects {
-if err := ls.WriteObject(obj, bytes.NewReader([]byte{})); err != nil {
+if err := ls.WriteObject(obj, bytes.NewReader([]byte{}), true); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := ls.DeleteObject(obj); err != nil {

@@ -84,7 +84,7 @@ func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) {
return s.minioClient.GetObject(s.bucket, filepath, minio.GetObjectOptions{})
}
-func (s *S3Storage) WriteObject(filepath string, data io.Reader) error {
+func (s *S3Storage) WriteObject(filepath string, data io.Reader, persist bool) error {
// hack to know the real file size or minio will do this in memory with big memory usage since s3 doesn't support real streaming of unknown sizes
// TODO(sgotti) wait for minio client to expose an api to provide the max part size so we can remove this
tmpfile, err := ioutil.TempFile(os.TempDir(), "s3")

@@ -411,7 +411,7 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
cachePath := store.OSTCachePath(key)
-if err := h.ost.WriteObject(cachePath, r.Body); err != nil {
+if err := h.ost.WriteObject(cachePath, r.Body, false); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

@@ -931,7 +931,7 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool,
return errors.Errorf("received http status: %d", r.StatusCode)
}
-return s.ost.WriteObject(logPath, r.Body)
+return s.ost.WriteObject(logPath, r.Body, false)
}
func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error {
@@ -1073,7 +1073,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum
return errors.Errorf("received http status: %d", r.StatusCode)
}
-return s.ost.WriteObject(path, r.Body)
+return s.ost.WriteObject(path, r.Body, false)
}
func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {