// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.

package datamanager

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path"
	"reflect"
	"sort"
	"strings"
	"testing"
	"time"

	"agola.io/agola/internal/objectstorage"
	"agola.io/agola/internal/testutil"
	"agola.io/agola/internal/util"

	"github.com/google/go-cmp/cmp"
	"go.uber.org/zap"
	"go.uber.org/zap/zaptest"
	errors "golang.org/x/xerrors"
)

// setupEtcd creates a test embedded etcd instance using dir, starts it and
// waits up to 30 seconds for it to come up. Any failure aborts the calling
// test through t.Fatalf. The started instance is returned; callers are
// responsible for shutting it down (see shutdownEtcd).
func setupEtcd(t *testing.T, logger *zap.Logger, dir string) *testutil.TestEmbeddedEtcd {
	// Mark as helper so failures are reported at the caller's line.
	t.Helper()

	tetcd, err := testutil.NewTestEmbeddedEtcd(t, logger, dir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if err := tetcd.Start(); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if err := tetcd.WaitUp(30 * time.Second); err != nil {
		t.Fatalf("error waiting on etcd up: %v", err)
	}
	return tetcd
}

// shutdownEtcd kills the given test etcd instance when its Etcd field is set.
// The error returned by Kill is deliberately discarded: this is best-effort
// teardown.
func shutdownEtcd(tetcd *testutil.TestEmbeddedEtcd) {
	if tetcd.Etcd == nil {
		// nothing to stop
		return
	}
	_ = tetcd.Kill()
}

// TestEtcdReset writes 20 wals through a datamanager, stops the datamanager
// and etcd, removes the etcd data directory, restarts both on the same object
// storage and then checks that every written object can still be read with
// ReadObject.
func TestEtcdReset(t *testing.T) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)

	ctx, cancel := context.WithCancel(context.Background())

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// startDataManager creates a datamanager bound to the given etcd instance
	// and the shared object storage, runs it in the background and returns it
	// once it has signalled readiness.
	startDataManager := func(ctx context.Context, te *testutil.TestEmbeddedEtcd) *DataManager {
		dm, err := NewDataManager(ctx, logger, &DataManagerConfig{
			BasePath:        "basepath",
			E:               te.TestEtcd.Store,
			OST:             objectstorage.NewObjStorage(ost, "/"),
			EtcdWalsKeepNum: 10,
			DataTypes:       []string{"datatype01"},
		})
		if err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
		readyCh := make(chan struct{})

		t.Logf("starting datamanager")
		go func() { _ = dm.Run(ctx, readyCh) }()
		<-readyCh
		return dm
	}

	dm := startDataManager(ctx, tetcd)

	// a single action is reused for every wal, only its ID changes
	actions := []*Action{
		{
			ActionType: ActionTypePut,
			DataType:   "datatype01",
			Data:       []byte("{}"),
		},
	}

	for i := 0; i < 20; i++ {
		actions[0].ID = fmt.Sprintf("object%02d", i)
		if _, err := dm.WriteWal(ctx, actions, nil); err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
	}

	// wait for wal to be committed storage
	time.Sleep(5 * time.Second)

	t.Logf("stopping datamanager")
	cancel()

	// Reset etcd
	t.Logf("stopping etcd")
	shutdownEtcd(tetcd)

	t.Logf("resetting etcd")
	// A failed removal would leave the old etcd data in place and the test
	// would no longer exercise a reset, so don't ignore the error.
	if err := os.RemoveAll(etcdDir); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	t.Logf("starting etcd")
	tetcd = setupEtcd(t, logger, etcdDir)
	// NOTE(review): setupEtcd has already called Start; this second call is
	// kept from the original flow — confirm whether it is still needed.
	if err := tetcd.Start(); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer shutdownEtcd(tetcd)

	ctx, cancel = context.WithCancel(context.Background())
	defer cancel()

	dm = startDataManager(ctx, tetcd)

	time.Sleep(5 * time.Second)

	// every object written before the reset must still be readable
	for i := 0; i < 20; i++ {
		objectID := fmt.Sprintf("object%02d", i)
		if _, _, err := dm.ReadObject("datatype01", objectID, nil); err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
	}
}

// TestEtcdResetWalsGap writes 20 wals, stops the datamanager, resets etcd and
// deletes one wal status file in the middle of the sequence from the object
// storage. It then checks that InitEtcd on a new datamanager fails with the
// error reporting that the wal following the removed one has an unexpected
// previousWalSequence.
func TestEtcdResetWalsGap(t *testing.T) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)

	ctx, cancel := context.WithCancel(context.Background())

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dmConfig := &DataManagerConfig{
		BasePath:        "basepath",
		E:               tetcd.TestEtcd.Store,
		OST:             objectstorage.NewObjStorage(ost, "/"),
		EtcdWalsKeepNum: 10,
		DataTypes:       []string{"datatype01"},
	}
	dm, err := NewDataManager(ctx, logger, dmConfig)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	dmReadyCh := make(chan struct{})

	t.Logf("starting datamanager")
	go func() { _ = dm.Run(ctx, dmReadyCh) }()
	<-dmReadyCh

	// a single action is reused for every wal, only its ID changes
	actions := []*Action{
		{
			ActionType: ActionTypePut,
			DataType:   "datatype01",
			Data:       []byte("{}"),
		},
	}

	for i := 0; i < 20; i++ {
		objectID := fmt.Sprintf("object%02d", i)
		actions[0].ID = objectID
		if _, err := dm.WriteWal(ctx, actions, nil); err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
	}

	// wait for wal to be committed storage
	time.Sleep(5 * time.Second)

	t.Logf("stopping datamanager")
	cancel()

	t.Logf("stopping etcd")
	// Reset etcd
	shutdownEtcd(tetcd)

	t.Logf("resetting etcd")
	// A failed removal would leave the old etcd data in place and the test
	// would no longer exercise a reset, so don't ignore the error.
	if err := os.RemoveAll(etcdDir); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	t.Logf("starting etcd")
	tetcd = setupEtcd(t, logger, etcdDir)
	// NOTE(review): setupEtcd has already called Start; this second call is
	// kept from the original flow — confirm whether it is still needed.
	if err := tetcd.Start(); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer shutdownEtcd(tetcd)

	// Remove a wal in the middle
	doneCh := make(chan struct{})
	defer close(doneCh)

	walStatusFiles := []string{}
	for object := range dm.ost.List(path.Join(dm.basePath, storageWalsStatusDir)+"/", "", true, doneCh) {
		if object.Err != nil {
			// report the listing error itself, not the unrelated outer err
			t.Fatalf("unexpected err: %v", object.Err)
		}

		walStatusFiles = append(walStatusFiles, object.Path)
	}
	if len(walStatusFiles) < 20 {
		t.Fatalf("expected at least 20 wals, got: %d wals", len(walStatusFiles))
	}

	removeIndex := 10
	if err := dm.ost.DeleteObject(walStatusFiles[removeIndex]); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// walSequence strips directory and extension from a wal status file path.
	walSequence := func(p string) string {
		return strings.TrimSuffix(path.Base(p), path.Ext(p))
	}
	// The wal after the removed one is where the gap is expected to be
	// reported: it references the removed wal as its previous one, while the
	// last wal still present before it is the one at removeIndex-1.
	errorWalSequence := walSequence(walStatusFiles[removeIndex+1])
	prevWalSequence := walSequence(walStatusFiles[removeIndex])
	expectedPrevWalSequence := walSequence(walStatusFiles[removeIndex-1])

	ctx, cancel = context.WithCancel(context.Background())
	defer cancel()
	dmConfig = &DataManagerConfig{
		BasePath:        "basepath",
		E:               tetcd.TestEtcd.Store,
		OST:             objectstorage.NewObjStorage(ost, "/"),
		EtcdWalsKeepNum: 10,
		DataTypes:       []string{"datatype01"},
	}
	dm, err = NewDataManager(ctx, logger, dmConfig)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// The new datamanager is not run: only its etcd initialization is checked.
	expectedErr := errors.Errorf("wal %q previousWalSequence %q is different than expected walSequence %q", errorWalSequence, prevWalSequence, expectedPrevWalSequence)
	err = dm.InitEtcd(ctx, nil)
	if err == nil {
		t.Fatalf("expected err: %q, got nil error", expectedErr)
	}
	if expectedErr.Error() != err.Error() {
		t.Fatalf("expected err: %q, got err %q", expectedErr, err)
	}
}

// TestConcurrentUpdate verifies the change group token handling of WriteWal:
// a write using the latest token succeeds, while a write reusing a token that
// has already been superseded must return ErrConcurrency.
func TestConcurrentUpdate(t *testing.T) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)
	defer shutdownEtcd(tetcd)

	ctx := context.Background()

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dm, err := NewDataManager(ctx, logger, &DataManagerConfig{
		E:               tetcd.TestEtcd.Store,
		OST:             objectstorage.NewObjStorage(ost, "/"),
		EtcdWalsKeepNum: 10,
		DataTypes:       []string{"datatype01"},
	})
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// the same single put is written by every WriteWal call below
	actions := []*Action{
		{
			ActionType: ActionTypePut,
			ID:         "object01",
			DataType:   "datatype01",
			Data:       []byte("{}"),
		},
	}

	readyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, readyCh) }()
	<-readyCh

	time.Sleep(5 * time.Second)

	token, err := dm.GetChangeGroupsUpdateToken([]string{"changegroup01", "changegroup02"})
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// populate with a wal
	if token, err = dm.WriteWal(ctx, actions, token); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// this must work successfully
	staleToken := token
	if token, err = dm.WriteWal(ctx, actions, token); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// this must fail since we are using the old cgt
	if _, err = dm.WriteWal(ctx, actions, staleToken); err != ErrConcurrency {
		t.Fatalf("expected err: %v, got %v", ErrConcurrency, err)
	}

	staleToken = token
	// this must work successfully
	if _, err = dm.WriteWal(ctx, actions, token); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// this must fail since we are using the old cgt
	if _, err = dm.WriteWal(ctx, actions, staleToken); err != ErrConcurrency {
		t.Fatalf("expected err: %v, got %v", ErrConcurrency, err)
	}
}

// TestEtcdWalCleaner writes 20 wals, forces a checkpoint, runs the etcd wal
// cleaner and then expects ListEtcdWals to yield exactly EtcdWalsKeepNum wals.
func TestEtcdWalCleaner(t *testing.T) {
	// number of wals the cleaner is configured to keep in etcd
	const walKeepNum = 10

	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)
	defer shutdownEtcd(tetcd)

	ctx := context.Background()

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dm, err := NewDataManager(ctx, logger, &DataManagerConfig{
		E:                    tetcd.TestEtcd.Store,
		OST:                  objectstorage.NewObjStorage(ost, "/"),
		EtcdWalsKeepNum:      walKeepNum,
		DataTypes:            []string{"datatype01"},
		MinCheckpointWalsNum: 1,
	})
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// the same single put is written as 20 separate wals
	actions := []*Action{
		{
			ActionType: ActionTypePut,
			ID:         "object01",
			DataType:   "datatype01",
			Data:       []byte("{}"),
		},
	}

	readyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, readyCh) }()
	<-readyCh

	for remaining := 20; remaining > 0; remaining-- {
		if _, err := dm.WriteWal(ctx, actions, nil); err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
	}

	if err := dm.checkpoint(ctx, true); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if err := dm.etcdWalCleaner(ctx); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// count what is left in etcd after cleaning
	var walsCount int
	for range dm.ListEtcdWals(ctx, 0) {
		walsCount++
	}
	if walsCount != walKeepNum {
		t.Fatalf("expected %d wals in etcd, got %d wals", walKeepNum, walsCount)
	}
}

// TestReadObject exercises ReadObject in two situations: first while the
// written/deleted objects have not been checkpointed yet, then again after a
// forced checkpoint and an etcd wal clean. In both cases a deleted object must
// be reported as not existing and a surviving object must be readable.
func TestReadObject(t *testing.T) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)
	defer shutdownEtcd(tetcd)

	ctx := context.Background()

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dm, err := NewDataManager(ctx, logger, &DataManagerConfig{
		E:   tetcd.TestEtcd.Store,
		OST: objectstorage.NewObjStorage(ost, "/"),
		// remove almost all wals to see that they are removed also from changes
		EtcdWalsKeepNum: 1,
		DataTypes:       []string{"datatype01"},
	})
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	readyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, readyCh) }()
	<-readyCh

	time.Sleep(5 * time.Second)

	// readErr reads an object of the test data type and returns only the error.
	readErr := func(id string) error {
		_, _, err := dm.ReadObject("datatype01", id, nil)
		return err
	}

	// populate with a wal
	puts := []*Action{}
	for i := 0; i < 20; i++ {
		puts = append(puts, &Action{
			ActionType: ActionTypePut,
			ID:         fmt.Sprintf("object%d", i),
			DataType:   "datatype01",
			Data:       []byte(fmt.Sprintf(`{ "ID": "%d" }`, i)),
		})
	}
	if _, err := dm.WriteWal(ctx, puts, nil); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// wait for the event to be read
	time.Sleep(500 * time.Millisecond)
	// should read it
	if err := readErr("object1"); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if err := readErr("object19"); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// delete the first ten objects with a second wal
	deletes := []*Action{}
	for i := 0; i < 10; i++ {
		deletes = append(deletes, &Action{
			ActionType: ActionTypeDelete,
			ID:         fmt.Sprintf("object%d", i),
			DataType:   "datatype01",
		})
	}
	if _, err := dm.WriteWal(ctx, deletes, nil); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// wait for the event to be read
	time.Sleep(500 * time.Millisecond)

	// test read from changes (since not checkpoint yet)

	// should not exists
	if err := readErr("object1"); !util.IsNotExist(err) {
		t.Fatalf("expected err %v, got: %v", &util.ErrNotExist{}, err)
	}
	// should exist
	if err := readErr("object19"); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// do a checkpoint and wal clean
	if err := dm.checkpoint(ctx, true); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if err := dm.etcdWalCleaner(ctx); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// wait for the event to be read
	time.Sleep(500 * time.Millisecond)

	// test read from data

	// should not exists
	if err := readErr("object1"); !util.IsNotExist(err) {
		t.Fatalf("expected err %v, got: %v", &util.ErrNotExist{}, err)
	}
	// should exist
	if err := readErr("object19"); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
}

// doAndCheckCheckpoint computes the entries expected after applying
// actionGroups (in order) on top of currentEntries, writes every action group
// as its own wal, forces a checkpoint and verifies the resulting data files
// with checkDataFiles. It returns the expected entries map so it can be fed
// into the next call, or the first error encountered.
func doAndCheckCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, actionGroups [][]*Action, currentEntries map[string]*DataEntry) (map[string]*DataEntry, error) {
	// start from a copy of the current state, keyed by entry ID
	expected := map[string]*DataEntry{}
	for _, entry := range currentEntries {
		expected[entry.ID] = entry
	}

	// replay the actions on the expected state; action types other than
	// put and delete leave it untouched
	for _, group := range actionGroups {
		for _, a := range group {
			if a.ActionType == ActionTypePut {
				expected[a.ID] = &DataEntry{ID: a.ID, DataType: a.DataType, Data: a.Data}
			} else if a.ActionType == ActionTypeDelete {
				delete(expected, a.ID)
			}
		}
	}

	// populate with a wal per action group
	for _, group := range actionGroups {
		if _, err := dm.WriteWal(ctx, group, nil); err != nil {
			return nil, err
		}
	}

	// wait for the event to be read
	time.Sleep(500 * time.Millisecond)

	// do a checkpoint
	err := dm.checkpoint(ctx, true)
	if err != nil {
		return nil, err
	}

	err = checkDataFiles(ctx, t, dm, expected)
	if err != nil {
		return nil, err
	}

	return expected, nil
}

// checkDataFiles reads the data files and data file indexes referenced by the
// last data status and validates them against expectedEntriesMap.
//
// For every data file it checks that:
//   - no entry ID has already been seen in a previously read file
//   - entries inside the file are in non-decreasing ID order
//   - the index contains exactly the IDs of the entries in the file
//   - the file's LastEntryID matches the ID of the last entry in the file
//   - LastEntryID is strictly greater than the one of the previous file of the
//     same data type
//
// Finally the union of all read entries must be deeply equal to
// expectedEntriesMap. The first failed check is returned as an error. ctx and
// t are currently unused.
func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
	// read the data file
	curDataStatus, err := dm.GetLastDataStatus()
	if err != nil {
		return err
	}

	allEntriesMap := map[string]*DataEntry{}

	for dataType := range curDataStatus.Files {
		var prevLastEntryID string
		for i, file := range curDataStatus.Files[dataType] {
			dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath(dataType, file.ID))
			if err != nil {
				return err
			}
			var dataFileIndex *DataFileIndex
			err = json.NewDecoder(dataFileIndexf).Decode(&dataFileIndex)
			// the index is fully decoded (or failed): close it in both cases
			dataFileIndexf.Close()
			if err != nil {
				return err
			}

			dataEntriesMap := map[string]*DataEntry{}
			dataEntries := []*DataEntry{}
			dataf, err := dm.ost.ReadObject(dm.DataFilePath(dataType, file.ID))
			if err != nil {
				return err
			}
			dec := json.NewDecoder(dataf)
			var prevEntryID string
			for {
				var de *DataEntry

				err := dec.Decode(&de)
				if err == io.EOF {
					// all done
					break
				}
				if err != nil {
					dataf.Close()
					return err
				}
				// check that there are no duplicate entries
				if _, ok := allEntriesMap[de.ID]; ok {
					dataf.Close()
					return fmt.Errorf("duplicate entry id: %s", de.ID)
				}
				// check that the entries are in order
				if de.ID < prevEntryID {
					dataf.Close()
					return fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID)
				}
				// remember this entry so the next one is compared against it
				prevEntryID = de.ID

				dataEntriesMap[de.ID] = de
				dataEntries = append(dataEntries, de)
				allEntriesMap[de.ID] = de
			}
			dataf.Close()

			// check that the index matches the entries
			if len(dataFileIndex.Index) != len(dataEntriesMap) {
				return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntriesMap))
			}
			// zero length with preallocated capacity: the IDs are appended
			indexIDs := make([]string, 0, len(dataFileIndex.Index))
			entriesIDs := make([]string, 0, len(dataEntriesMap))
			for id := range dataFileIndex.Index {
				indexIDs = append(indexIDs, id)
			}
			for id := range dataEntriesMap {
				entriesIDs = append(entriesIDs, id)
			}
			sort.Strings(indexIDs)
			sort.Strings(entriesIDs)
			if !reflect.DeepEqual(indexIDs, entriesIDs) {
				return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs)
			}

			// an empty data file has no last entry to compare with
			if len(dataEntries) == 0 {
				return fmt.Errorf("datafile %d has no entries", i)
			}
			lastEntryID := dataEntries[len(dataEntries)-1].ID
			if file.LastEntryID != lastEntryID {
				return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, lastEntryID)
			}

			// check that all the files are in order
			if file.LastEntryID == prevLastEntryID {
				return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID)
			}
			if file.LastEntryID < prevLastEntryID {
				return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID)
			}
			prevLastEntryID = file.LastEntryID
		}
	}

	// check that the number of entries is right
	if len(allEntriesMap) != len(expectedEntriesMap) {
		return fmt.Errorf("expected %d total entries, got %d", len(expectedEntriesMap), len(allEntriesMap))
	}
	if !reflect.DeepEqual(expectedEntriesMap, allEntriesMap) {
		return fmt.Errorf("expected entries don't match current entries")
	}

	return nil
}

// TestCheckpoint runs testCheckpoint as a subtest for each base path variant
// (empty and relative).
//
// TODO(sgotti) some fuzzy testing will be really good
func TestCheckpoint(t *testing.T) {
	type testCase struct {
		name     string
		basePath string
	}

	cases := []testCase{
		{name: "test with empty basepath", basePath: ""},
		{name: "test with relative basepath", basePath: "base/path"},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			testCheckpoint(t, tc.basePath)
		})
	}
}

// testCheckpoint runs a sequence of write+checkpoint steps against a
// datamanager configured with the given basePath and a small MaxDataFileSize.
// After every step doAndCheckCheckpoint verifies the data files against the
// expected entries. The steps cover inserts from scratch, full and partial
// deletes, deletes of missing entries, updates combined with inserts at the
// end and at the start, and finally several distinct wals handled by one
// checkpoint. It ends by calling CleanOldCheckpoints.
func testCheckpoint(t *testing.T, basePath string) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)
	defer shutdownEtcd(tetcd)

	ctx := context.Background()

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dmConfig := &DataManagerConfig{
		BasePath: basePath,
		E:        tetcd.TestEtcd.Store,
		OST:      objectstorage.NewObjStorage(ost, "/"),
		// remove almost all wals to see that they are removed also from changes
		EtcdWalsKeepNum: 1,
		DataTypes:       []string{"datatype01"},
		// checkpoint also with only one wal
		MinCheckpointWalsNum: 1,
		// use a small maxDataFileSize
		MaxDataFileSize: 10 * 1024,
	}
	dm, err := NewDataManager(ctx, logger, dmConfig)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	dmReadyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, dmReadyCh) }()
	<-dmReadyCh

	time.Sleep(5 * time.Second)

	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"

	// putActions returns put actions for object IDs in [from, to).
	putActions := func(from, to int) []*Action {
		actions := make([]*Action, 0, to-from)
		for i := from; i < to; i++ {
			actions = append(actions, &Action{
				ActionType: ActionTypePut,
				ID:         fmt.Sprintf("object%04d", i),
				DataType:   "datatype01",
				Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
			})
		}
		return actions
	}
	// deleteActions returns delete actions for object IDs in [from, to).
	deleteActions := func(from, to int) []*Action {
		actions := make([]*Action, 0, to-from)
		for i := from; i < to; i++ {
			actions = append(actions, &Action{
				ActionType: ActionTypeDelete,
				ID:         fmt.Sprintf("object%04d", i),
				DataType:   "datatype01",
			})
		}
		return actions
	}

	// Every step is written as one wal per action group and followed by a
	// checkpoint. Each group is built from scratch so that a wal contains only
	// the actions of its own group (no leftovers from earlier steps/groups).
	steps := []struct {
		desc         string
		actionGroups [][]*Action
	}{
		{
			desc:         "insert from scratch (no current entries)",
			actionGroups: [][]*Action{putActions(200, 400)},
		},
		{
			desc:         "delete of all existing entries",
			actionGroups: [][]*Action{deleteActions(200, 400)},
		},
		{
			desc:         "insert from scratch again (no current entries)",
			actionGroups: [][]*Action{putActions(200, 400)},
		},
		{
			desc:         "delete some existing entries in the middle",
			actionGroups: [][]*Action{deleteActions(250, 350)},
		},
		{
			desc:         "delete of unexisting entries",
			actionGroups: [][]*Action{deleteActions(1000, 1010)},
		},
		{
			desc:         "update and insert at the end",
			actionGroups: [][]*Action{putActions(300, 500)},
		},
		{
			desc:         "update and insert at the start",
			actionGroups: [][]*Action{putActions(0, 300)},
		},
		{
			desc: "multiple wals with different insert, updated, deletes",
			actionGroups: [][]*Action{
				putActions(0, 150),
				deleteActions(50, 100),
				append(deleteActions(250, 300), putActions(70, 80)...),
			},
		},
	}

	var currentEntries map[string]*DataEntry
	for _, step := range steps {
		currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, step.actionGroups, currentEntries)
		if err != nil {
			t.Fatalf("%s: unexpected err: %v", step.desc, err)
		}
	}

	if err := dm.CleanOldCheckpoints(ctx); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
}

// TestRead writes 2000 objects in a single wal, checkpoints them with a small
// MaxDataFileSize, requires that at least three data files were produced and
// then reads every object back through dm.Read, comparing the returned bytes
// with the data that was written.
func TestRead(t *testing.T) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)
	defer shutdownEtcd(tetcd)

	ctx := context.Background()

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dm, err := NewDataManager(ctx, logger, &DataManagerConfig{
		BasePath: "basepath",
		E:        tetcd.TestEtcd.Store,
		OST:      objectstorage.NewObjStorage(ost, "/"),
		// remove almost all wals to see that they are removed also from changes
		EtcdWalsKeepNum: 1,
		DataTypes:       []string{"datatype01"},
		// checkpoint also with only one wal
		MinCheckpointWalsNum: 1,
		// use a small maxDataFileSize
		MaxDataFileSize: 10 * 1024,
	})
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	readyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, readyCh) }()
	<-readyCh

	time.Sleep(5 * time.Second)

	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"

	// test insert from scratch (no current entries)
	puts := []*Action{}
	for i := 0; i < 2000; i++ {
		puts = append(puts, &Action{
			ActionType: ActionTypePut,
			ID:         fmt.Sprintf("object%04d", i),
			DataType:   "datatype01",
			Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
		})
	}

	currentEntries, err := doAndCheckCheckpoint(t, ctx, dm, [][]*Action{puts}, nil)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// ensure that at least three datafiles are created
	curDataStatus, err := dm.GetLastDataStatus()
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if filesNum := len(curDataStatus.Files["datatype01"]); filesNum < 3 {
		t.Fatalf("expected at least 3 datafiles, got: %d", len(curDataStatus.Files["datatype01"]))
	}

	// every written object must be readable and carry the written data
	for i := 0; i < 2000; i++ {
		id := fmt.Sprintf("object%04d", i)

		r, err := dm.Read("datatype01", id)
		if err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
		got, err := ioutil.ReadAll(r)
		if err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
		want := currentEntries[id].Data
		if !reflect.DeepEqual(got, want) {
			t.Fatalf("expected data: %v, got data: %v", want, got)
		}
	}
}

// TestClean runs testClean as a subtest for each base path variant (empty and
// relative).
func TestClean(t *testing.T) {
	type testCase struct {
		name     string
		basePath string
	}

	cases := []testCase{
		{name: "test with empty basepath", basePath: ""},
		{name: "test with relative basepath", basePath: "base/path"},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			testClean(t, tc.basePath)
		})
	}
}

// testClean performs ten write+checkpoint rounds of the same 400 objects on a
// datamanager configured with the given basePath, then calls
// CleanOldCheckpoints and verifies that:
//   - the data files of the last data status still match the expected entries
//   - exactly dataStatusToKeep data status sequences are left, and they are
//     the same ones that were the most recent before cleaning
func testClean(t *testing.T, basePath string) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)
	defer shutdownEtcd(tetcd)

	ctx := context.Background()

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dmConfig := &DataManagerConfig{
		BasePath: basePath,
		E:        tetcd.TestEtcd.Store,
		OST:      objectstorage.NewObjStorage(ost, "/"),
		// remove almost all wals to see that they are removed also from changes
		EtcdWalsKeepNum: 1,
		DataTypes:       []string{"datatype01"},
		// checkpoint also with only one wal
		MinCheckpointWalsNum: 1,
		// use a small maxDataFileSize
		MaxDataFileSize: 10 * 1024,
	}
	dm, err := NewDataManager(ctx, logger, dmConfig)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	dmReadyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, dmReadyCh) }()
	<-dmReadyCh

	time.Sleep(5 * time.Second)

	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"

	// The same 400 puts are written on every round, so build them once
	// instead of letting the slice grow by 400 duplicate actions per round.
	actions := make([]*Action, 0, 400)
	for i := 0; i < 400; i++ {
		actions = append(actions, &Action{
			ActionType: ActionTypePut,
			ID:         fmt.Sprintf("object%04d", i),
			DataType:   "datatype01",
			Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
		})
	}

	// one checkpoint per round
	var currentEntries map[string]*DataEntry
	for n := 0; n < 10; n++ {
		currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
		if err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
	}

	// get the last data status sequence
	lastDataStatusSequences, err := dm.GetLastDataStatusSequences(dataStatusToKeep)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	if err := dm.CleanOldCheckpoints(ctx); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// check last data file
	if err := checkDataFiles(ctx, t, dm, currentEntries); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// check that only the last dataStatusToKeep status files are left
	curDataStatusSequences, err := dm.GetLastDataStatusSequences(1000)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	if len(curDataStatusSequences) != dataStatusToKeep {
		t.Fatalf("expected %d data status files, got %d: %s", dataStatusToKeep, len(curDataStatusSequences), curDataStatusSequences)
	}
	if diff := cmp.Diff(lastDataStatusSequences, curDataStatusSequences); diff != "" {
		t.Fatalf("different data status sequences: %v", diff)
	}
}

// TestCleanConcurrentCheckpoint runs testCleanConcurrentCheckpoint as a
// subtest for every base path variant listed below.
func TestCleanConcurrentCheckpoint(t *testing.T) {
	type testCase struct {
		name     string
		basePath string
	}

	cases := []testCase{
		{name: "test with empty basepath", basePath: ""},
		{name: "test with relative basepath", basePath: "base/path"},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			testCleanConcurrentCheckpoint(t, tc.basePath)
		})
	}
}

// testCleanConcurrentCheckpoint checks that cleanOldCheckpoints, when called
// with a list of data status sequences collected before a newer checkpoint was
// written, leaves a last data status that is newer than that list and data
// files that still match the expected entries.
//
// basePath is used as DataManagerConfig.BasePath.
func testCleanConcurrentCheckpoint(t *testing.T, basePath string) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)
	defer shutdownEtcd(tetcd)

	// Use a cancelable context so the datamanager goroutine started below does
	// not outlive the test. Deferred calls run in reverse order, so the
	// datamanager is asked to stop before etcd is killed.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dmConfig := &DataManagerConfig{
		BasePath: basePath,
		E:        tetcd.TestEtcd.Store,
		OST:      objectstorage.NewObjStorage(ost, "/"),
		// remove almost all wals to see that they are removed also from changes
		EtcdWalsKeepNum: 1,
		DataTypes:       []string{"datatype01"},
		// checkpoint also with only one wal
		MinCheckpointWalsNum: 1,
		// use a small maxDataFileSize
		MaxDataFileSize: 10 * 1024,
	}
	dm, err := NewDataManager(ctx, logger, dmConfig)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	dmReadyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, dmReadyCh) }()
	<-dmReadyCh

	time.Sleep(5 * time.Second)

	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"

	// Do 10 rounds of puts followed by a checkpoint. actions is never reset,
	// so every round submits all the previously added puts plus 400 more for
	// the same object IDs.
	var currentEntries map[string]*DataEntry
	actions := []*Action{}
	for n := 0; n < 10; n++ {
		for i := 0; i < 400; i++ {
			action := &Action{
				ActionType: ActionTypePut,
				ID:         fmt.Sprintf("object%04d", i),
				DataType:   "datatype01",
				Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
			}
			actions = append(actions, action)
		}

		currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
		if err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
	}

	// get the current last data status sequences before doing other actions and checkpoints
	dataStatusSequences, err := dm.GetLastDataStatusSequences(dataStatusToKeep)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	// dataStatusSequences[0] is used below: fail cleanly instead of panicking
	// when nothing was returned.
	if len(dataStatusSequences) == 0 {
		t.Fatal("expected at least one data status sequence, got none")
	}

	for i := 0; i < 400; i++ {
		action := &Action{
			ActionType: ActionTypePut,
			ID:         fmt.Sprintf("object%04d", i),
			DataType:   "datatype01",
			Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
		}
		actions = append(actions, action)
	}

	// One more checkpoint: the sequences collected above are now stale. The
	// returned entries are discarded; the final check uses currentEntries from
	// the loop above.
	if _, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// Clean using the stale list, as if the newer checkpoint had been written
	// concurrently with the clean.
	if err := dm.cleanOldCheckpoints(ctx, dataStatusSequences); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// check the datastatus after clean
	curDataStatus, err := dm.GetLastDataStatus()
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	// The comparison is done on the string form of the sequences.
	// NOTE(review): presumably dataStatusSequences[0] is the most recent entry
	// of the stale list — confirm against GetLastDataStatusSequences.
	if curDataStatus.DataSequence <= dataStatusSequences[0].String() {
		t.Fatalf("expected data status sequence greater than %q", dataStatusSequences[0])
	}

	// check last data file
	if err := checkDataFiles(ctx, t, dm, currentEntries); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
}

// TestStorageWalCleaner runs testStorageWalCleaner as a subtest for every
// base path variant listed below.
func TestStorageWalCleaner(t *testing.T) {
	type testCase struct {
		name     string
		basePath string
	}

	cases := []testCase{
		{name: "test with empty basepath", basePath: ""},
		{name: "test with relative basepath", basePath: "base/path"},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			testStorageWalCleaner(t, tc.basePath)
		})
	}
}

// testStorageWalCleaner checks that, after CleanOldCheckpoints and
// storageWalCleaner have run, the wal status files and wal data files left in
// the object storage are exactly the ones whose wal sequence is not lower than
// the WalSequence of the data status at index dataStatusToKeep-1 of the last
// data status sequences.
//
// basePath is used as DataManagerConfig.BasePath.
func testStorageWalCleaner(t *testing.T, basePath string) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)
	defer shutdownEtcd(tetcd)

	// Use a cancelable context so the datamanager goroutine started below does
	// not outlive the test. Deferred calls run in reverse order, so the
	// datamanager is asked to stop before etcd is killed.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dmConfig := &DataManagerConfig{
		BasePath: basePath,
		E:        tetcd.TestEtcd.Store,
		OST:      objectstorage.NewObjStorage(ost, "/"),
		// remove almost all wals to see that they are removed also from changes
		EtcdWalsKeepNum: 1,
		DataTypes:       []string{"datatype01"},
		// checkpoint also with only one wal
		MinCheckpointWalsNum: 1,
		// use a small maxDataFileSize
		MaxDataFileSize: 10 * 1024,
	}
	dm, err := NewDataManager(ctx, logger, dmConfig)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	dmReadyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, dmReadyCh) }()
	<-dmReadyCh

	time.Sleep(5 * time.Second)

	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"

	// Do 10 rounds of puts followed by a checkpoint. actions is never reset,
	// so every round submits all the previously added puts plus 400 more for
	// the same object IDs.
	var currentEntries map[string]*DataEntry
	actions := []*Action{}
	for n := 0; n < 10; n++ {
		for i := 0; i < 400; i++ {
			action := &Action{
				ActionType: ActionTypePut,
				ID:         fmt.Sprintf("object%04d", i),
				DataType:   "datatype01",
				Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
			}
			actions = append(actions, action)
		}

		currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
		if err != nil {
			t.Fatalf("unexpected err: %v", err)
		}
	}

	// get the last data status sequence
	lastDataStatusSequences, err := dm.GetLastDataStatusSequences(dataStatusToKeep)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	// The element at index dataStatusToKeep-1 is used below: fail cleanly
	// instead of panicking when fewer sequences were returned.
	if len(lastDataStatusSequences) < dataStatusToKeep {
		t.Fatalf("expected %d data status sequences, got %d", dataStatusToKeep, len(lastDataStatusSequences))
	}

	// Use the first dataStatusToKeep data status
	// NOTE(review): presumably this is the oldest data status that survives
	// the clean — confirm the ordering returned by GetLastDataStatusSequences.
	dataStatus, err := dm.GetDataStatus(lastDataStatusSequences[dataStatusToKeep-1])
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	// get the list of expected wals
	doneCh := make(chan struct{})
	defer close(doneCh)

	expectedWalStatusFiles := []string{}
	expectedWalDataFiles := []string{}
	for object := range dm.ost.List(dm.storageWalStatusDir()+"/", "", true, doneCh) {
		if object.Err != nil {
			// report the listing error, not the unrelated outer err
			t.Fatalf("unexpected err: %v", object.Err)
		}

		// the wal sequence is the status file name without its extension
		name := path.Base(object.Path)
		ext := path.Ext(name)
		walSequence := strings.TrimSuffix(name, ext)

		// wals older than the chosen data status are expected to be removed
		if walSequence < dataStatus.WalSequence {
			continue
		}
		header, err := dm.ReadWal(walSequence)
		if err != nil {
			t.Fatalf("unexpected err: %v", err)
		}

		expectedWalStatusFiles = append(expectedWalStatusFiles, object.Path)
		expectedWalDataFiles = append(expectedWalDataFiles, dm.storageWalDataFile(header.WalDataFileID))
	}
	sort.Strings(expectedWalDataFiles)

	if err := dm.CleanOldCheckpoints(ctx); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	if err := dm.storageWalCleaner(ctx); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// collect what is left in the object storage after the clean
	currentWalStatusFiles := []string{}
	currentWalDataFiles := []string{}
	for object := range dm.ost.List(dm.storageWalStatusDir()+"/", "", true, doneCh) {
		if object.Err != nil {
			t.Fatalf("unexpected err: %v", object.Err)
		}

		currentWalStatusFiles = append(currentWalStatusFiles, object.Path)
	}
	for object := range dm.ost.List(dm.storageWalDataDir()+"/", "", true, doneCh) {
		if object.Err != nil {
			t.Fatalf("unexpected err: %v", object.Err)
		}

		currentWalDataFiles = append(currentWalDataFiles, object.Path)
	}
	sort.Strings(currentWalDataFiles)
	if diff := cmp.Diff(currentWalStatusFiles, expectedWalStatusFiles); diff != "" {
		t.Fatalf("different wal status files: %v", diff)
	}
	if diff := cmp.Diff(currentWalDataFiles, expectedWalDataFiles); diff != "" {
		t.Fatalf("different wal data files: %v", diff)
	}
}

// TestExportImport exercises a full export/import cycle:
//
//  1. start a datamanager, write two wals and export its data to a buffer;
//  2. stop it, wipe etcd, create a new empty object storage directory and
//     import the buffer with a datamanager started with MaintenanceMode set;
//  3. restart the datamanager without MaintenanceMode, check the imported data,
//     write more wals, checkpoint and check the data files again.
func TestExportImport(t *testing.T) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel))

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, logger, etcdDir)

	ctx, cancel := context.WithCancel(context.Background())

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	ost, err := objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// newDMConfig builds the datamanager config shared by all the phases. It
	// reads tetcd and ost when called, so it picks up the instances recreated
	// after the etcd reset.
	newDMConfig := func() *DataManagerConfig {
		return &DataManagerConfig{
			BasePath: "basepath",
			E:        tetcd.TestEtcd.Store,
			OST:      objectstorage.NewObjStorage(ost, "/"),
			// remove almost all wals to see that they are removed also from changes
			EtcdWalsKeepNum: 1,
			DataTypes:       []string{"datatype01", "datatype02"},
			// checkpoint also with only one wal
			MinCheckpointWalsNum: 1,
			// use a small maxDataFileSize
			MaxDataFileSize: 10 * 1024,
		}
	}

	dm, err := NewDataManager(ctx, logger, newDMConfig())
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	dmReadyCh := make(chan struct{})
	go func() { _ = dm.Run(ctx, dmReadyCh) }()
	<-dmReadyCh

	time.Sleep(5 * time.Second)

	contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"

	expectedEntries := map[string]*DataEntry{}

	// putActions returns one put action for every object index in [from, to)
	// with the given data type.
	putActions := func(from, to int, dataType string) []*Action {
		actions := make([]*Action, 0, to-from)
		for i := from; i < to; i++ {
			actions = append(actions, &Action{
				ActionType: ActionTypePut,
				ID:         fmt.Sprintf("object%04d", i),
				DataType:   dataType,
				Data:       []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
			})
		}
		return actions
	}

	// recordExpected applies the action groups to expectedEntries.
	recordExpected := func(actionGroups [][]*Action) {
		for _, actionGroup := range actionGroups {
			for _, action := range actionGroup {
				switch action.ActionType {
				case ActionTypePut:
					expectedEntries[action.ID] = &DataEntry{ID: action.ID, DataType: action.DataType, Data: action.Data}
				case ActionTypeDelete:
					delete(expectedEntries, action.ID)
				}
			}
		}
	}

	// writeWals writes one wal per action group using the current datamanager.
	writeWals := func(ctx context.Context, actionGroups [][]*Action) {
		for _, actionGroup := range actionGroups {
			// populate with a wal
			if _, err := dm.WriteWal(ctx, actionGroup, nil); err != nil {
				t.Fatalf("unexpected err: %v", err)
			}
		}
	}

	// test insert from scratch (no current entries)
	actionGroups := [][]*Action{
		putActions(200, 400, "datatype01"),
		putActions(600, 1000, "datatype02"),
	}
	recordExpected(actionGroups)
	writeWals(ctx, actionGroups)

	// wait for the event to be read
	time.Sleep(500 * time.Millisecond)

	var export bytes.Buffer
	if err := dm.Export(ctx, &export); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	t.Logf("stopping datamanager")
	cancel()

	time.Sleep(5 * time.Second)

	t.Logf("stopping etcd")
	// Reset etcd
	shutdownEtcd(tetcd)

	t.Logf("resetting etcd")
	// The import below is meant to run against an empty etcd, so a failed
	// reset must abort the test instead of being silently ignored.
	if err := os.RemoveAll(etcdDir); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	t.Logf("starting etcd")
	tetcd = setupEtcd(t, logger, etcdDir)
	// NOTE(review): setupEtcd already calls Start and WaitUp, so this second
	// Start looks redundant — confirm before removing it.
	if err := tetcd.Start(); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer shutdownEtcd(tetcd)

	// use a new, empty object storage directory for the import
	ostDir, err = ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	ost, err = objectstorage.NewPosix(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	ctx, cancel = context.WithCancel(context.Background())

	// start the datamanager in maintenance mode to do the import
	maintenanceConfig := newDMConfig()
	maintenanceConfig.MaintenanceMode = true
	dm, err = NewDataManager(ctx, logger, maintenanceConfig)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	dmReadyCh = make(chan struct{})
	go func() { _ = dm.Run(ctx, dmReadyCh) }()
	<-dmReadyCh

	time.Sleep(5 * time.Second)
	if err := dm.Import(ctx, &export); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	t.Logf("stopping datamanager")
	cancel()

	time.Sleep(5 * time.Second)

	// Use a cancelable context also for the last datamanager so its goroutine
	// does not outlive the test. This deferred cancel runs before the deferred
	// etcd shutdown registered above.
	ctx, cancel = context.WithCancel(context.Background())
	defer cancel()

	// restart datamanager in normal mode
	dm, err = NewDataManager(ctx, logger, newDMConfig())
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	dmReadyCh = make(chan struct{})
	go func() { _ = dm.Run(ctx, dmReadyCh) }()
	<-dmReadyCh

	time.Sleep(5 * time.Second)

	if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// add new entries on top of the imported ones
	actionGroups = [][]*Action{
		putActions(400, 600, "datatype01"),
		putActions(1000, 1400, "datatype02"),
	}
	recordExpected(actionGroups)
	writeWals(ctx, actionGroups)

	// wait for the event to be read
	time.Sleep(500 * time.Millisecond)

	if err := dm.checkpoint(ctx, false); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	if err := checkDataFiles(ctx, t, dm, expectedEntries); err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
}