datamanager: fix index creation on multiple data files
When during a checkpoint more than one file is created the entries position in the index is not right since it's not reset at every new index. Fix it and add related tests.
This commit is contained in:
parent
778bc93755
commit
52cb683267
@ -406,6 +406,11 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
|
||||
}
|
||||
|
||||
if de != nil {
|
||||
var lastSplitPos int64
|
||||
if len(splitPoints) > 0 {
|
||||
lastSplitPos = splitPoints[len(splitPoints)-1].pos
|
||||
}
|
||||
|
||||
lastEntryID = de.ID
|
||||
dataEntryj, err := json.Marshal(de)
|
||||
if err != nil {
|
||||
@ -414,13 +419,9 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
|
||||
if _, err := buf.Write(dataEntryj); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dataFileIndex.Index[de.ID] = pos
|
||||
dataFileIndex.Index[de.ID] = pos - lastSplitPos
|
||||
prevPos := pos
|
||||
pos += int64(len(dataEntryj))
|
||||
var lastSplitPos int64
|
||||
if len(splitPoints) > 0 {
|
||||
lastSplitPos = splitPoints[len(splitPoints)-1].pos
|
||||
}
|
||||
if pos-lastSplitPos > d.maxDataFileSize {
|
||||
// add split point only if it's different (less) than the previous one
|
||||
if lastSplitPos < prevPos {
|
||||
@ -434,7 +435,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s
|
||||
}
|
||||
}
|
||||
|
||||
// save remaining data
|
||||
// save data
|
||||
if buf.Len() != 0 {
|
||||
var curPos int64
|
||||
var lastSplitPos int64
|
||||
@ -483,7 +484,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
|
||||
|
||||
matchingDataFileID = curFiles[dataType][0].ID
|
||||
for _, dataStatusFile := range curFiles[dataType] {
|
||||
if dataStatusFile.LastEntryID > id {
|
||||
if dataStatusFile.LastEntryID >= id {
|
||||
matchingDataFileID = dataStatusFile.ID
|
||||
break
|
||||
}
|
||||
|
@ -736,6 +736,98 @@ func testCheckpoint(t *testing.T, basePath string) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRead(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "agola")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
etcdDir, err := ioutil.TempDir(dir, "etcd")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
tetcd := setupEtcd(t, etcdDir)
|
||||
defer shutdownEtcd(tetcd)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
ostDir, err := ioutil.TempDir(dir, "ost")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
ost, err := posix.New(ostDir)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
|
||||
dmConfig := &DataManagerConfig{
|
||||
BasePath: "basepath",
|
||||
E: tetcd.TestEtcd.Store,
|
||||
OST: objectstorage.NewObjStorage(ost, "/"),
|
||||
// remove almost all wals to see that they are removed also from changes
|
||||
EtcdWalsKeepNum: 1,
|
||||
DataTypes: []string{"datatype01"},
|
||||
// checkpoint also with only one wal
|
||||
MinCheckpointWalsNum: 1,
|
||||
// use a small maxDataFileSize
|
||||
MaxDataFileSize: 10 * 1024,
|
||||
}
|
||||
dm, err := NewDataManager(ctx, logger, dmConfig)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
dmReadyCh := make(chan struct{})
|
||||
go func() { _ = dm.Run(ctx, dmReadyCh) }()
|
||||
<-dmReadyCh
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
|
||||
|
||||
// test insert from scratch (no current entries)
|
||||
actions := []*Action{}
|
||||
for i := 0; i < 2000; i++ {
|
||||
action := &Action{
|
||||
ActionType: ActionTypePut,
|
||||
ID: fmt.Sprintf("object%04d", i),
|
||||
DataType: "datatype01",
|
||||
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
|
||||
}
|
||||
actions = append(actions, action)
|
||||
}
|
||||
|
||||
currentEntries, err := doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
|
||||
// ensure that at least three datafiles are created
|
||||
curDataStatus, err := dm.GetLastDataStatus()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
if len(curDataStatus.Files["datatype01"]) < 3 {
|
||||
t.Fatalf("expected at least 3 datafiles, got: %d", len(curDataStatus.Files["datatype01"]))
|
||||
}
|
||||
|
||||
for i := 0; i < 2000; i++ {
|
||||
id := fmt.Sprintf("object%04d", i)
|
||||
|
||||
er, err := dm.Read("datatype01", id)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
data, err := ioutil.ReadAll(er)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected err: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(data, currentEntries[id].Data) {
|
||||
t.Fatalf("expected data: %v, got data: %v", currentEntries[id].Data, data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
|
||||
// read the data file
|
||||
curDataStatus, err := dm.GetLastDataStatus()
|
||||
|
Loading…
Reference in New Issue
Block a user