Merge pull request #163 from sgotti/datamanager_fix_index_pos

datamanager: fix index creation on multiple data files
This commit is contained in:
Simone Gotti 2019-11-06 17:50:12 +01:00 committed by GitHub
commit 156995a804
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 100 additions and 7 deletions

View File

@ -406,6 +406,11 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
} }
if de != nil { if de != nil {
var lastSplitPos int64
if len(splitPoints) > 0 {
lastSplitPos = splitPoints[len(splitPoints)-1].pos
}
lastEntryID = de.ID lastEntryID = de.ID
dataEntryj, err := json.Marshal(de) dataEntryj, err := json.Marshal(de)
if err != nil { if err != nil {
@ -414,13 +419,9 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
if _, err := buf.Write(dataEntryj); err != nil { if _, err := buf.Write(dataEntryj); err != nil {
return nil, err return nil, err
} }
dataFileIndex.Index[de.ID] = pos dataFileIndex.Index[de.ID] = pos - lastSplitPos
prevPos := pos prevPos := pos
pos += int64(len(dataEntryj)) pos += int64(len(dataEntryj))
var lastSplitPos int64
if len(splitPoints) > 0 {
lastSplitPos = splitPoints[len(splitPoints)-1].pos
}
if pos-lastSplitPos > d.maxDataFileSize { if pos-lastSplitPos > d.maxDataFileSize {
// add split point only if it's different (less) than the previous one // add split point only if it's different (less) than the previous one
if lastSplitPos < prevPos { if lastSplitPos < prevPos {
@ -434,7 +435,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
} }
} }
// save remaining data // save data
if buf.Len() != 0 { if buf.Len() != 0 {
var curPos int64 var curPos int64
var lastSplitPos int64 var lastSplitPos int64
@ -483,7 +484,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
matchingDataFileID = curFiles[dataType][0].ID matchingDataFileID = curFiles[dataType][0].ID
for _, dataStatusFile := range curFiles[dataType] { for _, dataStatusFile := range curFiles[dataType] {
if dataStatusFile.LastEntryID > id { if dataStatusFile.LastEntryID >= id {
matchingDataFileID = dataStatusFile.ID matchingDataFileID = dataStatusFile.ID
break break
} }

View File

@ -736,6 +736,98 @@ func testCheckpoint(t *testing.T, basePath string) {
} }
} }
// TestRead checkpoints enough entries to force the checkpoint to split the
// data across multiple data files, then reads every entry back and verifies
// its contents, exercising the per-file index offsets and the data file
// selection done by DataManager.Read.
func TestRead(t *testing.T) {
	baseDir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(baseDir)

	etcdDataDir, err := ioutil.TempDir(baseDir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, etcdDataDir)
	defer shutdownEtcd(tetcd)

	ctx := context.Background()

	objectStorageDir, err := ioutil.TempDir(baseDir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	ost, err := posix.New(objectStorageDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dmConfig := &DataManagerConfig{
		BasePath: "basepath",
		E:        tetcd.TestEtcd.Store,
		OST:      objectstorage.NewObjStorage(ost, "/"),
		// remove almost all wals to see that they are removed also from changes
		EtcdWalsKeepNum: 1,
		DataTypes:       []string{"datatype01"},
		// checkpoint also with only one wal
		MinCheckpointWalsNum: 1,
		// use a small maxDataFileSize
		MaxDataFileSize: 10 * 1024,
	}
	dm, err := NewDataManager(ctx, logger, dmConfig)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	dmReadyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, dmReadyCh) }()
	<-dmReadyCh

	time.Sleep(5 * time.Second)

	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"

	// test insert from scratch (no current entries)
	const objectCount = 2000
	actions := make([]*Action, 0, objectCount)
	for n := 0; n < objectCount; n++ {
		actions = append(actions, &Action{
			ActionType: ActionTypePut,
			ID:         fmt.Sprintf("object%04d", n),
			DataType:   "datatype01",
			Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, n, contents)),
		})
	}

	currentEntries, err := doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, nil)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// ensure that at least three datafiles are created
	curDataStatus, err := dm.GetLastDataStatus()
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if len(curDataStatus.Files["datatype01"]) < 3 {
		t.Fatalf("expected at least 3 datafiles, got: %d", len(curDataStatus.Files["datatype01"]))
	}

	// every inserted object must be readable and match the checkpointed entry
	for n := 0; n < objectCount; n++ {
		objectID := fmt.Sprintf("object%04d", n)
		reader, err := dm.Read("datatype01", objectID)
		if err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
		got, err := ioutil.ReadAll(reader)
		if err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
		if !reflect.DeepEqual(got, currentEntries[objectID].Data) {
			t.Fatalf("expected data: %v, got data: %v", currentEntries[objectID].Data, got)
		}
	}
}
func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error { func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
// read the data file // read the data file
curDataStatus, err := dm.GetLastDataStatus() curDataStatus, err := dm.GetLastDataStatus()