From 445ef24daa1633a2c3113d7a72ec6b61b8c2f15e Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 18 Jul 2019 14:58:42 +0200 Subject: [PATCH] datamanager: add option to force a checkpoint --- internal/datamanager/datamanager_test.go | 6 +++--- internal/datamanager/wal.go | 10 +++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 1bf19c9..7023ecb 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -318,7 +318,7 @@ func TestWalCleaner(t *testing.T) { } } - if err := dm.checkpoint(ctx); err != nil { + if err := dm.checkpoint(ctx, true); err != nil { t.Fatalf("unexpected err: %v", err) } if err := dm.walCleaner(ctx); err != nil { @@ -436,7 +436,7 @@ func TestReadObject(t *testing.T) { } // do a checkpoint and wal clean - if err := dm.checkpoint(ctx); err != nil { + if err := dm.checkpoint(ctx, true); err != nil { t.Fatalf("unexpected err: %v", err) } if err := dm.walCleaner(ctx); err != nil { @@ -489,7 +489,7 @@ func doAndCheckCheckpoint(t *testing.T, ctx context.Context, dm *DataManager, ac time.Sleep(500 * time.Millisecond) // do a checkpoint - if err := dm.checkpoint(ctx); err != nil { + if err := dm.checkpoint(ctx, true); err != nil { return nil, err } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index d98e2d0..c66cfb2 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -642,7 +642,7 @@ func (d *DataManager) sync(ctx context.Context) error { func (d *DataManager) checkpointLoop(ctx context.Context) { for { d.log.Debugf("checkpointer") - if err := d.checkpoint(ctx); err != nil { + if err := d.checkpoint(ctx, false); err != nil { d.log.Errorf("checkpoint error: %v", err) } @@ -656,7 +656,7 @@ func (d *DataManager) checkpointLoop(ctx context.Context) { } } -func (d *DataManager) checkpoint(ctx context.Context) error { +func (d *DataManager) checkpoint(ctx context.Context, force bool) error { session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err @@ -694,7 +694,11 @@ func (d *DataManager) checkpoint(ctx context.Context) error { } walsData = append(walsData, walData) } - if len(walsData) < d.minCheckpointWalsNum { + + if !force && len(walsData) < d.minCheckpointWalsNum { + return nil + } + if len(walsData) == 0 { return nil }