From 975bc810b3674ce020fb3bd383bdc11eb07f25c0 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Thu, 28 Mar 2019 16:01:08 +0100 Subject: [PATCH] wal: return error when changes isn't initialized --- internal/wal/wal.go | 18 +++++++++++++++--- internal/wal/wal_test.go | 11 +++++++---- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 5dc0652..cafaf82 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -155,12 +155,15 @@ type ChangeGroupsUpdateToken struct { type changeGroupsRevisions map[string]int64 -func (w *WalManager) GetChangeGroupsUpdateToken(cgNames []string) *ChangeGroupsUpdateToken { +func (w *WalManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error) { w.changes.Lock() + defer w.changes.Unlock() + if !w.changes.initialized { + return nil, errors.Errorf("wal changes not ready") + } revision := w.changes.curRevision() cgr := w.changes.getChangeGroups(cgNames) - w.changes.Unlock() - return &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr} + return &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil } func (w *WalManager) MergeChangeGroupsUpdateTokens(cgts []*ChangeGroupsUpdateToken) *ChangeGroupsUpdateToken { @@ -187,6 +190,10 @@ func (w *WalManager) MergeChangeGroupsUpdateTokens(cgts []*ChangeGroupsUpdateTok func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error) { w.changes.Lock() + if !w.changes.initialized { + w.changes.Unlock() + return nil, nil, errors.Errorf("wal changes not ready") + } walseq, ok := w.changes.getPut(p) revision := w.changes.curRevision() cgr := w.changes.getChangeGroups(cgNames) @@ -234,6 +241,11 @@ func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-cha startWith = w.toStorageDataPath(startWith) w.changes.Lock() + if !w.changes.initialized { + w.changes.Unlock() + objectCh <- objectstorage.ObjectInfo{Err: errors.Errorf("wal changes not ready")} + return objectCh + } changesList := w.changesList(w.changes.pathsOrdered, prefix, startWith, recursive) deletedChangesMap := w.changes.getDeletesMap() w.changes.Unlock() diff --git a/internal/wal/wal_test.go b/internal/wal/wal_test.go index f3fd166..752172f 100644 --- a/internal/wal/wal_test.go +++ b/internal/wal/wal_test.go @@ -184,19 +184,22 @@ func TestConcurrentUpdate(t *testing.T) { <-walReadyCh cgNames := []string{"changegroup01", "changegroup02"} - cgt := wal.GetChangeGroupsUpdateToken(cgNames) + cgt, err := wal.GetChangeGroupsUpdateToken(cgNames) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } // populate with a wal cgt, err = wal.WriteWal(ctx, actions, cgt) if err != nil { - t.Fatalf("err: %v", err) + t.Fatalf("unexpected err: %v", err) } // this must work successfully oldcgt := cgt cgt, err = wal.WriteWal(ctx, actions, cgt) if err != nil { - t.Fatalf("err: %v", err) + t.Fatalf("unexpected err: %v", err) } // this must fail since we are using the old cgt @@ -209,7 +212,7 @@ func TestConcurrentUpdate(t *testing.T) { // this must work successfully cgt, err = wal.WriteWal(ctx, actions, cgt) if err != nil { - t.Fatalf("err: %v", err) + t.Fatalf("unexpected err: %v", err) } // this must fail since we are using the old cgt