datamanager: add data sequence to data file name

When creating a datafile name make it start with the current data sequence. This
is useful in future to know which data sequence created a new data file.
This commit is contained in:
Simone Gotti 2019-10-29 13:23:42 +01:00
parent e0346617ab
commit 4b4416fc99
2 changed files with 25 additions and 12 deletions

View File

@ -74,6 +74,10 @@ func (w walActions) Len() int { return len(w) }
func (w walActions) Less(i, j int) bool { return w[i].ID < w[j].ID } func (w walActions) Less(i, j int) bool { return w[i].ID < w[j].ID }
func (w walActions) Swap(i, j int) { w[i], w[j] = w[j], w[i] } func (w walActions) Swap(i, j int) { w[i], w[j] = w[j], w[i] }
func (d *DataManager) dataFileID(dataSequence *sequence.Sequence, next string) string {
return fmt.Sprintf("%s-%s", dataSequence.String(), next)
}
func (d *DataManager) walIndex(ctx context.Context, wals []*WalData) (walIndex, error) { func (d *DataManager) walIndex(ctx context.Context, wals []*WalData) (walIndex, error) {
wimap := map[string]map[string]*Action{} wimap := map[string]map[string]*Action{}
@ -168,7 +172,7 @@ func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) er
if curDataStatus != nil { if curDataStatus != nil {
curDataStatusFiles = curDataStatus.Files[dataType] curDataStatusFiles = curDataStatus.Files[dataType]
} }
dataStatusFiles, err := d.writeDataType(ctx, wi, dataType, curDataStatusFiles) dataStatusFiles, err := d.writeDataType(ctx, wi, dataType, dataSequence, curDataStatusFiles)
if err != nil { if err != nil {
return err return err
} }
@ -179,7 +183,7 @@ func (d *DataManager) writeDataSnapshot(ctx context.Context, wals []*WalData) er
if err != nil { if err != nil {
return err return err
} }
if err := d.ost.WriteObject(d.dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil { if err := d.ost.WriteObject(d.dataStatusPath(dataSequence), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
return err return err
} }
@ -285,7 +289,7 @@ func (d *DataManager) actionGroups(ctx context.Context, wi walIndex, dataType st
return actionGroups, remainingDataStatusFiles return actionGroups, remainingDataStatusFiles
} }
func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType string, curDataStatusFiles []*DataStatusFile) ([]*DataStatusFile, error) { func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType string, dataSequence *sequence.Sequence, curDataStatusFiles []*DataStatusFile) ([]*DataStatusFile, error) {
type SplitPoint struct { type SplitPoint struct {
pos int64 pos int64
lastEntryID string lastEntryID string
@ -443,7 +447,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
} }
dataFileIndexes = append(dataFileIndexes, dataFileIndex) dataFileIndexes = append(dataFileIndexes, dataFileIndex)
for i, sp := range splitPoints { for i, sp := range splitPoints {
curDataFileID := uuid.NewV4().String() curDataFileID := d.dataFileID(dataSequence, uuid.NewV4().String())
if err := d.writeDataFile(ctx, &buf, sp.pos-curPos, dataFileIndexes[i], curDataFileID, dataType); err != nil { if err := d.writeDataFile(ctx, &buf, sp.pos-curPos, dataFileIndexes[i], curDataFileID, dataType); err != nil {
return nil, err return nil, err
} }
@ -629,7 +633,7 @@ func (d *DataManager) Import(ctx context.Context, r io.Reader) error {
err := dec.Decode(&de) err := dec.Decode(&de)
if err == io.EOF { if err == io.EOF {
dataFileID := uuid.NewV4().String() dataFileID := d.dataFileID(dataSequence, uuid.NewV4().String())
if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil { if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil {
return err return err
} }
@ -663,7 +667,7 @@ func (d *DataManager) Import(ctx context.Context, r io.Reader) error {
} }
if mustWrite { if mustWrite {
dataFileID := uuid.NewV4().String() dataFileID := d.dataFileID(dataSequence, uuid.NewV4().String())
if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil { if err := d.writeDataFile(ctx, &buf, int64(buf.Len()), dataFileIndex, dataFileID, curDataType); err != nil {
return err return err
} }
@ -709,7 +713,7 @@ func (d *DataManager) Import(ctx context.Context, r io.Reader) error {
if err != nil { if err != nil {
return err return err
} }
if err := d.ost.WriteObject(d.dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil { if err := d.ost.WriteObject(d.dataStatusPath(dataSequence), bytes.NewReader(dataStatusj), int64(len(dataStatusj)), true); err != nil {
return err return err
} }

View File

@ -23,6 +23,7 @@ import (
"agola.io/agola/internal/etcd" "agola.io/agola/internal/etcd"
"agola.io/agola/internal/objectstorage" "agola.io/agola/internal/objectstorage"
"agola.io/agola/internal/sequence"
"go.uber.org/zap" "go.uber.org/zap"
errors "golang.org/x/xerrors" errors "golang.org/x/xerrors"
@ -158,16 +159,24 @@ func (d *DataManager) storageDataDir() string {
return path.Join(d.basePath, storageDataDir) return path.Join(d.basePath, storageDataDir)
} }
func (d *DataManager) dataStatusPath(sequence string) string { func (d *DataManager) dataStatusPath(sequence *sequence.Sequence) string {
return fmt.Sprintf("%s/%s.status", d.storageDataDir(), sequence) return fmt.Sprintf("%s/%s.status", d.storageDataDir(), sequence)
} }
func (d *DataManager) DataFileIndexPath(dataType, id string) string { func (d *DataManager) DataTypeDir(dataType string) string {
return fmt.Sprintf("%s/%s/%s.index", d.storageDataDir(), dataType, id) return fmt.Sprintf("%s/%s", d.storageDataDir(), dataType)
} }
func (d *DataManager) DataFilePath(dataType, id string) string { func (d *DataManager) DataFileBasePath(dataType, name string) string {
return fmt.Sprintf("%s/%s/%s.data", d.storageDataDir(), dataType, id) return fmt.Sprintf("%s/%s", d.DataTypeDir(dataType), name)
}
func (d *DataManager) DataFileIndexPath(dataType, name string) string {
return fmt.Sprintf("%s.index", d.DataFileBasePath(dataType, name))
}
func (d *DataManager) DataFilePath(dataType, name string) string {
return fmt.Sprintf("%s.data", d.DataFileBasePath(dataType, name))
} }
func etcdWalKey(walSeq string) string { func etcdWalKey(walSeq string) string {