agola/internal/datamanager/wal.go
Simone Gotti d2b09d854f *: use new errors handling library
Implement a new error handling library based on pkg/errors. It saves the
stack when wrapping and exports some functions to add stack saving to
external errors as well.
It also implements custom zerolog error formatting without adding too
much verbosity by just printing the chain error file:line without a full
stack trace of every error.

* Add a --detailed-errors option to print errors with their full chain
* Wrap all error returns. Use errors.WithStack to wrap without adding a
  new message and errors.Wrap[f] to add a message.
* Add golangci-lint wrapcheck to check that external packages errors are
  wrapped. This won't check that internal packages error are wrapped.
  But we want also to ensure this case so we'll have to find something
  else to check also these.
2022-02-28 12:49:13 +01:00

1273 lines
35 KiB
Go

// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package datamanager
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"path"
"strings"
"time"
"agola.io/agola/internal/errors"
"agola.io/agola/internal/etcd"
"agola.io/agola/internal/objectstorage"
"agola.io/agola/internal/sequence"
"github.com/gofrs/uuid"
etcdclientv3 "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/mvcc/mvccpb"
)
// ActionType identifies the kind of operation carried by an Action.
type ActionType string

// Supported action types.
const (
	// ActionTypePut marks an action that stores the provided data.
	ActionTypePut = ActionType("put")
	// ActionTypeDelete marks an action that removes an entry.
	ActionTypeDelete = ActionType("delete")
)
// Action is a single operation recorded in a wal. It targets the entry
// identified by DataType and ID.
type Action struct {
// ActionType tells whether the entry is put or deleted.
ActionType ActionType
// DataType is the data type of the affected entry.
DataType string
// ID is the id of the affected entry.
ID string
// Data is the entry content; ReadObject serves it for put actions.
Data []byte
}
// WalHeader is the JSON content of a wal ".committed" status file in the
// object storage.
type WalHeader struct {
// WalDataFileID is the id of the object storage file holding the wal actions.
WalDataFileID string
// PreviousWalSequence is the sequence of the wal preceding this one.
PreviousWalSequence string
}
// WalStatus is the state of a wal as tracked in etcd.
type WalStatus string

// Wal states.
const (
	// WalStatusCommitted represents a wal written to the objectstorage.
	WalStatusCommitted = WalStatus("committed")
	// WalStatusCommittedStorage represents the .committed marker file
	// written to the objectstorage.
	WalStatusCommittedStorage = WalStatus("committed_storage")
	// WalStatusCheckpointed means that all the wal actions have been
	// executed on the objectstorage.
	WalStatusCheckpointed = WalStatus("checkpointed")
)
// WalsData is the global wal state stored in etcd under the walsdata key.
type WalsData struct {
// LastCommittedWalSequence is the sequence of the last wal committed in etcd.
LastCommittedWalSequence string
// Revision is the etcd mod revision of the walsdata key; it is not serialized.
Revision int64 `json:"-"`
}
// WalData is the per wal state stored in etcd.
type WalData struct {
// WalDataFileID is the id of the object storage file holding the wal actions.
WalDataFileID string
// WalStatus is the current state of the wal.
WalStatus WalStatus
// WalSequence is the sequence assigned to the wal.
WalSequence string
// PreviousWalSequence is the sequence of the wal committed before this one.
PreviousWalSequence string
// internal values not saved
// Revision is filled from the etcd mod revision of the wal key (see checkpoint).
Revision int64 `json:"-"`
}
// ChangeGroupsUpdateToken carries the revisions observed by a reader. It is
// passed back to WriteWal, which uses it to build the etcd transaction
// comparisons on the change groups.
type ChangeGroupsUpdateToken struct {
// CurRevision is the revision known when the token was created.
CurRevision int64 `json:"cur_revision"`
// ChangeGroupsRevisions holds the revision of every requested change group.
ChangeGroupsRevisions changeGroupsRevisions `json:"change_groups_revisions"`
}
// changeGroupsRevisions maps a change group name to its revision.
type changeGroupsRevisions map[string]int64
// GetChangeGroupsUpdateToken returns an update token holding the current
// revision and the revisions of the change groups named in cgNames, both read
// from the in-memory wal changes while holding their lock.
//
// It returns an error when the wal changes are not initialized yet.
func (d *DataManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error) {
	d.changes.Lock()
	defer d.changes.Unlock()

	if !d.changes.initialized {
		return nil, errors.Errorf("wal changes not ready")
	}

	token := &ChangeGroupsUpdateToken{
		CurRevision:           d.changes.curRevision(),
		ChangeGroupsRevisions: d.changes.getChangeGroups(cgNames),
	}
	return token, nil
}
// ReadObject returns a reader for the object with the given data type and id
// together with a change groups update token for cgNames.
//
// If the in-memory wal changes record a put for the object, its data is served
// from the cached actions of that wal; otherwise the object is read with
// d.Read. The token is built from the revision and change group revisions
// observed while holding the changes lock.
//
// On any error both the reader and the token are nil. An error is returned
// when the wal changes are not initialized yet, when the wal recorded as
// holding the object has no matching put action (reported through
// newErrNotExist) or when d.Read fails.
func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error) {
	d.changes.Lock()
	if !d.changes.initialized {
		d.changes.Unlock()
		return nil, nil, errors.Errorf("wal changes not ready")
	}
	walseq, ok := d.changes.getPut(dataType, id)
	revision := d.changes.curRevision()
	cgr := d.changes.getChangeGroups(cgNames)
	actions := d.changes.actions[walseq]
	d.changes.Unlock()

	cgt := &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}

	if ok {
		for _, action := range actions {
			if action.ActionType != ActionTypePut {
				continue
			}
			if action.DataType == dataType && action.ID == id {
				d.log.Debug().Msgf("reading datatype %q, id %q from wal: %q", dataType, id, walseq)
				return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil
			}
		}
		return nil, nil, newErrNotExist(errors.Errorf("no datatype %q, id %q in wal %s", dataType, id, walseq))
	}

	f, err := d.Read(dataType, id)
	if err != nil {
		// don't hand out a ReadCloser wrapping an unusable reader
		return nil, nil, errors.WithStack(err)
	}
	return ioutil.NopCloser(f), cgt, nil
}
// HasOSTWal reports whether the ".committed" status file of the wal with the
// given sequence exists in the object storage.
func (d *DataManager) HasOSTWal(walseq string) (bool, error) {
	committedPath := d.storageWalStatusFile(walseq) + ".committed"

	_, err := d.ost.Stat(committedPath)
	switch {
	case objectstorage.IsNotExist(err):
		return false, nil
	case err != nil:
		return false, fromOSTError(err)
	}
	return true, nil
}
// ReadWal reads and decodes the header stored in the ".committed" status file
// of the wal with the given sequence.
func (d *DataManager) ReadWal(walseq string) (*WalHeader, error) {
	statusPath := d.storageWalStatusFile(walseq) + ".committed"

	f, err := d.ost.ReadObject(statusPath)
	if err != nil {
		return nil, fromOSTError(err)
	}
	defer f.Close()

	var header *WalHeader
	if err = json.NewDecoder(f).Decode(&header); err != nil {
		return nil, errors.WithStack(err)
	}
	return header, nil
}
// ReadWalData opens the wal data file with the given id in the object storage.
// The object storage error, if any, is converted with fromOSTError.
func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error) {
	dataPath := d.storageWalDataFile(walFileID)
	rc, err := d.ost.ReadObject(dataPath)
	return rc, fromOSTError(err)
}
// WalFile is an element produced by ListOSTWals: either a wal found in the
// object storage or a listing error.
type WalFile struct {
// WalSequence is the sequence of the wal, taken from the status file name.
WalSequence string
// Err is set when the object storage listing failed.
Err error
}
// ListOSTWals lists the wals that have a ".committed" status file in the
// object storage and streams them on the returned channel. When start is not
// empty, the status file path of that wal is passed to the object storage
// listing as its start path.
//
// A listing error is delivered as an element with Err set, after which the
// channel is closed. The channel is also closed when the listing ends.
func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {
	walCh := make(chan *WalFile, 1)

	go func() {
		doneCh := make(chan struct{})
		defer close(doneCh)
		defer close(walCh)

		startPath := ""
		if start != "" {
			startPath = d.storageWalStatusFile(start)
		}

		// sequence of the wal being collected; it is emitted as soon as a
		// status file of another wal shows up or the listing ends
		pendingSeq := ""

		for object := range d.ost.List(d.storageWalStatusDir()+"/", startPath, true, doneCh) {
			if object.Err != nil {
				walCh <- &WalFile{Err: fromOSTError(object.Err)}
				return
			}

			name := path.Base(object.Path)
			ext := path.Ext(name)
			// accept only ".committed" files (skip old files that had ".checkpointed" extensions)
			if ext != ".committed" {
				continue
			}

			seq := strings.TrimSuffix(name, ext)
			if seq == pendingSeq {
				continue
			}
			// the file refers to another wal, so return the current one
			if pendingSeq != "" {
				walCh <- &WalFile{WalSequence: pendingSeq}
			}
			pendingSeq = seq
		}

		if pendingSeq != "" {
			walCh <- &WalFile{WalSequence: pendingSeq}
		}
	}()

	return walCh
}
// ListEtcdWalsElement is an element produced by ListEtcdWals.
type ListEtcdWalsElement struct {
// WalData is the wal state decoded from the etcd value.
WalData *WalData
// Err is the listing or decoding error, if any.
Err error
}
// ListEtcdWals lists, in pages of 10 entries, the wals stored in etcd at the
// given revision and streams them on the returned channel.
//
// A listing error is delivered as an element with Err set and ends the
// stream. A decoding error is delivered in the Err field of the affected
// element and the listing goes on. The channel is closed when done.
func (d *DataManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement {
	walCh := make(chan *ListEtcdWalsElement, 1)

	go func() {
		defer close(walCh)

		var cont *etcd.ListPagedContinuation
		for {
			page, err := d.e.ListPaged(ctx, etcdWalsDir, revision, 10, cont)
			if err != nil {
				walCh <- &ListEtcdWalsElement{Err: err}
				return
			}
			cont = page.Continuation

			for _, kv := range page.Resp.Kvs {
				elem := &ListEtcdWalsElement{}
				elem.Err = json.Unmarshal(kv.Value, &elem.WalData)
				walCh <- elem
			}

			if !page.HasMore {
				return
			}
		}
	}()

	return walCh
}
// ListEtcdChangeGroups lists the change groups stored in etcd at the given
// revision and returns, for each of them, the mod revision of its key. The
// change group name is the last element of the key path.
func (d *DataManager) ListEtcdChangeGroups(ctx context.Context, revision int64) (changeGroupsRevisions, error) {
	resp, err := d.e.List(ctx, etcdChangeGroupsDir, "", revision)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	cgr := make(changeGroupsRevisions, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		cgr[path.Base(string(kv.Key))] = kv.ModRevision
	}
	return cgr, nil
}
// FirstAvailableWalData returns the first wal listed in etcd (the one with the
// smallest sequence), or nil when no wal is available, together with the etcd
// revision at the time of the operation.
func (d *DataManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64, error) {
	// a page of one element is enough since only the first wal is needed
	page, err := d.e.ListPaged(ctx, etcdWalsDir, 0, 1, nil)
	if err != nil {
		return nil, 0, errors.WithStack(err)
	}

	kvs := page.Resp.Kvs
	revision := page.Resp.Header.Revision
	if len(kvs) == 0 {
		return nil, revision, nil
	}

	var first *WalData
	if err := json.Unmarshal(kvs[0].Value, &first); err != nil {
		return nil, 0, errors.WithStack(err)
	}
	return first, revision, nil
}
// LastCommittedStorageWal returns the wal sequence stored in etcd under the
// last committed storage wal key and the etcd revision of the response.
// It returns an error when the key doesn't exist.
func (d *DataManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error) {
	resp, err := d.e.Get(ctx, etcdLastCommittedStorageWalSeqKey, 0)
	if err != nil {
		if errors.Is(err, etcd.ErrKeyNotFound) {
			return "", 0, errors.Errorf("no last committedstorage wal on etcd")
		}
		return "", 0, errors.WithStack(err)
	}

	walSeq := string(resp.Kvs[0].Value)
	return walSeq, resp.Header.Revision, nil
}
// WatchElement is an element produced by Watch for an etcd watch response.
type WatchElement struct {
// Revision is the revision in the header of the watch response.
Revision int64
// WalData is the wal decoded from a put event on a wal key, if any.
WalData *WalData
// ChangeGroupsRevisions maps the change groups touched by the response to
// their mod revision (0 for deleted change groups).
ChangeGroupsRevisions changeGroupsRevisions
// Err is set when the watch was canceled or an event could not be decoded.
Err error
}
// Watch watches the etcd keys under etcdWalBaseDir starting from the given
// revision and streams the changes on the returned channel.
//
// One element is sent for every watch response that contains at least one
// event on a wal key, on a change group key or on the ping key. The element
// carries the response revision, the wal decoded from a wal put event and the
// revisions of the touched change groups (0 for deleted ones).
//
// When the watch is canceled, or a wal value cannot be decoded, an element
// with Err set is sent and the channel is closed. A canceled watch caused by
// etcd compaction is reported as ErrCompacted.
func (d *DataManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement {
	walCh := make(chan *WatchElement, 1)

	// TODO(sgotti) if the etcd cluster goes down, watch won't return an error but
	// wait until it comes back. We have to find a way to detect when the cluster
	// is down and report an error so our clients can react (i.e. a readdb could
	// mark itself as not in sync)
	wctx := etcdclientv3.WithRequireLeader(ctx)
	wch := d.e.Watch(wctx, etcdWalBaseDir+"/", revision)

	go func() {
		defer close(walCh)

		for wresp := range wch {
			we := &WatchElement{ChangeGroupsRevisions: make(changeGroupsRevisions)}
			send := false

			if wresp.Canceled {
				err := wresp.Err()
				if errors.Is(err, etcdclientv3rpc.ErrCompacted) {
					we.Err = ErrCompacted
				} else {
					we.Err = err
				}
				walCh <- we
				return
			}

			we.Revision = wresp.Header.Revision

			for _, ev := range wresp.Events {
				key := string(ev.Kv.Key)

				switch {
				case strings.HasPrefix(key, etcdWalsDir+"/"):
					send = true
					switch ev.Type {
					case mvccpb.PUT:
						var walData *WalData
						if err := json.Unmarshal(ev.Kv.Value, &walData); err != nil {
							// report the decode error: the watch response
							// itself carries no error at this point
							we.Err = errors.WithStack(err)
							walCh <- we
							return
						}
						we.WalData = walData
					}

				case strings.HasPrefix(key, etcdChangeGroupsDir+"/"):
					send = true
					switch ev.Type {
					case mvccpb.PUT:
						changeGroup := path.Base(string(ev.Kv.Key))
						we.ChangeGroupsRevisions[changeGroup] = ev.Kv.ModRevision
					case mvccpb.DELETE:
						changeGroup := path.Base(string(ev.Kv.Key))
						we.ChangeGroupsRevisions[changeGroup] = 0
					}

				case key == etcdPingKey:
					send = true

				default:
					continue
				}
			}

			if send {
				walCh <- we
			}
		}
	}()

	return walCh
}
// WriteWal writes the provided actions in a wal file. The wal will be marked as
// "committed" on etcd if the provided change groups haven't changed in the
// meantime, otherwise an optimistic concurrency error will be returned and the
// wal won't be committed.
//
// It is WriteWalAdditionalOps without additional etcd comparisons and
// operations.
//
// TODO(sgotti) save inside the wal file also the previous committed wal to
// handle possible objectstorage list operation eventual consistency gaps (list
// won't report a wal at seq X but a wal at X+n, if this kind of eventual
// consistency ever exists)
func (d *DataManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error) {
return d.WriteWalAdditionalOps(ctx, actions, cgt, nil, nil)
}
// WriteWalAdditionalOps writes the provided actions in a new wal data file in
// the object storage and then commits the wal in etcd with a single
// transaction. cmp and then are additional comparisons and operations added
// to that transaction; both may be nil.
//
// The transaction succeeds only if the change groups in cgt (when not nil)
// still have the recorded revisions, the walsdata key hasn't been modified
// since it was read and the new wal key doesn't exist. On success it returns
// a new token whose revisions are the transaction revision. When the
// transaction comparisons fail an error (ErrConcurrency in the common case)
// is returned.
//
// After a successful commit d.sync is called right away; its error is only
// logged.
func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, cmp []etcdclientv3.Cmp, then []etcdclientv3.Op) (*ChangeGroupsUpdateToken, error) {
// check changegroups name
if cgt != nil {
for cgName := range cgt.ChangeGroupsRevisions {
// the name is joined to etcdChangeGroupsDir below to build the etcd key
if strings.Contains(cgName, "/") {
return nil, fmt.Errorf(`changegroup name %q must not contain "/"`, cgName)
}
if len(cgName) > maxChangegroupNameLength {
return nil, fmt.Errorf("changegroup name %q too long", cgName)
}
}
}
if len(actions) == 0 {
return nil, errors.Errorf("cannot write wal: actions is empty")
}
// reserve the sequence of the new wal
walSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey)
if err != nil {
return nil, errors.WithStack(err)
}
// read the current walsdata; its mod revision is compared in the
// transaction to detect concurrent writers
resp, err := d.e.Get(ctx, etcdWalsDataKey, 0)
if err != nil {
return nil, errors.WithStack(err)
}
var walsData WalsData
if err := json.Unmarshal(resp.Kvs[0].Value, &walsData); err != nil {
return nil, errors.WithStack(err)
}
walsData.Revision = resp.Kvs[0].ModRevision
walDataFileID := uuid.Must(uuid.NewV4()).String()
walDataFilePath := d.storageWalDataFile(walDataFileID)
walKey := etcdWalKey(walSequence.String())
// the wal data file is the concatenation of the JSON encoded actions
var buf bytes.Buffer
for _, action := range actions {
actionj, err := json.Marshal(action)
if err != nil {
return nil, errors.WithStack(err)
}
if _, err := buf.Write(actionj); err != nil {
return nil, errors.WithStack(err)
}
}
// the data file is written before committing the wal in etcd
if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes()), int64(buf.Len()), true); err != nil {
return nil, fromOSTError(err)
}
d.log.Debug().Msgf("wrote wal file: %s", walDataFilePath)
walData := &WalData{
WalSequence: walSequence.String(),
WalDataFileID: walDataFileID,
WalStatus: WalStatusCommitted,
PreviousWalSequence: walsData.LastCommittedWalSequence,
}
walsData.LastCommittedWalSequence = walSequence.String()
walsDataj, err := json.Marshal(walsData)
if err != nil {
return nil, errors.WithStack(err)
}
walDataj, err := json.Marshal(walData)
if err != nil {
return nil, errors.WithStack(err)
}
if cmp == nil {
cmp = []etcdclientv3.Cmp{}
}
if then == nil {
then = []etcdclientv3.Op{}
}
// operations executed when the comparisons fail: they fetch the current
// walsdata and wal keys
getWalsData := etcdclientv3.OpGet(etcdWalsDataKey)
getWal := etcdclientv3.OpGet(walKey)
if cgt != nil {
for cgName, cgRev := range cgt.ChangeGroupsRevisions {
cgKey := path.Join(etcdChangeGroupsDir, cgName)
if cgRev > 0 {
// the change group must still be at the revision seen by the caller
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(cgKey), "=", cgRev))
} else {
// the change group must not exist
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(cgKey), "=", 0))
}
// touch the change group key so its mod revision moves forward
then = append(then, etcdclientv3.OpPut(cgKey, ""))
}
if cgt.CurRevision > 0 {
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(etcdChangeGroupMinRevisionKey), "<", cgt.CurRevision+etcdChangeGroupMinRevisionRange))
}
}
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(etcdWalsDataKey), "=", walsData.Revision))
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.Version(walKey), "=", 0))
then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj)))
then = append(then, etcdclientv3.OpPut(walKey, string(walDataj)))
// This will only succeed if no one else has concurrently updated the walsData
// TODO(sgotti) retry if it failed due to concurrency errors
txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...).Else(getWalsData, getWal)
tresp, err := txn.Commit()
if err != nil {
return nil, errors.WithStack(etcd.FromEtcdError(err))
}
if !tresp.Succeeded {
// NOTE(review): both values below are read from Responses[0] (the
// walsdata get); the wal get is the second Else operation, so
// walDataCreateRev looks like it was meant to come from Responses[1].
// Confirm before changing: it affects which error callers receive.
walsDataRev := tresp.Responses[0].GetResponseRange().Kvs[0].ModRevision
walDataCreateRev := tresp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
// TODO(sgotti) If the tx failed due to walsdata already updated we could retry
if walsDataRev == walsData.Revision && walDataCreateRev == 0 {
return nil, errors.Errorf("failed to write committed wal: wals groups already updated")
}
return nil, ErrConcurrency
}
// build the token for the next update: every change group in cgt now has
// the revision of this transaction
ncgt := &ChangeGroupsUpdateToken{CurRevision: tresp.Header.Revision, ChangeGroupsRevisions: make(changeGroupsRevisions)}
if cgt != nil {
for cgName := range cgt.ChangeGroupsRevisions {
ncgt.ChangeGroupsRevisions[cgName] = tresp.Header.Revision
}
}
// try to commit storage right now
if err := d.sync(ctx); err != nil {
d.log.Err(err).Msgf("wal sync error")
}
return ncgt, nil
}
// syncLoop runs d.sync repeatedly, waiting DefaultSyncInterval between runs,
// until ctx is done. Sync errors are logged and don't stop the loop.
func (d *DataManager) syncLoop(ctx context.Context) {
	for {
		d.log.Debug().Msgf("syncer")

		err := d.sync(ctx)
		if err != nil {
			d.log.Err(err).Msgf("syncer error")
		}

		timer := time.NewTimer(DefaultSyncInterval)
		select {
		case <-timer.C:
			// interval elapsed, run again
		case <-ctx.Done():
			return
		}
	}
}
// sync writes to the object storage the ".committed" status file of every
// etcd wal in committed state and then moves the wal to the committed_storage
// state in etcd, also recording it as the last committed storage wal.
//
// The work is done while holding the etcd sync lock. If the lock is already
// held, sync returns nil without doing anything.
func (d *DataManager) sync(ctx context.Context) error {
	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
	if err != nil {
		return errors.WithStack(err)
	}
	defer session.Close()

	m := etcd.NewMutex(session, etcdSyncLockKey)
	switch err := m.TryLock(ctx); {
	case err == nil:
	case errors.Is(err, etcd.ErrLocked):
		return nil
	default:
		return errors.WithStack(err)
	}
	defer func() { _ = m.Unlock(ctx) }()

	resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0)
	if err != nil {
		return errors.WithStack(err)
	}

	for _, kv := range resp.Kvs {
		var walData WalData
		if err := json.Unmarshal(kv.Value, &walData); err != nil {
			return errors.WithStack(err)
		}

		// wals must be committed and checkpointed in order.
		// TODO(sgotti) this could be optimized by parallelizing writes of wals that don't have common change groups
		if walData.WalStatus != WalStatusCommitted {
			continue
		}

		d.log.Debug().Msgf("syncing committed wal %q to storage", walData.WalSequence)

		headerj, err := json.Marshal(&WalHeader{
			WalDataFileID:       walData.WalDataFileID,
			PreviousWalSequence: walData.PreviousWalSequence,
		})
		if err != nil {
			return errors.WithStack(err)
		}

		committedPath := d.storageWalStatusFile(walData.WalSequence) + ".committed"
		if err := d.ost.WriteObject(committedPath, bytes.NewReader(headerj), int64(len(headerj)), true); err != nil {
			return fromOSTError(err)
		}

		d.log.Debug().Msgf("updating wal to state %q", WalStatusCommittedStorage)
		walData.WalStatus = WalStatusCommittedStorage
		walDataj, err := json.Marshal(walData)
		if err != nil {
			return errors.WithStack(err)
		}

		key := string(kv.Key)
		cmp := []etcdclientv3.Cmp{
			etcdclientv3.Compare(etcdclientv3.ModRevision(key), "=", kv.ModRevision),
		}
		then := []etcdclientv3.Op{
			etcdclientv3.OpPut(key, string(walDataj)),
			etcdclientv3.OpPut(string(etcdLastCommittedStorageWalSeqKey), string(walData.WalSequence)),
		}

		// This will only succeed if no one else has concurrently updated the wal keys in etcd
		tresp, err := d.e.Client().Txn(ctx).If(cmp...).Then(then...).Commit()
		if err != nil {
			return errors.WithStack(etcd.FromEtcdError(err))
		}
		if !tresp.Succeeded {
			return errors.Errorf("failed to write committedstorage wal: concurrent update")
		}
	}

	return nil
}
// checkpointLoop runs a non forced d.checkpoint repeatedly, waiting
// d.checkpointInterval between runs, until ctx is done. Checkpoint errors are
// logged and don't stop the loop.
func (d *DataManager) checkpointLoop(ctx context.Context) {
	for {
		d.log.Debug().Msgf("checkpointer")

		err := d.checkpoint(ctx, false)
		if err != nil {
			d.log.Err(err).Msgf("checkpoint error")
		}

		timer := time.NewTimer(d.checkpointInterval)
		select {
		case <-timer.C:
			// interval elapsed, run again
		case <-ctx.Done():
			return
		}
	}
}
// checkpoint writes a data snapshot containing the etcd wals that are not yet
// checkpointed and then marks them as checkpointed in etcd.
//
// Wals are collected in listing order up to the first wal that is still in
// committed state (not yet synced to the storage). Unless force is true,
// nothing is done when fewer than d.minCheckpointWalsNum wals were collected.
//
// The work is done while holding the etcd checkpoint lock. If the lock is
// already held, checkpoint returns nil without doing anything.
func (d *DataManager) checkpoint(ctx context.Context, force bool) error {
	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
	if err != nil {
		return errors.WithStack(err)
	}
	defer session.Close()

	m := etcd.NewMutex(session, etcdCheckpointLockKey)
	switch err := m.TryLock(ctx); {
	case err == nil:
	case errors.Is(err, etcd.ErrLocked):
		return nil
	default:
		return errors.WithStack(err)
	}
	defer func() { _ = m.Unlock(ctx) }()

	resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0)
	if err != nil {
		return errors.WithStack(err)
	}

	var pending []*WalData
collect:
	for _, kv := range resp.Kvs {
		var wd *WalData
		if err := json.Unmarshal(kv.Value, &wd); err != nil {
			return errors.WithStack(err)
		}
		wd.Revision = kv.ModRevision

		switch wd.WalStatus {
		case WalStatusCommitted:
			d.log.Warn().Msgf("wal %s not yet committed storage", wd.WalSequence)
			break collect
		case WalStatusCheckpointed:
			continue
		}
		pending = append(pending, wd)
	}

	if len(pending) == 0 || (!force && len(pending) < d.minCheckpointWalsNum) {
		return nil
	}

	if err := d.writeDataSnapshot(ctx, pending); err != nil {
		return errors.Wrapf(err, "checkpoint function error")
	}

	for _, wd := range pending {
		d.log.Debug().Msgf("updating wal to state %q", WalStatusCheckpointed)
		wd.WalStatus = WalStatusCheckpointed

		encoded, err := json.Marshal(wd)
		if err != nil {
			return errors.WithStack(err)
		}

		// the put only succeeds if the wal key is still at the listed revision
		if _, err := d.e.AtomicPut(ctx, etcdWalKey(wd.WalSequence), encoded, wd.Revision, nil); err != nil {
			return errors.WithStack(err)
		}
	}

	return nil
}
// checkpointCleanLoop runs d.checkpointClean repeatedly, waiting
// d.checkpointCleanInterval between runs, until ctx is done. Errors are logged
// and don't stop the loop.
func (d *DataManager) checkpointCleanLoop(ctx context.Context) {
	for {
		d.log.Debug().Msgf("checkpointCleanLoop")

		err := d.checkpointClean(ctx)
		if err != nil {
			d.log.Err(err).Msgf("checkpointClean error")
		}

		timer := time.NewTimer(d.checkpointCleanInterval)
		select {
		case <-timer.C:
			// interval elapsed, run again
		case <-ctx.Done():
			return
		}
	}
}
// checkpointClean calls d.CleanOldCheckpoints while holding the etcd
// checkpoint lock (the same lock used by checkpoint). If the lock is already
// held, it returns nil without doing anything.
func (d *DataManager) checkpointClean(ctx context.Context) error {
	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
	if err != nil {
		return errors.WithStack(err)
	}
	defer session.Close()

	m := etcd.NewMutex(session, etcdCheckpointLockKey)
	switch err := m.TryLock(ctx); {
	case err == nil:
	case errors.Is(err, etcd.ErrLocked):
		return nil
	default:
		return errors.WithStack(err)
	}
	defer func() { _ = m.Unlock(ctx) }()

	err = d.CleanOldCheckpoints(ctx)
	if err != nil {
		return errors.WithStack(err)
	}
	return nil
}
// etcdWalCleanerLoop runs d.etcdWalCleaner repeatedly, waiting
// DefaultEtcdWalCleanInterval between runs, until ctx is done. Errors are
// logged and don't stop the loop.
func (d *DataManager) etcdWalCleanerLoop(ctx context.Context) {
	for {
		d.log.Debug().Msgf("etcdwalcleaner")

		err := d.etcdWalCleaner(ctx)
		if err != nil {
			d.log.Err(err).Msgf("etcdwalcleaner error")
		}

		timer := time.NewTimer(DefaultEtcdWalCleanInterval)
		select {
		case <-timer.C:
			// interval elapsed, run again
		case <-ctx.Done():
			return
		}
	}
}
// etcdWalCleaner removes already checkpointed wals from etcd. It always keeps
// at least d.etcdWalsKeepNum wals, which are needed by clients for resync
// operations, and stops at the first wal that is not checkpointed.
//
// The work is done while holding the etcd wal cleaner lock. If the lock is
// already held, it returns nil without doing anything.
func (d *DataManager) etcdWalCleaner(ctx context.Context) error {
	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
	if err != nil {
		return errors.WithStack(err)
	}
	defer session.Close()

	m := etcd.NewMutex(session, etcdWalCleanerLockKey)
	switch err := m.TryLock(ctx); {
	case err == nil:
	case errors.Is(err, etcd.ErrLocked):
		return nil
	default:
		return errors.WithStack(err)
	}
	defer func() { _ = m.Unlock(ctx) }()

	resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0)
	if err != nil {
		return errors.WithStack(err)
	}

	// number of wals exceeding the amount to keep
	remaining := len(resp.Kvs) - d.etcdWalsKeepNum
	if remaining <= 0 {
		return nil
	}

	for _, kv := range resp.Kvs {
		var walData WalData
		if err := json.Unmarshal(kv.Value, &walData); err != nil {
			return errors.WithStack(err)
		}
		if walData.WalStatus != WalStatusCheckpointed {
			return nil
		}

		// TODO(sgotti) check that the objectstorage returns the wal actions as checkpointed.
		// With eventually consistent object storages like S3 we shouldn't remove a wal
		// file from etcd (and so from the cache) until we are sure there are no
		// eventual consistency issues. The difficult part is how to check them and be
		// sure that no objects with old data will be returned? Is it enough to read
		// it back or the result could just be luckily correct but another client may
		// arrive to a different S3 server that is not yet in sync?
		d.log.Info().Msgf("removing wal %q from etcd", walData.WalSequence)
		if _, err := d.e.AtomicDelete(ctx, string(kv.Key), kv.ModRevision); err != nil {
			return errors.WithStack(err)
		}

		remaining--
		if remaining == 0 {
			return nil
		}
	}

	return nil
}
// storageWalCleanerLoop runs d.storageWalCleaner repeatedly, waiting
// DefaultStorageWalCleanInterval between runs, until ctx is done. Errors are
// logged and don't stop the loop.
func (d *DataManager) storageWalCleanerLoop(ctx context.Context) {
	for {
		d.log.Debug().Msgf("storagewalcleaner")

		err := d.storageWalCleaner(ctx)
		if err != nil {
			d.log.Err(err).Msgf("storagewalcleaner error")
		}

		timer := time.NewTimer(DefaultStorageWalCleanInterval)
		select {
		case <-timer.C:
			// interval elapsed, run again
		case <-ctx.Done():
			return
		}
	}
}
// storageWalCleaner removes unneeded wals from the object storage.
//
// The first needed wal is the smaller of the wal sequence of the first data
// status and the sequence of the first wal in etcd. For every ".committed"
// status file of a wal older than that, the wal data file is removed first
// and then the status file. Old ".checkpointed" status files are removed too.
// The listing stops at the first ".committed" file of a wal that must be
// kept. Objects that are already missing when deleted are ignored.
//
// The work is done while holding the etcd storage wal cleaner lock. If the
// lock is already held, it returns nil without doing anything. It returns an
// error when etcd has no wals, when the object storage listing fails or when
// a read or delete fails.
func (d *DataManager) storageWalCleaner(ctx context.Context) error {
	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
	if err != nil {
		return errors.WithStack(err)
	}
	defer session.Close()

	m := etcd.NewMutex(session, etcdStorageWalCleanerLockKey)
	if err := m.TryLock(ctx); err != nil {
		if errors.Is(err, etcd.ErrLocked) {
			return nil
		}
		return errors.WithStack(err)
	}
	defer func() { _ = m.Unlock(ctx) }()

	firstDataStatus, err := d.GetFirstDataStatus()
	if err != nil {
		return errors.WithStack(err)
	}
	firstWalSequence := firstDataStatus.WalSequence

	// get the first wal in etcd (in any state) and use its wal sequence if
	// it's lesser than the first data status wal sequence
	resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0)
	if err != nil {
		return errors.WithStack(err)
	}
	if len(resp.Kvs) == 0 {
		return errors.Errorf("no wals in etcd")
	}
	var walData WalData
	if err := json.Unmarshal(resp.Kvs[0].Value, &walData); err != nil {
		return errors.WithStack(err)
	}
	if walData.WalSequence < firstWalSequence {
		firstWalSequence = walData.WalSequence
	}

	doneCh := make(chan struct{})
	defer close(doneCh)

	for object := range d.ost.List(d.storageWalStatusDir()+"/", "", true, doneCh) {
		if object.Err != nil {
			// report the listing error carried by the object, not the
			// (nil) error of the previous calls
			return fromOSTError(object.Err)
		}

		name := path.Base(object.Path)
		ext := path.Ext(name)
		walSequence := strings.TrimSuffix(name, ext)

		// handle committed status file and related data file
		if ext == ".committed" {
			if walSequence >= firstWalSequence {
				break
			}

			header, err := d.ReadWal(walSequence)
			if err != nil {
				return errors.WithStack(err)
			}

			// first remove wal data file
			walDataFilePath := d.storageWalDataFile(header.WalDataFileID)
			d.log.Info().Msgf("removing %q", walDataFilePath)
			if err := d.ost.DeleteObject(walDataFilePath); err != nil {
				if !objectstorage.IsNotExist(err) {
					return fromOSTError(err)
				}
			}

			// then remove wal status files
			d.log.Info().Msgf("removing %q", object.Path)
			if err := d.ost.DeleteObject(object.Path); err != nil {
				if !objectstorage.IsNotExist(err) {
					return fromOSTError(err)
				}
			}
		}

		// handle old checkpointed status file
		// TODO(sgotti) remove this in future versions since .checkpointed files are not created anymore
		if ext == ".checkpointed" {
			d.log.Info().Msgf("removing %q", object.Path)
			if err := d.ost.DeleteObject(object.Path); err != nil {
				if !objectstorage.IsNotExist(err) {
					return fromOSTError(err)
				}
			}
		}
	}

	return nil
}
// compactChangeGroupsLoop runs d.compactChangeGroups repeatedly, waiting
// DefaultCompactChangeGroupsInterval between runs, until ctx is done. Errors
// are logged and don't stop the loop.
func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) {
	for {
		err := d.compactChangeGroups(ctx)
		if err != nil {
			d.log.Err(err).Send()
		}

		timer := time.NewTimer(DefaultCompactChangeGroupsInterval)
		select {
		case <-timer.C:
			// interval elapsed, run again
		case <-ctx.Done():
			return
		}
	}
}
// compactChangeGroups removes old change group keys from etcd.
//
// It first rewrites the change group min revision key (guarded by its mod
// revision) to obtain a new revision, then deletes every change group key
// whose mod revision is lower than that revision minus
// etcdChangeGroupMinRevisionRange. Each delete is guarded by the key mod
// revision: a key updated in the meantime is left in place and a warning is
// logged.
//
// The work is done while holding the etcd compact change groups lock. If the
// lock is already held, it returns nil without doing anything.
func (d *DataManager) compactChangeGroups(ctx context.Context) error {
	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
	if err != nil {
		return errors.WithStack(err)
	}
	defer session.Close()

	m := etcd.NewMutex(session, etcdCompactChangeGroupsLockKey)
	if err := m.TryLock(ctx); err != nil {
		if errors.Is(err, etcd.ErrLocked) {
			return nil
		}
		return errors.WithStack(err)
	}
	defer func() { _ = m.Unlock(ctx) }()

	resp, err := d.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey)
	if err != nil {
		return errors.WithStack(err)
	}
	if len(resp.Kvs) == 0 {
		return errors.Errorf("no change group min revision key in etcd")
	}

	revision := resp.Kvs[0].ModRevision

	// first update minrevision
	cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(etcdChangeGroupMinRevisionKey), "=", revision)
	then := etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, "")
	txn := d.e.Client().Txn(ctx).If(cmp).Then(then)
	tresp, err := txn.Commit()
	if err != nil {
		return errors.WithStack(etcd.FromEtcdError(err))
	}
	if !tresp.Succeeded {
		return errors.Errorf("failed to update change group min revision key due to concurrent update")
	}

	revision = tresp.Header.Revision

	// then remove all the groups keys with modrevision < minrevision
	resp, err = d.e.List(ctx, etcdChangeGroupsDir, "", 0)
	if err != nil {
		return errors.WithStack(err)
	}
	for _, kv := range resp.Kvs {
		if kv.ModRevision >= revision-etcdChangeGroupMinRevisionRange {
			continue
		}

		key := string(kv.Key)
		cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(key), "=", kv.ModRevision)
		then := etcdclientv3.OpDelete(key)
		txn := d.e.Client().Txn(ctx).If(cmp).Then(then)
		tresp, err := txn.Commit()
		if err != nil {
			return errors.WithStack(etcd.FromEtcdError(err))
		}
		if !tresp.Succeeded {
			// there's no error value here: the key was just updated by
			// someone else, so it's kept and the next keys are processed
			d.log.Warn().Msgf("failed to delete change group key %q due to concurrent update", key)
		}
	}

	return nil
}
// etcdPingerLoop periodically updates a key, waiting
// DefaultEtcdPingerInterval between updates, until ctx is done.
//
// Watchers use these updates to tell clients the current revision. This is
// needed because updates made by other users to unwatched etcd keys are not
// notified: without the ping the known revisions would not move forward and
// all the wal writes would fail, since the provided change groups token would
// carry an old revision.
// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress???
func (d *DataManager) etcdPingerLoop(ctx context.Context) {
	for {
		err := d.etcdPinger(ctx)
		if err != nil {
			d.log.Err(err).Send()
		}

		timer := time.NewTimer(DefaultEtcdPingerInterval)
		select {
		case <-timer.C:
			// interval elapsed, ping again
		case <-ctx.Done():
			return
		}
	}
}
// etcdPinger writes an empty value to the etcd ping key.
func (d *DataManager) etcdPinger(ctx context.Context) error {
	_, err := d.e.Put(ctx, etcdPingKey, []byte{}, nil)
	if err != nil {
		return errors.WithStack(err)
	}
	return nil
}
// InitEtcd initializes the etcd state of the datamanager.
//
// The change group min revision key is always created when missing. The rest
// of the initialization happens only when the walsdata key is not found in
// etcd: the etcd data is deleted, the wals found in the object storage
// (starting from the wal sequence of the data status) are imported in etcd in
// committedstorage state, an empty wal is added on top of them and the
// walsdata and last committed storage wal keys are written. Finally a
// checkpoint is forced.
//
// dataStatus is the data status to start from; when nil the last data status
// is used, if there is one. It's an error to find wals in the object storage
// when no data status is available.
//
// The work is done while holding the etcd init lock. If the lock is already
// held, InitEtcd returns nil without doing anything.
func (d *DataManager) InitEtcd(ctx context.Context, dataStatus *DataStatus) error {
	// writeWal imports in etcd, in committedstorage state, a wal found in
	// the object storage. It fails if the wal key already exists.
	writeWal := func(wal *WalFile, prevWalSequence string) error {
		walFile, err := d.ost.ReadObject(d.storageWalStatusFile(wal.WalSequence) + ".committed")
		if err != nil {
			return fromOSTError(err)
		}

		// Decode into a value: an empty status file (io.EOF) is tolerated
		// and must leave a usable zero header instead of a nil pointer that
		// would panic when dereferenced below.
		var header WalHeader
		err = json.NewDecoder(walFile).Decode(&header)
		walFile.Close()
		if err != nil && !errors.Is(err, io.EOF) {
			return errors.WithStack(err)
		}

		if prevWalSequence != "" {
			if header.PreviousWalSequence != "" && header.PreviousWalSequence != prevWalSequence {
				return errors.Errorf("wal %q previousWalSequence %q is different than expected walSequence %q", wal.WalSequence, header.PreviousWalSequence, prevWalSequence)
			}
		}

		walData := &WalData{
			WalSequence:         wal.WalSequence,
			WalDataFileID:       header.WalDataFileID,
			WalStatus:           WalStatusCommittedStorage,
			PreviousWalSequence: header.PreviousWalSequence,
		}
		walDataj, err := json.Marshal(walData)
		if err != nil {
			return errors.WithStack(err)
		}

		cmp := []etcdclientv3.Cmp{}
		then := []etcdclientv3.Op{}
		// only add if it doesn't exist
		cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalKey(wal.WalSequence)), "=", 0))
		then = append(then, etcdclientv3.OpPut(etcdWalKey(wal.WalSequence), string(walDataj)))
		txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...)
		tresp, err := txn.Commit()
		if err != nil {
			return errors.WithStack(etcd.FromEtcdError(err))
		}
		if !tresp.Succeeded {
			return errors.Errorf("failed to sync etcd: wal %q already written", wal.WalSequence)
		}
		return nil
	}

	session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
	if err != nil {
		return errors.WithStack(err)
	}
	defer session.Close()

	m := etcd.NewMutex(session, etcdInitEtcdLockKey)
	if err := m.TryLock(ctx); err != nil {
		if errors.Is(err, etcd.ErrLocked) {
			return nil
		}
		return errors.WithStack(err)
	}
	defer func() { _ = m.Unlock(ctx) }()

	// a missing walsdata key means that etcd has to be initialized
	mustInit := false

	_, err = d.e.Get(ctx, etcdWalsDataKey, 0)
	if err != nil {
		if !errors.Is(err, etcd.ErrKeyNotFound) {
			return errors.WithStack(err)
		}
		mustInit = true
	}

	if mustInit {
		d.log.Info().Msgf("no data found in etcd, initializing")

		// delete all wals from etcd
		if err := d.deleteEtcd(ctx); err != nil {
			return errors.WithStack(err)
		}
	}

	// Always create changegroup min revision if it doesn't exist
	cmp := []etcdclientv3.Cmp{}
	then := []etcdclientv3.Op{}

	cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdChangeGroupMinRevisionKey), "=", 0))
	then = append(then, etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, ""))
	txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...)
	if _, err := txn.Commit(); err != nil {
		return errors.WithStack(etcd.FromEtcdError(err))
	}

	if !mustInit {
		return nil
	}

	// walsdata not found in etcd

	var firstWal string
	if dataStatus != nil {
		firstWal = dataStatus.WalSequence
	} else {
		dataStatus, err = d.GetLastDataStatus()
		if err != nil && !errors.Is(err, ErrNoDataStatus) {
			return errors.WithStack(err)
		}
		// set the first wal to import in etcd if there's a snapshot. In this way we'll
		// ignore older wals (or wals left after an import)
		if err == nil {
			firstWal = dataStatus.WalSequence
		}
	}

	// if there are some wals in the objectstorage this means etcd has been reset.
	// So take all the wals in committed or checkpointed state starting from the
	// first not checkpointed wal and put them in etcd
	lastCommittedStorageWalSequence := ""
	previousWalSequence := ""
	wroteWals := 0
	for wal := range d.ListOSTWals("") {
		// if there are wals in ost but not a datastatus return an error
		if dataStatus == nil {
			return errors.Errorf("no datastatus in etcd but some wals are present, this shouldn't happen")
		}
		d.log.Debug().Msgf("wal: %s", wal.WalSequence)
		if wal.Err != nil {
			return errors.WithStack(wal.Err)
		}

		if wal.WalSequence < firstWal {
			continue
		}

		lastCommittedStorageWalSequence = wal.WalSequence

		if err := writeWal(wal, previousWalSequence); err != nil {
			return errors.WithStack(err)
		}
		previousWalSequence = wal.WalSequence
		wroteWals++
	}

	// insert an empty wal and make it already committedstorage
	walSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey)
	if err != nil {
		return errors.WithStack(err)
	}

	walDataFileID := uuid.Must(uuid.NewV4()).String()
	walDataFilePath := d.storageWalDataFile(walDataFileID)
	walKey := etcdWalKey(walSequence.String())

	if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader([]byte{}), 0, true); err != nil {
		return fromOSTError(err)
	}
	d.log.Debug().Msgf("wrote wal file: %s", walDataFilePath)

	walFilePath := d.storageWalStatusFile(walSequence.String())
	d.log.Info().Msgf("syncing committed wal %q to storage", walSequence.String())
	header := &WalHeader{
		WalDataFileID:       walDataFileID,
		PreviousWalSequence: lastCommittedStorageWalSequence,
	}
	headerj, err := json.Marshal(header)
	if err != nil {
		return errors.WithStack(err)
	}
	walFileCommittedPath := walFilePath + ".committed"
	if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), int64(len(headerj)), true); err != nil {
		return fromOSTError(err)
	}

	walData := &WalData{
		WalSequence:         walSequence.String(),
		WalDataFileID:       walDataFileID,
		WalStatus:           WalStatusCommittedStorage,
		PreviousWalSequence: lastCommittedStorageWalSequence,
	}

	lastCommittedStorageWalSequence = walSequence.String()

	walsData := &WalsData{
		LastCommittedWalSequence: lastCommittedStorageWalSequence,
	}

	walDataj, err := json.Marshal(walData)
	if err != nil {
		return errors.WithStack(err)
	}
	walsDataj, err := json.Marshal(walsData)
	if err != nil {
		return errors.WithStack(err)
	}

	// save walsdata and lastcommittedstoragewalseq only after writing all the
	// wals in etcd
	// in this way if something fails while adding wals to etcd it'll be retried
	// since walsdata doesn't exist
	cmp = []etcdclientv3.Cmp{}
	then = []etcdclientv3.Op{}
	cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0))
	then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj)))
	then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence))
	then = append(then, etcdclientv3.OpPut(walKey, string(walDataj)))
	txn = d.e.Client().Txn(ctx).If(cmp...).Then(then...)
	tresp, err := txn.Commit()
	if err != nil {
		return errors.WithStack(etcd.FromEtcdError(err))
	}
	if !tresp.Succeeded {
		return errors.Errorf("failed to sync etcd: walsdata already written")
	}

	// force a checkpoint
	if err := d.checkpoint(ctx, true); err != nil {
		return errors.WithStack(err)
	}

	return nil
}