datamanager: add lock around compatchangegroups and initetcd
just to avoid concurrency errors when multiple instances are running
This commit is contained in:
parent
276c045c3b
commit
c064e490b2
|
@ -60,9 +60,11 @@ var (
|
||||||
|
|
||||||
etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq")
|
etcdCheckpointSeqKey = path.Join(etcdWalBaseDir, "checkpointseq")
|
||||||
|
|
||||||
etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock")
|
etcdInitEtcdLockKey = path.Join(etcdWalBaseDir, "initetcd")
|
||||||
etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock")
|
etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock")
|
||||||
etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock")
|
etcdCompactChangeGroupsLockKey = path.Join(etcdWalBaseDir, "compactchangegroupslock")
|
||||||
|
etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock")
|
||||||
|
etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock")
|
||||||
|
|
||||||
etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups")
|
etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups")
|
||||||
etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev")
|
etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev")
|
||||||
|
|
|
@ -134,7 +134,6 @@ func TestEtcdReset(t *testing.T) {
|
||||||
os.RemoveAll(etcdDir)
|
os.RemoveAll(etcdDir)
|
||||||
t.Logf("starting etcd")
|
t.Logf("starting etcd")
|
||||||
tetcd = setupEtcd(t, etcdDir)
|
tetcd = setupEtcd(t, etcdDir)
|
||||||
defer shutdownEtcd(tetcd)
|
|
||||||
if err := tetcd.Start(); err != nil {
|
if err := tetcd.Start(); err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -568,6 +568,9 @@ func (d *DataManager) sync(ctx context.Context) error {
|
||||||
|
|
||||||
m := concurrency.NewMutex(session, etcdSyncLockKey)
|
m := concurrency.NewMutex(session, etcdSyncLockKey)
|
||||||
|
|
||||||
|
// TODO(sgotti) find a way to use a trylock so we'll just return if already
|
||||||
|
// locked. Currently multiple task updaters will enqueue and start when another
|
||||||
|
// finishes (unuseful and consume resources)
|
||||||
if err := m.Lock(ctx); err != nil {
|
if err := m.Lock(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -662,6 +665,9 @@ func (d *DataManager) checkpoint(ctx context.Context) error {
|
||||||
|
|
||||||
m := concurrency.NewMutex(session, etcdCheckpointLockKey)
|
m := concurrency.NewMutex(session, etcdCheckpointLockKey)
|
||||||
|
|
||||||
|
// TODO(sgotti) find a way to use a trylock so we'll just return if already
|
||||||
|
// locked. Currently multiple task updaters will enqueue and start when another
|
||||||
|
// finishes (unuseful and consume resources)
|
||||||
if err := m.Lock(ctx); err != nil {
|
if err := m.Lock(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -741,6 +747,9 @@ func (d *DataManager) walCleaner(ctx context.Context) error {
|
||||||
|
|
||||||
m := concurrency.NewMutex(session, etcdWalCleanerLockKey)
|
m := concurrency.NewMutex(session, etcdWalCleanerLockKey)
|
||||||
|
|
||||||
|
// TODO(sgotti) find a way to use a trylock so we'll just return if already
|
||||||
|
// locked. Currently multiple task updaters will enqueue and start when another
|
||||||
|
// finishes (unuseful and consume resources)
|
||||||
if err := m.Lock(ctx); err != nil {
|
if err := m.Lock(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -802,11 +811,30 @@ func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *DataManager) compactChangeGroups(ctx context.Context) error {
|
func (d *DataManager) compactChangeGroups(ctx context.Context) error {
|
||||||
|
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer session.Close()
|
||||||
|
|
||||||
|
m := concurrency.NewMutex(session, etcdCompactChangeGroupsLockKey)
|
||||||
|
|
||||||
|
// TODO(sgotti) find a way to use a trylock so we'll just return if already
|
||||||
|
// locked. Currently multiple task updaters will enqueue and start when another
|
||||||
|
// finishes (unuseful and consume resources)
|
||||||
|
if err := m.Lock(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() { _ = m.Unlock(ctx) }()
|
||||||
|
|
||||||
resp, err := d.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey)
|
resp, err := d.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(resp.Kvs) == 0 {
|
||||||
|
return errors.Errorf("no change group min revision key in etcd")
|
||||||
|
}
|
||||||
revision := resp.Kvs[0].ModRevision
|
revision := resp.Kvs[0].ModRevision
|
||||||
|
|
||||||
// first update minrevision
|
// first update minrevision
|
||||||
|
@ -918,6 +946,22 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer session.Close()
|
||||||
|
|
||||||
|
m := concurrency.NewMutex(session, etcdInitEtcdLockKey)
|
||||||
|
|
||||||
|
// TODO(sgotti) find a way to use a trylock so we'll just return if already
|
||||||
|
// locked. Currently multiple task updaters will enqueue and start when another
|
||||||
|
// finishes (unuseful and consume resources)
|
||||||
|
if err := m.Lock(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() { _ = m.Unlock(ctx) }()
|
||||||
|
|
||||||
// Create changegroup min revision if it doesn't exists
|
// Create changegroup min revision if it doesn't exists
|
||||||
cmp := []etcdclientv3.Cmp{}
|
cmp := []etcdclientv3.Cmp{}
|
||||||
then := []etcdclientv3.Op{}
|
then := []etcdclientv3.Op{}
|
||||||
|
@ -929,7 +973,7 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
|
||||||
return etcd.FromEtcdError(err)
|
return etcd.FromEtcdError(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := d.e.Get(ctx, etcdWalsDataKey, 0)
|
_, err = d.e.Get(ctx, etcdWalsDataKey, 0)
|
||||||
if err != nil && err != etcd.ErrKeyNotFound {
|
if err != nil && err != etcd.ErrKeyNotFound {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue