wal: return error when changes isn't initialized

This commit is contained in:
Simone Gotti 2019-03-28 16:01:08 +01:00
parent 65c425b22b
commit 975bc810b3
2 changed files with 22 additions and 7 deletions

View File

@ -155,12 +155,15 @@ type ChangeGroupsUpdateToken struct {
type changeGroupsRevisions map[string]int64
func (w *WalManager) GetChangeGroupsUpdateToken(cgNames []string) *ChangeGroupsUpdateToken {
func (w *WalManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error) {
w.changes.Lock()
defer w.changes.Unlock()
if !w.changes.initialized {
return nil, errors.Errorf("wal changes not ready")
}
revision := w.changes.curRevision()
cgr := w.changes.getChangeGroups(cgNames)
w.changes.Unlock()
return &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}
return &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil
}
func (w *WalManager) MergeChangeGroupsUpdateTokens(cgts []*ChangeGroupsUpdateToken) *ChangeGroupsUpdateToken {
@ -187,6 +190,10 @@ func (w *WalManager) MergeChangeGroupsUpdateTokens(cgts []*ChangeGroupsUpdateTok
func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error) {
w.changes.Lock()
if !w.changes.initialized {
w.changes.Unlock()
return nil, nil, errors.Errorf("wal changes not ready")
}
walseq, ok := w.changes.getPut(p)
revision := w.changes.curRevision()
cgr := w.changes.getChangeGroups(cgNames)
@ -234,6 +241,11 @@ func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-cha
startWith = w.toStorageDataPath(startWith)
w.changes.Lock()
if !w.changes.initialized {
w.changes.Unlock()
objectCh <- objectstorage.ObjectInfo{Err: errors.Errorf("wal changes not ready")}
return objectCh
}
changesList := w.changesList(w.changes.pathsOrdered, prefix, startWith, recursive)
deletedChangesMap := w.changes.getDeletesMap()
w.changes.Unlock()

View File

@ -184,19 +184,22 @@ func TestConcurrentUpdate(t *testing.T) {
<-walReadyCh
cgNames := []string{"changegroup01", "changegroup02"}
cgt := wal.GetChangeGroupsUpdateToken(cgNames)
cgt, err := wal.GetChangeGroupsUpdateToken(cgNames)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// populate with a wal
cgt, err = wal.WriteWal(ctx, actions, cgt)
if err != nil {
t.Fatalf("err: %v", err)
t.Fatalf("unexpected err: %v", err)
}
// this must work successfully
oldcgt := cgt
cgt, err = wal.WriteWal(ctx, actions, cgt)
if err != nil {
t.Fatalf("err: %v", err)
t.Fatalf("unexpected err: %v", err)
}
// this must fail since we are using the old cgt
@ -209,7 +212,7 @@ func TestConcurrentUpdate(t *testing.T) {
// this must work successfully
cgt, err = wal.WriteWal(ctx, actions, cgt)
if err != nil {
t.Fatalf("err: %v", err)
t.Fatalf("unexpected err: %v", err)
}
// this must fail since we are using the old cgt