diff --git a/internal/datamanager/changes.go b/internal/datamanager/changes.go index 92cde49..35975eb 100644 --- a/internal/datamanager/changes.go +++ b/internal/datamanager/changes.go @@ -72,28 +72,11 @@ func (c *WalChanges) putRevision(revision int64) { c.revision = revision } -func (c *WalChanges) curWalSeq() string { - return c.walSeq -} - func (c *WalChanges) getPut(dataType, id string) (string, bool) { walseq, ok := c.puts[dataType][id] return walseq, ok } -func (c *WalChanges) getDeletesMap() map[string]struct{} { - dmap := map[string]struct{}{} - for p := range c.deletes { - dmap[p] = struct{}{} - } - return dmap -} - -func (c *WalChanges) getDelete(p string) bool { - _, ok := c.deletes[p] - return ok -} - func (c *WalChanges) addPut(dataType, id, walseq string, revision int64) { delete(c.deletes[dataType], id) c.puts[dataType][id] = walseq @@ -188,7 +171,7 @@ func (d *DataManager) applyWalChangesAction(ctx context.Context, action *Action, d.changes.actions[walSequence] = append(d.changes.actions[walSequence], action) } -func (d *DataManager) watcherLoop(ctx context.Context) error { +func (d *DataManager) watcherLoop(ctx context.Context) { for { initialized := d.changes.initialized if !initialized { @@ -204,7 +187,7 @@ func (d *DataManager) watcherLoop(ctx context.Context) error { select { case <-ctx.Done(): d.log.Infof("watcher exiting") - return nil + return default: } diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go index 1ec1f60..1009dd9 100644 --- a/internal/datamanager/datamanager.go +++ b/internal/datamanager/datamanager.go @@ -166,9 +166,8 @@ func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error { go d.compactChangeGroupsLoop(ctx) go d.etcdPingerLoop(ctx) - select { - case <-ctx.Done(): - d.log.Infof("walmanager exiting") - return nil - } + <-ctx.Done() + d.log.Infof("walmanager exiting") + + return nil } diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index bc5b6a8..a27b62c 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -38,7 +38,6 @@ import ( var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) var logger = slog.New(level) -var log = logger.Sugar() func setupEtcd(t *testing.T, dir string) *testutil.TestEmbeddedEtcd { tetcd, err := testutil.NewTestEmbeddedEtcd(t, logger, dir) @@ -56,17 +55,10 @@ func setupEtcd(t *testing.T, dir string) *testutil.TestEmbeddedEtcd { func shutdownEtcd(tetcd *testutil.TestEmbeddedEtcd) { if tetcd.Etcd != nil { - tetcd.Kill() + _ = tetcd.Kill() } } -type noopCheckpointer struct { -} - -func (c *noopCheckpointer) Checkpoint(ctx context.Context, action *Action) error { - return nil -} - func TestEtcdReset(t *testing.T) { dir, err := ioutil.TempDir("", "agola") if err != nil { @@ -75,12 +67,18 @@ func TestEtcdReset(t *testing.T) { defer os.RemoveAll(dir) etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } tetcd := setupEtcd(t, etcdDir) defer shutdownEtcd(tetcd) ctx, cancel := context.WithCancel(context.Background()) ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } ost, err := posix.New(ostDir) if err != nil { @@ -95,10 +93,13 @@ func TestEtcdReset(t *testing.T) { DataTypes: []string{"datatype01"}, } dm, err := NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } dmReadyCh := make(chan struct{}) t.Logf("starting datamanager") - go dm.Run(ctx, dmReadyCh) + go func() { _ = dm.Run(ctx, dmReadyCh) }() <-dmReadyCh actions := []*Action{ @@ -109,10 +110,8 @@ func TestEtcdReset(t *testing.T) { }, } - expectedObjects := []string{} for i := 0; i < 20; i++ { objectID := fmt.Sprintf("object%02d", i) - expectedObjects = append(expectedObjects, objectID) actions[0].ID = objectID if _, err := dm.WriteWal(ctx, actions, nil); err != nil { t.Fatalf("unexpected err: %v", err) @@ -128,7 +127,9 @@ func TestEtcdReset(t *testing.T) { t.Logf("stopping etcd") // Reset etcd shutdownEtcd(tetcd) - tetcd.WaitDown(10 * time.Second) + if err := tetcd.WaitDown(10 * time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } t.Logf("resetting etcd") os.RemoveAll(etcdDir) t.Logf("starting etcd") @@ -149,10 +150,13 @@ func TestEtcdReset(t *testing.T) { DataTypes: []string{"datatype01"}, } dm, err = NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } dmReadyCh = make(chan struct{}) t.Logf("starting datamanager") - go dm.Run(ctx, dmReadyCh) + go func() { _ = dm.Run(ctx, dmReadyCh) }() <-dmReadyCh time.Sleep(5 * time.Second) @@ -174,12 +178,18 @@ func TestConcurrentUpdate(t *testing.T) { defer os.RemoveAll(dir) etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } tetcd := setupEtcd(t, etcdDir) defer shutdownEtcd(tetcd) ctx := context.Background() ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } ost, err := posix.New(ostDir) if err != nil { @@ -193,6 +203,9 @@ func TestConcurrentUpdate(t *testing.T) { DataTypes: []string{"datatype01"}, } dm, err := NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } actions := []*Action{ { @@ -204,7 +217,7 @@ func TestConcurrentUpdate(t *testing.T) { } dmReadyCh := make(chan struct{}) - go dm.Run(ctx, dmReadyCh) + go func() { _ = dm.Run(ctx, dmReadyCh) }() <-dmReadyCh time.Sleep(5 * time.Second) @@ -236,7 +249,7 @@ func TestConcurrentUpdate(t *testing.T) { oldcgt = cgt // this must work successfully - cgt, err = dm.WriteWal(ctx, actions, cgt) + _, err = dm.WriteWal(ctx, actions, cgt) if err != nil { t.Fatalf("unexpected err: %v", err) } @@ -256,12 +269,18 @@ func TestWalCleaner(t *testing.T) { defer os.RemoveAll(dir) etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } tetcd := setupEtcd(t, etcdDir) defer shutdownEtcd(tetcd) ctx := context.Background() ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } ost, err := posix.New(ostDir) if err != nil { @@ -277,6 +296,9 @@ func TestWalCleaner(t *testing.T) { MinCheckpointWalsNum: 1, } dm, err := NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } actions := []*Action{ { @@ -288,7 +310,7 @@ func TestWalCleaner(t *testing.T) { } dmReadyCh := make(chan struct{}) - go dm.Run(ctx, dmReadyCh) + go func() { _ = dm.Run(ctx, dmReadyCh) }() <-dmReadyCh for i := 0; i < 20; i++ { @@ -321,12 +343,18 @@ func TestReadObject(t *testing.T) { defer os.RemoveAll(dir) etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } tetcd := setupEtcd(t, etcdDir) defer shutdownEtcd(tetcd) ctx := context.Background() ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) @@ -340,9 +368,12 @@ func TestReadObject(t *testing.T) { DataTypes: []string{"datatype01"}, } dm, err := NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } dmReadyCh := make(chan struct{}) - go dm.Run(ctx, dmReadyCh) + go func() { _ = dm.Run(ctx, dmReadyCh) }() <-dmReadyCh time.Sleep(5 * time.Second) @@ -479,12 +510,18 @@ func TestCheckpoint(t *testing.T) { defer os.RemoveAll(dir) etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } tetcd := setupEtcd(t, etcdDir) defer shutdownEtcd(tetcd) ctx := context.Background() ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } ost, err := posix.New(ostDir) if err != nil { t.Fatalf("unexpected err: %v", err) @@ -502,8 +539,11 @@ func TestCheckpoint(t *testing.T) { MaxDataFileSize: 10 * 1024, } dm, err := NewDataManager(ctx, logger, dmConfig) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } dmReadyCh := make(chan struct{}) - go dm.Run(ctx, dmReadyCh) + go func() { _ = dm.Run(ctx, dmReadyCh) }() <-dmReadyCh time.Sleep(5 * time.Second) @@ -667,7 +707,7 @@ func TestCheckpoint(t *testing.T) { } actionGroups = append(actionGroups, actions) - currentEntries, err = testCheckpoint(t, ctx, dm, actionGroups, currentEntries) + _, err = testCheckpoint(t, ctx, dm, actionGroups, currentEntries) if err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 823c5e2..fc279a8 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -143,24 +143,6 @@ func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.Read return ioutil.NopCloser(f), cgt, err } -func (d *DataManager) changesList(paths []string, prefix, startWith string, recursive bool) []string { - fpaths := []string{} - for _, p := range paths { - if !recursive && len(p) > len(prefix) { - rel := strings.TrimPrefix(p, prefix) - skip := strings.Contains(rel, d.ost.Delimiter()) - if skip { - continue - } - } - if strings.HasPrefix(p, prefix) && p > startWith { - fpaths = append(fpaths, p) - } - } - - return fpaths -} - func (d *DataManager) HasOSTWal(walseq string) (bool, error) { _, err := d.ost.Stat(d.storageWalStatusFile(walseq) + ".committed") if err == ostypes.ErrNotExist { @@ -601,7 +583,7 @@ func (d *DataManager) sync(ctx context.Context) error { if err := m.Lock(ctx); err != nil { return err } - defer m.Unlock(ctx) + defer func() { _ = m.Unlock(ctx) }() resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0) if err != nil { @@ -695,7 +677,7 @@ func (d *DataManager) checkpoint(ctx context.Context) error { if err := m.Lock(ctx); err != nil { return err } - defer m.Unlock(ctx) + defer func() { _ = m.Unlock(ctx) }() resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0) if err != nil { @@ -774,7 +756,7 @@ func (d *DataManager) walCleaner(ctx context.Context) error { if err := m.Lock(ctx); err != nil { return err } - defer m.Unlock(ctx) + defer func() { _ = m.Unlock(ctx) }() resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0) if err != nil {