diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go index 3f9ddb2..e1aec8f 100644 --- a/internal/datamanager/data.go +++ b/internal/datamanager/data.go @@ -406,6 +406,11 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s } if de != nil { + var lastSplitPos int64 + if len(splitPoints) > 0 { + lastSplitPos = splitPoints[len(splitPoints)-1].pos + } + lastEntryID = de.ID dataEntryj, err := json.Marshal(de) if err != nil { @@ -414,13 +419,9 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s if _, err := buf.Write(dataEntryj); err != nil { return nil, err } - dataFileIndex.Index[de.ID] = pos + dataFileIndex.Index[de.ID] = pos - lastSplitPos prevPos := pos pos += int64(len(dataEntryj)) - var lastSplitPos int64 - if len(splitPoints) > 0 { - lastSplitPos = splitPoints[len(splitPoints)-1].pos - } if pos-lastSplitPos > d.maxDataFileSize { // add split point only if it's different (less) than the previous one if lastSplitPos < prevPos { @@ -434,7 +435,7 @@ func (d *DataManager) writeDataType(ctx context.Context, wi walIndex, dataType s } } - // save remaining data + // save data if buf.Len() != 0 { var curPos int64 var lastSplitPos int64 @@ -483,7 +484,7 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) { matchingDataFileID = curFiles[dataType][0].ID for _, dataStatusFile := range curFiles[dataType] { - if dataStatusFile.LastEntryID > id { + if dataStatusFile.LastEntryID >= id { matchingDataFileID = dataStatusFile.ID break } diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index c5aa930..a2604b7 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -736,6 +736,98 @@ func testCheckpoint(t *testing.T, basePath string) { } } +func TestRead(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + ctx := context.Background() + + ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + ost, err := posix.New(ostDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + dmConfig := &DataManagerConfig{ + BasePath: "basepath", + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + // remove almost all wals to see that they are removed also from changes + EtcdWalsKeepNum: 1, + DataTypes: []string{"datatype01"}, + // checkpoint also with only one wal + MinCheckpointWalsNum: 1, + // use a small maxDataFileSize + MaxDataFileSize: 10 * 1024, + } + dm, err := NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + dmReadyCh := make(chan struct{}) + go func() { _ = dm.Run(ctx, dmReadyCh) }() + <-dmReadyCh + + time.Sleep(5 * time.Second) + + contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + // test insert from scratch (no current entries) + actions := []*Action{} + for i := 0; i < 2000; i++ { + action := &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%04d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)), + } + actions = append(actions, action) + } + + currentEntries, err := doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // ensure that at least three datafiles are created + curDataStatus, err := dm.GetLastDataStatus() + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if len(curDataStatus.Files["datatype01"]) < 3 { + t.Fatalf("expected at least 3 datafiles, got: %d", len(curDataStatus.Files["datatype01"])) + } + + for i := 0; i < 2000; i++ { + id := fmt.Sprintf("object%04d", i) + + er, err := dm.Read("datatype01", id) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + data, err := ioutil.ReadAll(er) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if !reflect.DeepEqual(data, currentEntries[id].Data) { + t.Fatalf("expected data: %v, got data: %v", currentEntries[id].Data, data) + } + } +} + func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error { // read the data file curDataStatus, err := dm.GetLastDataStatus()