diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index d752094..03f73dc 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -564,12 +564,12 @@ func (d *DataManager) sync(ctx context.Context) error { } defer session.Close() - m := concurrency.NewMutex(session, etcdSyncLockKey) + m := etcd.NewMutex(session, etcdSyncLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() @@ -653,12 +653,12 @@ func (d *DataManager) checkpoint(ctx context.Context, force bool) error { } defer session.Close() - m := concurrency.NewMutex(session, etcdCheckpointLockKey) + m := etcd.NewMutex(session, etcdCheckpointLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() @@ -735,12 +735,12 @@ func (d *DataManager) checkpointClean(ctx context.Context) error { } defer session.Close() - m := concurrency.NewMutex(session, etcdCheckpointLockKey) + m := etcd.NewMutex(session, etcdCheckpointLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() @@ -778,12 +778,12 @@ func (d *DataManager) etcdWalCleaner(ctx context.Context) error { } defer session.Close() - m := concurrency.NewMutex(session, etcdWalCleanerLockKey) + m := etcd.NewMutex(session, etcdWalCleanerLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() @@ -851,12 +851,12 @@ func (d *DataManager) storageWalCleaner(ctx context.Context) error { } defer session.Close() - m := concurrency.NewMutex(session, etcdStorageWalCleanerLockKey) + m := etcd.NewMutex(session, etcdStorageWalCleanerLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() @@ -961,12 +961,12 @@ func (d *DataManager) compactChangeGroups(ctx context.Context) error { } defer session.Close() - m := concurrency.NewMutex(session, etcdCompactChangeGroupsLockKey) + m := etcd.NewMutex(session, etcdCompactChangeGroupsLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() @@ -1099,12 +1099,12 @@ func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) erro } defer session.Close() - m := concurrency.NewMutex(session, etcdInitEtcdLockKey) + m := etcd.NewMutex(session, etcdInitEtcdLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() diff --git a/internal/etcd/mutex.go b/internal/etcd/mutex.go new file mode 100644 index 0000000..1ce40ba --- /dev/null +++ b/internal/etcd/mutex.go @@ -0,0 +1,200 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// TODO(sgotti) this is copied from etcd master clientv3 concurrency package to +// have the TryLock function not yet available on stable v3.4 client +// Remove this when updating the client to a version providing TryLock + +package etcd + +import ( + "context" + "errors" + "fmt" + "sync" + + v3 "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/clientv3/concurrency" + pb "go.etcd.io/etcd/etcdserver/etcdserverpb" + "go.etcd.io/etcd/mvcc/mvccpb" +) + +// ErrLocked is returned by TryLock when Mutex is already locked by another session. +var ErrLocked = errors.New("mutex: Locked by another session") + +// Mutex implements the sync Locker interface with etcd +type Mutex struct { + s *concurrency.Session + + pfx string + myKey string + myRev int64 + hdr *pb.ResponseHeader +} + +func NewMutex(s *concurrency.Session, pfx string) *Mutex { + return &Mutex{s, pfx + "/", "", -1, nil} +} + +// TryLock locks the mutex if not already locked by another session. +// If lock is held by another session, return immediately after attempting necessary cleanup +// The ctx argument is used for the sending/receiving Txn RPC. +func (m *Mutex) TryLock(ctx context.Context) error { + resp, err := m.tryAcquire(ctx) + if err != nil { + return err + } + // if no key on prefix / the minimum rev is key, already hold the lock + ownerKey := resp.Responses[1].GetResponseRange().Kvs + if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { + m.hdr = resp.Header + return nil + } + client := m.s.Client() + // Cannot lock, so delete the key + if _, err := client.Delete(ctx, m.myKey); err != nil { + return err + } + m.myKey = "\x00" + m.myRev = -1 + return ErrLocked +} + +// Lock locks the mutex with a cancelable context. If the context is canceled +// while trying to acquire the lock, the mutex tries to clean its stale lock entry. +func (m *Mutex) Lock(ctx context.Context) error { + resp, err := m.tryAcquire(ctx) + if err != nil { + return err + } + // if no key on prefix / the minimum rev is key, already hold the lock + ownerKey := resp.Responses[1].GetResponseRange().Kvs + if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { + m.hdr = resp.Header + return nil + } + client := m.s.Client() + // wait for deletion revisions prior to myKey + hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) + // release lock key if wait failed + if werr != nil { + _ = m.Unlock(client.Ctx()) + } else { + m.hdr = hdr + } + return werr +} + +func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) { + s := m.s + client := m.s.Client() + + m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) + cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) + // put self in lock waiters via myKey; oldest waiter holds lock + put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) + // reuse key in case this session already holds the lock + get := v3.OpGet(m.myKey) + // fetch current holder to complete uncontended path with only one RPC + getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) + resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() + if err != nil { + return nil, err + } + m.myRev = resp.Header.Revision + if !resp.Succeeded { + m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision + } + return resp, nil +} + +func (m *Mutex) Unlock(ctx context.Context) error { + client := m.s.Client() + if _, err := client.Delete(ctx, m.myKey); err != nil { + return err + } + m.myKey = "\x00" + m.myRev = -1 + return nil +} + +func (m *Mutex) IsOwner() v3.Cmp { + return v3.Compare(v3.CreateRevision(m.myKey), "=", m.myRev) +} + +func (m *Mutex) Key() string { return m.myKey } + +// Header is the response header received from etcd on acquiring the lock. +func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr } + +type lockerMutex struct{ *Mutex } + +func (lm *lockerMutex) Lock() { + client := lm.s.Client() + if err := lm.Mutex.Lock(client.Ctx()); err != nil { + panic(err) + } +} +func (lm *lockerMutex) Unlock() { + client := lm.s.Client() + if err := lm.Mutex.Unlock(client.Ctx()); err != nil { + panic(err) + } +} + +// NewLocker creates a sync.Locker backed by an etcd mutex. +func NewLocker(s *concurrency.Session, pfx string) sync.Locker { + return &lockerMutex{NewMutex(s, pfx)} +} + +func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error { + cctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wr v3.WatchResponse + wch := client.Watch(cctx, key, v3.WithRev(rev)) + for wr = range wch { + for _, ev := range wr.Events { + if ev.Type == mvccpb.DELETE { + return nil + } + } + } + if err := wr.Err(); err != nil { + return err + } + if err := ctx.Err(); err != nil { + return err + } + return fmt.Errorf("lost watcher waiting for delete") +} + +// waitDeletes efficiently waits until all keys matching the prefix and no greater +// than the create revision. +func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) { + getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev)) + for { + resp, err := client.Get(ctx, pfx, getOpts...) + if err != nil { + return nil, err + } + if len(resp.Kvs) == 0 { + return resp.Header, nil + } + lastKey := string(resp.Kvs[0].Key) + if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil { + return nil, err + } + } +} diff --git a/internal/services/notification/runevents.go b/internal/services/notification/runevents.go index 1d18f13..d46ac11 100644 --- a/internal/services/notification/runevents.go +++ b/internal/services/notification/runevents.go @@ -24,6 +24,7 @@ import ( "path" "time" + "agola.io/agola/internal/etcd" rstypes "agola.io/agola/services/runservice/types" "go.etcd.io/etcd/clientv3/concurrency" @@ -56,12 +57,12 @@ func (n *NotificationService) runEventsHandler(ctx context.Context) error { } defer session.Close() - m := concurrency.NewMutex(session, etcdRunEventsLockKey) + m := etcd.NewMutex(session, etcdRunEventsLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() diff --git a/internal/services/runservice/scheduler.go b/internal/services/runservice/scheduler.go index eb7ba90..2dd678f 100644 --- a/internal/services/runservice/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -395,12 +395,12 @@ func (s *Runservice) compactChangeGroups(ctx context.Context) error { } defer session.Close() - m := concurrency.NewMutex(session, common.EtcdCompactChangeGroupsLockKey) + m := etcd.NewMutex(session, common.EtcdCompactChangeGroupsLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() @@ -834,12 +834,12 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error { } defer session.Close() - m := concurrency.NewMutex(session, common.EtcdTaskUpdaterLockKey) + m := etcd.NewMutex(session, common.EtcdTaskUpdaterLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple task updaters will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() @@ -1341,12 +1341,12 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time. } defer session.Close() - m := concurrency.NewMutex(session, common.EtcdCacheCleanerLockKey) + m := etcd.NewMutex(session, common.EtcdCacheCleanerLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple cachecleaners will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }() @@ -1393,12 +1393,12 @@ func (s *Runservice) workspaceCleaner(ctx context.Context, workspaceExpireInterv } defer session.Close() - m := concurrency.NewMutex(session, common.EtcdWorkspaceCleanerLockKey) + m := etcd.NewMutex(session, common.EtcdWorkspaceCleanerLockKey) - // TODO(sgotti) find a way to use a trylock so we'll just return if already - // locked. Currently multiple workspacecleaners will enqueue and start when another - // finishes (unuseful and consume resources) - if err := m.Lock(ctx); err != nil { + if err := m.TryLock(ctx); err != nil { + if errors.Is(err, etcd.ErrLocked) { + return nil + } return err } defer func() { _ = m.Unlock(ctx) }()