datamanager: remove old storage wals

Remove all wals not required by the existing data status files and not existing
in etcd.
This commit is contained in:
Simone Gotti 2019-11-08 10:10:56 +01:00
parent 9fbfeb9d13
commit 35e1ec0e15
4 changed files with 365 additions and 21 deletions

View File

@ -529,6 +529,43 @@ func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
return bytes.NewReader(de.Data), nil return bytes.NewReader(de.Data), nil
} }
func (d *DataManager) GetFirstDataStatusSequences(n int) ([]*sequence.Sequence, error) {
if n < 1 {
return nil, errors.Errorf("n must be greater than 0")
}
dataStatusSequences := []*sequence.Sequence{}
c := 0
doneCh := make(chan struct{})
defer close(doneCh)
for object := range d.ost.List(d.storageDataDir()+"/", "", false, doneCh) {
if object.Err != nil {
return nil, object.Err
}
if m := DataStatusFileRegexp.FindStringSubmatch(path.Base(object.Path)); m != nil {
seq, err := sequence.Parse(m[1])
if err != nil {
d.log.Warnf("cannot parse sequence for data status file %q", object.Path)
continue
}
dataStatusSequences = append(dataStatusSequences, seq)
c++
} else {
d.log.Warnf("bad file %q found in storage data dir", object.Path)
}
if c >= n {
break
}
}
if len(dataStatusSequences) == 0 {
return nil, ostypes.ErrNotExist
}
return dataStatusSequences, nil
}
func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, error) { func (d *DataManager) GetLastDataStatusSequences(n int) ([]*sequence.Sequence, error) {
if n < 1 { if n < 1 {
return nil, errors.Errorf("n must be greater than 0") return nil, errors.Errorf("n must be greater than 0")
@ -582,6 +619,15 @@ func (d *DataManager) GetDataStatus(dataSequence *sequence.Sequence) (*DataStatu
return dataStatus, dec.Decode(&dataStatus) return dataStatus, dec.Decode(&dataStatus)
} }
func (d *DataManager) GetFirstDataStatusSequence() (*sequence.Sequence, error) {
dataStatusSequences, err := d.GetFirstDataStatusSequences(1)
if err != nil {
return nil, err
}
return dataStatusSequences[0], nil
}
func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) { func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) {
dataStatusSequences, err := d.GetLastDataStatusSequences(1) dataStatusSequences, err := d.GetLastDataStatusSequences(1)
if err != nil { if err != nil {
@ -591,6 +637,15 @@ func (d *DataManager) GetLastDataStatusSequence() (*sequence.Sequence, error) {
return dataStatusSequences[0], nil return dataStatusSequences[0], nil
} }
func (d *DataManager) GetFirstDataStatus() (*DataStatus, error) {
dataStatusSequence, err := d.GetFirstDataStatusSequence()
if err != nil {
return nil, err
}
return d.GetDataStatus(dataStatusSequence)
}
func (d *DataManager) GetLastDataStatus() (*DataStatus, error) { func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
dataStatusSequence, err := d.GetLastDataStatusSequence() dataStatusSequence, err := d.GetLastDataStatusSequence()
if err != nil { if err != nil {

View File

@ -34,8 +34,13 @@ import (
// * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one // * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one
const ( const (
DefaultSyncInterval = 5 * time.Second
DefaultCheckpointInterval = 10 * time.Second DefaultCheckpointInterval = 10 * time.Second
DefaultCheckpointCleanInterval = 5 * time.Minute DefaultCheckpointCleanInterval = 5 * time.Minute
DefaultEtcdWalCleanInterval = 2 * time.Second
DefaultStorageWalCleanInterval = 5 * time.Minute
DefaultCompactChangeGroupsInterval = 1 * time.Second
DefaultEtcdPingerInterval = 1 * time.Second
DefaultEtcdWalsKeepNum = 100 DefaultEtcdWalsKeepNum = 100
DefaultMinCheckpointWalsNum = 100 DefaultMinCheckpointWalsNum = 100
) )
@ -66,6 +71,7 @@ var (
etcdCompactChangeGroupsLockKey = path.Join(etcdWalBaseDir, "compactchangegroupslock") etcdCompactChangeGroupsLockKey = path.Join(etcdWalBaseDir, "compactchangegroupslock")
etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock")
etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock") etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock")
etcdStorageWalCleanerLockKey = path.Join(etcdWalBaseDir, "storagewalcleanerlock")
etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups") etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups")
etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev") etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev")
@ -154,12 +160,20 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
return d, nil return d, nil
} }
func (d *DataManager) storageWalStatusDir() string {
return path.Join(d.basePath, storageWalsStatusDir)
}
func (d *DataManager) storageWalStatusFile(walSeq string) string { func (d *DataManager) storageWalStatusFile(walSeq string) string {
return path.Join(d.basePath, storageWalsStatusDir, walSeq) return path.Join(d.storageWalStatusDir(), walSeq)
}
func (d *DataManager) storageWalDataDir() string {
return path.Join(d.basePath, storageWalsDataDir)
} }
func (d *DataManager) storageWalDataFile(walFileID string) string { func (d *DataManager) storageWalDataFile(walFileID string) string {
return path.Join(d.basePath, storageWalsDataDir, walFileID) return path.Join(d.storageWalDataDir(), walFileID)
} }
func (d *DataManager) storageDataDir() string { func (d *DataManager) storageDataDir() string {
@ -239,7 +253,8 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
go d.syncLoop(ctx) go d.syncLoop(ctx)
go d.checkpointLoop(ctx) go d.checkpointLoop(ctx)
go d.checkpointCleanLoop(ctx) go d.checkpointCleanLoop(ctx)
go d.walCleanerLoop(ctx) go d.etcdWalCleanerLoop(ctx)
go d.storageWalCleanerLoop(ctx)
go d.compactChangeGroupsLoop(ctx) go d.compactChangeGroupsLoop(ctx)
go d.etcdPingerLoop(ctx) go d.etcdPingerLoop(ctx)

View File

@ -394,7 +394,7 @@ func TestConcurrentUpdate(t *testing.T) {
} }
} }
func TestWalCleaner(t *testing.T) { func TestEtcdWalCleaner(t *testing.T) {
dir, err := ioutil.TempDir("", "agola") dir, err := ioutil.TempDir("", "agola")
if err != nil { if err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
@ -455,7 +455,7 @@ func TestWalCleaner(t *testing.T) {
if err := dm.checkpoint(ctx, true); err != nil { if err := dm.checkpoint(ctx, true); err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
if err := dm.walCleaner(ctx); err != nil { if err := dm.etcdWalCleaner(ctx); err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -573,7 +573,7 @@ func TestReadObject(t *testing.T) {
if err := dm.checkpoint(ctx, true); err != nil { if err := dm.checkpoint(ctx, true); err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
if err := dm.walCleaner(ctx); err != nil { if err := dm.etcdWalCleaner(ctx); err != nil {
t.Fatalf("unexpected err: %v", err) t.Fatalf("unexpected err: %v", err)
} }
@ -1316,6 +1316,168 @@ func testCleanConcurrentCheckpoint(t *testing.T, basePath string) {
} }
} }
func TestStorageWalCleaner(t *testing.T) {
tests := []struct {
name string
basePath string
}{
{
name: "test with empty basepath",
basePath: "",
},
{
name: "test with relative basepath",
basePath: "base/path",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testStorageWalCleaner(t, tt.basePath)
})
}
}
func testStorageWalCleaner(t *testing.T, basePath string) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
etcdDir, err := ioutil.TempDir(dir, "etcd")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
tetcd := setupEtcd(t, etcdDir)
defer shutdownEtcd(tetcd)
ctx := context.Background()
ostDir, err := ioutil.TempDir(dir, "ost")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
ost, err := posix.New(ostDir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
dmConfig := &DataManagerConfig{
BasePath: basePath,
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum: 1,
DataTypes: []string{"datatype01"},
// checkpoint also with only one wal
MinCheckpointWalsNum: 1,
// use a small maxDataFileSize
MaxDataFileSize: 10 * 1024,
}
dm, err := NewDataManager(ctx, logger, dmConfig)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
dmReadyCh := make(chan struct{})
go func() { _ = dm.Run(ctx, dmReadyCh) }()
<-dmReadyCh
time.Sleep(5 * time.Second)
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
var currentEntries map[string]*DataEntry
actions := []*Action{}
for n := 0; n < 10; n++ {
for i := 0; i < 400; i++ {
action := &Action{
ActionType: ActionTypePut,
ID: fmt.Sprintf("object%04d", i),
DataType: "datatype01",
Data: []byte(fmt.Sprintf(`{ "ID": "%d", "Contents": %s }`, i, contents)),
}
actions = append(actions, action)
}
currentEntries, err = doAndCheckCheckpoint(t, ctx, dm, [][]*Action{actions}, currentEntries)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
// get the last data status sequence
lastDataStatusSequences, err := dm.GetLastDataStatusSequences(dataStatusToKeep)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// Use the first dataStatusToKeep data status
dataStatus, err := dm.GetDataStatus(lastDataStatusSequences[dataStatusToKeep-1])
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// get the list of expected wals
doneCh := make(chan struct{})
defer close(doneCh)
expectedWalStatusFiles := []string{}
expectedWalDataFiles := []string{}
for object := range dm.ost.List(dm.storageWalStatusDir()+"/", "", true, doneCh) {
if object.Err != nil {
t.Fatalf("unexpected err: %v", err)
}
name := path.Base(object.Path)
ext := path.Ext(name)
walSequence := strings.TrimSuffix(name, ext)
if walSequence < dataStatus.WalSequence {
continue
}
header, err := dm.ReadWal(walSequence)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
expectedWalStatusFiles = append(expectedWalStatusFiles, object.Path)
expectedWalDataFiles = append(expectedWalDataFiles, dm.storageWalDataFile(header.WalDataFileID))
}
sort.Strings(expectedWalDataFiles)
if err := dm.CleanOldCheckpoints(ctx); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := dm.storageWalCleaner(ctx); err != nil {
t.Fatalf("unexpected err: %v", err)
}
currentWalStatusFiles := []string{}
currentWalDataFiles := []string{}
for object := range dm.ost.List(dm.storageWalStatusDir()+"/", "", true, doneCh) {
if object.Err != nil {
t.Fatalf("unexpected err: %v", err)
}
currentWalStatusFiles = append(currentWalStatusFiles, object.Path)
}
for object := range dm.ost.List(dm.storageWalDataDir()+"/", "", true, doneCh) {
if object.Err != nil {
t.Fatalf("unexpected err: %v", err)
}
currentWalDataFiles = append(currentWalDataFiles, object.Path)
}
sort.Strings(currentWalDataFiles)
if diff := cmp.Diff(currentWalStatusFiles, expectedWalStatusFiles); diff != "" {
t.Fatalf("different wal status files: %v", diff)
}
if diff := cmp.Diff(currentWalDataFiles, expectedWalDataFiles); diff != "" {
t.Fatalf("different wal data files: %v", diff)
}
}
func TestExportImport(t *testing.T) { func TestExportImport(t *testing.T) {
dir, err := ioutil.TempDir("", "agola") dir, err := ioutil.TempDir("", "agola")
if err != nil { if err != nil {

View File

@ -179,7 +179,7 @@ func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {
startPath = d.storageWalStatusFile(start) startPath = d.storageWalStatusFile(start)
} }
for object := range d.ost.List(path.Join(d.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) { for object := range d.ost.List(d.storageWalStatusDir()+"/", startPath, true, doneCh) {
if object.Err != nil { if object.Err != nil {
walCh <- &WalFile{ walCh <- &WalFile{
Err: object.Err, Err: object.Err,
@ -547,7 +547,7 @@ func (d *DataManager) syncLoop(ctx context.Context) {
d.log.Errorf("syncer error: %+v", err) d.log.Errorf("syncer error: %+v", err)
} }
sleepCh := time.NewTimer(5 * time.Second).C sleepCh := time.NewTimer(DefaultSyncInterval).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -751,14 +751,14 @@ func (d *DataManager) checkpointClean(ctx context.Context) error {
return nil return nil
} }
func (d *DataManager) walCleanerLoop(ctx context.Context) { func (d *DataManager) etcdWalCleanerLoop(ctx context.Context) {
for { for {
d.log.Debugf("walcleaner") d.log.Debugf("etcdwalcleaner")
if err := d.walCleaner(ctx); err != nil { if err := d.etcdWalCleaner(ctx); err != nil {
d.log.Errorf("walcleaner error: %v", err) d.log.Errorf("etcdwalcleaner error: %v", err)
} }
sleepCh := time.NewTimer(2 * time.Second).C sleepCh := time.NewTimer(DefaultEtcdWalCleanInterval).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -767,10 +767,10 @@ func (d *DataManager) walCleanerLoop(ctx context.Context) {
} }
} }
// walCleaner will clean already checkpointed wals from etcd // etcdWalCleaner will clean already checkpointed wals from etcd
// it must always keep at least one wal that is needed for resync operations // it must always keep at least one wal that is needed for resync operations
// from clients // from clients
func (d *DataManager) walCleaner(ctx context.Context) error { func (d *DataManager) etcdWalCleaner(ctx context.Context) error {
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil { if err != nil {
return err return err
@ -826,13 +826,125 @@ func (d *DataManager) walCleaner(ctx context.Context) error {
return nil return nil
} }
func (d *DataManager) storageWalCleanerLoop(ctx context.Context) {
for {
d.log.Debugf("storagewalcleaner")
if err := d.storageWalCleaner(ctx); err != nil {
d.log.Errorf("storagewalcleaner error: %v", err)
}
sleepCh := time.NewTimer(DefaultStorageWalCleanInterval).C
select {
case <-ctx.Done():
return
case <-sleepCh:
}
}
}
// storageWalCleaner will clean unneeded wals from the storage
func (d *DataManager) storageWalCleaner(ctx context.Context) error {
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
defer session.Close()
m := concurrency.NewMutex(session, etcdStorageWalCleanerLockKey)
// TODO(sgotti) find a way to use a trylock so we'll just return if already
// locked. Currently multiple task updaters will enqueue and start when another
// finishes (unuseful and consume resources)
if err := m.Lock(ctx); err != nil {
return err
}
defer func() { _ = m.Unlock(ctx) }()
firstDataStatus, err := d.GetFirstDataStatus()
if err != nil {
return err
}
firstWalSequence := firstDataStatus.WalSequence
// get the first wal in etcd (in any state) and use it's wal sequence if
// it's lesser than the first data status wal sequence
resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0)
if err != nil {
return err
}
if len(resp.Kvs) == 0 {
return errors.Errorf("no wals in etcd")
}
var walData WalData
if err := json.Unmarshal(resp.Kvs[0].Value, &walData); err != nil {
return err
}
if walData.WalSequence < firstWalSequence {
firstWalSequence = walData.WalSequence
}
doneCh := make(chan struct{})
defer close(doneCh)
for object := range d.ost.List(d.storageWalStatusDir()+"/", "", true, doneCh) {
if object.Err != nil {
return err
}
name := path.Base(object.Path)
ext := path.Ext(name)
walSequence := strings.TrimSuffix(name, ext)
// handle committed status file and related data file
if ext == ".committed" {
if walSequence >= firstWalSequence {
break
}
header, err := d.ReadWal(walSequence)
if err != nil {
return err
}
// first remove wal data file
walStatusFilePath := d.storageWalDataFile(header.WalDataFileID)
d.log.Infof("removing %q", walStatusFilePath)
if err := d.ost.DeleteObject(walStatusFilePath); err != nil {
if err != ostypes.ErrNotExist {
return err
}
}
// then remove wal status files
d.log.Infof("removing %q", object.Path)
if err := d.ost.DeleteObject(object.Path); err != nil {
if err != ostypes.ErrNotExist {
return err
}
}
}
// handle old checkpointed status file
// TODO(sgotti) remove this in future versions since .checkpointed files are not created anymore
if ext == ".checkpointed" {
d.log.Infof("removing %q", object.Path)
if err := d.ost.DeleteObject(object.Path); err != nil {
if err != ostypes.ErrNotExist {
return err
}
}
}
}
return nil
}
func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) { func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) {
for { for {
if err := d.compactChangeGroups(ctx); err != nil { if err := d.compactChangeGroups(ctx); err != nil {
d.log.Errorf("err: %+v", err) d.log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(1 * time.Second).C sleepCh := time.NewTimer(DefaultCompactChangeGroupsInterval).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
@ -917,7 +1029,7 @@ func (d *DataManager) etcdPingerLoop(ctx context.Context) {
d.log.Errorf("err: %+v", err) d.log.Errorf("err: %+v", err)
} }
sleepCh := time.NewTimer(1 * time.Second).C sleepCh := time.NewTimer(DefaultEtcdPingerInterval).C
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return