From c064e490b2145e19e495510d2c2f75ad77082c91 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Wed, 10 Jul 2019 10:20:03 +0200 Subject: [PATCH] datamanager: add lock around compatchangegroups and initetcd just to avoid concurrency errors when multiple instances are running --- internal/datamanager/datamanager.go | 8 +++-- internal/datamanager/datamanager_test.go | 1 - internal/datamanager/wal.go | 46 +++++++++++++++++++++++- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go index 76d4792..e4de93c 100644 --- a/internal/datamanager/datamanager.go +++ b/internal/datamanager/datamanager.go @@ -60,9 +60,11 @@ var ( etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq") - etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock") - etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") - etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock") + etcdInitEtcdLockKey = path.Join(etcdWalBaseDir, "initetcd") + etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock") + etcdCompactChangeGroupsLockKey = path.Join(etcdWalBaseDir, "compactchangegroupslock") + etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") + etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock") etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups") etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev") diff --git a/internal/datamanager/datamanager_test.go b/internal/datamanager/datamanager_test.go index 9c76130..1bf19c9 100644 --- a/internal/datamanager/datamanager_test.go +++ b/internal/datamanager/datamanager_test.go @@ -134,7 +134,6 @@ func TestEtcdReset(t *testing.T) { os.RemoveAll(etcdDir) t.Logf("starting etcd") tetcd = setupEtcd(t, etcdDir) - defer shutdownEtcd(tetcd) if err := tetcd.Start(); err != nil { t.Fatalf("unexpected err: %v", err) } diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index dbb6c26..c9a8a14 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -568,6 +568,9 @@ func (d *DataManager) sync(ctx context.Context) error { m := concurrency.NewMutex(session, etcdSyncLockKey) + // TODO(sgotti) find a way to use a trylock so we'll just return if already + // locked. Currently multiple task updaters will enqueue and start when another + // finishes (unuseful and consume resources) if err := m.Lock(ctx); err != nil { return err } @@ -662,6 +665,9 @@ func (d *DataManager) checkpoint(ctx context.Context) error { m := concurrency.NewMutex(session, etcdCheckpointLockKey) + // TODO(sgotti) find a way to use a trylock so we'll just return if already + // locked. Currently multiple task updaters will enqueue and start when another + // finishes (unuseful and consume resources) if err := m.Lock(ctx); err != nil { return err } @@ -741,6 +747,9 @@ func (d *DataManager) walCleaner(ctx context.Context) error { m := concurrency.NewMutex(session, etcdWalCleanerLockKey) + // TODO(sgotti) find a way to use a trylock so we'll just return if already + // locked. Currently multiple task updaters will enqueue and start when another + // finishes (unuseful and consume resources) if err := m.Lock(ctx); err != nil { return err } @@ -802,11 +811,30 @@ func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) { } func (d *DataManager) compactChangeGroups(ctx context.Context) error { + session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := concurrency.NewMutex(session, etcdCompactChangeGroupsLockKey) + + // TODO(sgotti) find a way to use a trylock so we'll just return if already + // locked. Currently multiple task updaters will enqueue and start when another + // finishes (unuseful and consume resources) + if err := m.Lock(ctx); err != nil { + return err + } + defer func() { _ = m.Unlock(ctx) }() + resp, err := d.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey) if err != nil { return err } + if len(resp.Kvs) == 0 { + return errors.Errorf("no change group min revision key in etcd") + } revision := resp.Kvs[0].ModRevision // first update minrevision @@ -918,6 +946,22 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { return nil } + session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := concurrency.NewMutex(session, etcdInitEtcdLockKey) + + // TODO(sgotti) find a way to use a trylock so we'll just return if already + // locked. Currently multiple task updaters will enqueue and start when another + // finishes (unuseful and consume resources) + if err := m.Lock(ctx); err != nil { + return err + } + defer func() { _ = m.Unlock(ctx) }() + // Create changegroup min revision if it doesn't exists cmp := []etcdclientv3.Cmp{} then := []etcdclientv3.Op{} @@ -929,7 +973,7 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { return etcd.FromEtcdError(err) } - _, err := d.e.Get(ctx, etcdWalsDataKey, 0) + _, err = d.e.Get(ctx, etcdWalsDataKey, 0) if err != nil && err != etcd.ErrKeyNotFound { return err }