From 21447fc59d588f65220b28ed6ac8782238a9b1ff Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Fri, 29 Mar 2019 11:37:22 +0100 Subject: [PATCH] etcd: allow specifying a revision for a get --- internal/etcd/etcd.go | 8 ++++++-- internal/sequence/sequence.go | 4 ++-- internal/services/runservice/scheduler/store/store.go | 6 +++--- internal/testutil/utils.go | 6 +++--- internal/wal/wal.go | 6 +++--- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/internal/etcd/etcd.go b/internal/etcd/etcd.go index 31f7862..6a08b1b 100644 --- a/internal/etcd/etcd.go +++ b/internal/etcd/etcd.go @@ -161,9 +161,13 @@ func (s *Store) Put(ctx context.Context, key string, value []byte, options *Writ return resp, FromEtcdError(err) } -func (s *Store) Get(ctx context.Context, key string) (*etcdclientv3.GetResponse, error) { - resp, err := s.c.Get(ctx, key) +func (s *Store) Get(ctx context.Context, key string, revision int64) (*etcdclientv3.GetResponse, error) { + opts := []etcdclientv3.OpOption{} + if revision != 0 { + opts = append(opts, etcdclientv3.WithRev(revision)) + } + resp, err := s.c.Get(ctx, key, opts...) if err != nil { return resp, FromEtcdError(err) } diff --git a/internal/sequence/sequence.go b/internal/sequence/sequence.go index 3992349..e71f726 100644 --- a/internal/sequence/sequence.go +++ b/internal/sequence/sequence.go @@ -68,7 +68,7 @@ func (s *Sequence) EqualEpoch(s2 *Sequence) bool { } func CurSequence(ctx context.Context, e *etcd.Store, key string) (*Sequence, bool, error) { - resp, err := e.Get(ctx, key) + resp, err := e.Get(ctx, key, 0) if err != nil && err != etcd.ErrKeyNotFound { return nil, false, err } @@ -87,7 +87,7 @@ func CurSequence(ctx context.Context, e *etcd.Store, key string) (*Sequence, boo } func IncSequence(ctx context.Context, e *etcd.Store, key string) (*Sequence, error) { - resp, err := e.Get(ctx, key) + resp, err := e.Get(ctx, key, 0) if err != nil && err != etcd.ErrKeyNotFound { return nil, err } diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/scheduler/store/store.go index c608510..438d5a7 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/scheduler/store/store.go @@ -249,7 +249,7 @@ func LTSGenIndexes(lts *objectstorage.ObjStorage, r *types.Run) []string { } func GetExecutor(ctx context.Context, e *etcd.Store, executorID string) (*types.Executor, error) { - resp, err := e.Get(ctx, common.EtcdExecutorKey(executorID)) + resp, err := e.Get(ctx, common.EtcdExecutorKey(executorID), 0) if err != nil { return nil, err } @@ -304,7 +304,7 @@ func DeleteExecutor(ctx context.Context, e *etcd.Store, executorID string) error } func GetExecutorTask(ctx context.Context, e *etcd.Store, etID string) (*types.ExecutorTask, error) { - resp, err := e.Get(ctx, common.EtcdTaskKey(etID)) + resp, err := e.Get(ctx, common.EtcdTaskKey(etID), 0) if err != nil { return nil, err } @@ -422,7 +422,7 @@ func GetExecutorTasksForRun(ctx context.Context, e *etcd.Store, runID string) ([ } func GetRun(ctx context.Context, e *etcd.Store, runID string) (*types.Run, int64, error) { - resp, err := e.Get(ctx, common.EtcdRunKey(runID)) + resp, err := e.Get(ctx, common.EtcdRunKey(runID), 0) if err != nil { return nil, 0, err } diff --git a/internal/testutil/utils.go b/internal/testutil/utils.go index 7ff9e76..dfe7049 100644 --- a/internal/testutil/utils.go +++ b/internal/testutil/utils.go @@ -297,7 +297,7 @@ type TestEtcd struct { func (te *TestEtcd) Compact() error { ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) defer cancel() - resp, err := te.Get(ctx, "anykey") + resp, err := te.Get(ctx, "anykey", 0) if err != nil && err != etcd.ErrKeyNotFound { return err } @@ -311,7 +311,7 @@ func (te *TestEtcd) WaitUp(timeout time.Duration) error { for time.Now().Add(-timeout).Before(start) { ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) defer cancel() - _, err := te.Get(ctx, "anykey") + _, err := te.Get(ctx, "anykey", 0) if err != nil && err == etcd.ErrKeyNotFound { return nil } @@ -329,7 +329,7 @@ func (te *TestEtcd) WaitDown(timeout time.Duration) error { for time.Now().Add(-timeout).Before(start) { ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout) defer cancel() - _, err := te.Get(ctx, "anykey") + _, err := te.Get(ctx, "anykey", 0) if err != nil && err != etcd.ErrKeyNotFound { return nil } diff --git a/internal/wal/wal.go b/internal/wal/wal.go index 61c3402..75355b7 100644 --- a/internal/wal/wal.go +++ b/internal/wal/wal.go @@ -466,7 +466,7 @@ func (w *WalManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64 } func (w *WalManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error) { - resp, err := w.e.Get(ctx, etcdLastCommittedStorageWalSeqKey) + resp, err := w.e.Get(ctx, etcdLastCommittedStorageWalSeqKey, 0) if err != nil && err != etcd.ErrKeyNotFound { return "", 0, err } @@ -587,7 +587,7 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio return nil, err } - resp, err := w.e.Get(ctx, etcdWalsDataKey) + resp, err := w.e.Get(ctx, etcdWalsDataKey, 0) if err != nil { return nil, err } @@ -1121,7 +1121,7 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { return etcd.FromEtcdError(err) } - _, err := w.e.Get(ctx, etcdWalsDataKey) + _, err := w.e.Get(ctx, etcdWalsDataKey, 0) if err != nil && err != etcd.ErrKeyNotFound { return err }