// Copyright 2019 Sorint.lab // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied // See the License for the specific language governing permissions and // limitations under the License. package wal import ( "bytes" "container/ring" "context" "encoding/json" "io" "io/ioutil" "path" "strings" "time" uuid "github.com/satori/go.uuid" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/sequence" "github.com/pkg/errors" etcdclientv3 "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" ) // TODO(sgotti) handle etcd unwanted changes: // * Etcd cluster rebuild: we cannot rely on etcd header ClusterID since it could be the same as it's generated using the listen urls. We should add our own clusterid key and use it. // * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one // Storage paths // wals/{walSeq} // // Etcd paths // wals/{walSeq} const ( DefaultEtcdWalsKeepNum = 100 ) var ( ErrCompacted = errors.New("required revision has been compacted") ErrConcurrency = errors.New("wal concurrency error: change groups already updated") ) var ( // Storage paths. Always use path (not filepath) to use the "/" separator storageObjectsPrefix = "data/" storageWalsDir = "wals" storageWalsStatusDir = path.Join(storageWalsDir, "status") storageWalsDataDir = path.Join(storageWalsDir, "data") // etcd paths. Always use path (not filepath) to use the "/" separator etcdWalBaseDir = "walmanager" etcdWalsDir = path.Join(etcdWalBaseDir, "wals") etcdWalsDataKey = path.Join(etcdWalBaseDir, "walsdata") etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq") etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq") etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock") etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock") etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups") etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev") etcdPingKey = path.Join(etcdWalBaseDir, "ping") ) const ( etcdChangeGroupMinRevisionRange = 1000 ) func (w *WalManager) toStorageDataPath(path string) string { return w.basePath + storageObjectsPrefix + path } func (w *WalManager) fromStorageDataPath(path string) string { return strings.TrimPrefix(path, w.basePath+storageObjectsPrefix) } func (w *WalManager) storageWalStatusFile(walSeq string) string { return path.Join(w.basePath, storageWalsStatusDir, walSeq) } func (w *WalManager) storageWalDataFile(walFileID string) string { return path.Join(w.basePath, storageWalsDataDir, walFileID) } func etcdWalKey(walSeq string) string { return path.Join(etcdWalsDir, walSeq) } type ActionType string const ( ActionTypePut ActionType = "put" ActionTypeDelete ActionType = "delete" ) type Action struct { ActionType ActionType DataType string ID string Data []byte } type WalHeader struct { WalDataFileID string PreviousWalSequence string ChangeGroups map[string]int64 } type WalStatus string const ( // WalStatusCommitted represent a wal written to the lts WalStatusCommitted WalStatus = "committed" // WalStatusCommittedStorage represent the .committed marker file written to the lts WalStatusCommittedStorage WalStatus = "committed_storage" // WalStatusCheckpointed mean that all the wal actions have been executed on the lts WalStatusCheckpointed WalStatus = "checkpointed" ) type WalsData struct { LastCommittedWalSequence string Revision int64 `json:"-"` } type WalData struct { WalDataFileID string WalStatus WalStatus WalSequence string PreviousWalSequence string ChangeGroups map[string]int64 } type ChangeGroupsUpdateToken struct { CurRevision int64 `json:"cur_revision"` ChangeGroupsRevisions changeGroupsRevisions `json:"change_groups_revisions"` } type changeGroupsRevisions map[string]int64 func (w *WalManager) ChangesCurrentRevision() (int64, error) { w.changes.Lock() defer w.changes.Unlock() if !w.changes.initialized { return 0, errors.Errorf("wal changes not ready") } return w.changes.revision, nil } func (w *WalManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error) { w.changes.Lock() defer w.changes.Unlock() if !w.changes.initialized { return nil, errors.Errorf("wal changes not ready") } revision := w.changes.curRevision() cgr := w.changes.getChangeGroups(cgNames) return &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil } func (w *WalManager) MergeChangeGroupsUpdateTokens(cgts []*ChangeGroupsUpdateToken) *ChangeGroupsUpdateToken { mcgt := &ChangeGroupsUpdateToken{ChangeGroupsRevisions: make(changeGroupsRevisions)} for _, cgt := range cgts { // keep the lower curRevision if cgt.CurRevision != 0 && cgt.CurRevision < mcgt.CurRevision { mcgt.CurRevision = cgt.CurRevision } // keep the lower changegroup revision for cgName, cgRev := range cgt.ChangeGroupsRevisions { if mr, ok := mcgt.ChangeGroupsRevisions[cgName]; ok { if cgRev < mr { mcgt.ChangeGroupsRevisions[cgName] = cgRev } } else { mcgt.ChangeGroupsRevisions[cgName] = cgRev } } } return mcgt } func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error) { w.changes.Lock() if !w.changes.initialized { w.changes.Unlock() return nil, nil, errors.Errorf("wal changes not ready") } walseq, ok := w.changes.getPut(p) revision := w.changes.curRevision() cgr := w.changes.getChangeGroups(cgNames) actions := w.changes.actions[walseq] w.changes.Unlock() cgt := &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr} if ok { for _, action := range actions { if action.ActionType == ActionTypePut { dataPath := w.dataToPathFunc(action.DataType, action.ID) if dataPath == p { w.log.Debugf("reading file from wal: %q", dataPath) return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil } } } return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq) } f, err := w.lts.ReadObject(w.toStorageDataPath(p)) return f, cgt, err } func (w *WalManager) changesList(paths []string, prefix, startWith string, recursive bool) []string { fpaths := []string{} for _, p := range paths { if !recursive && len(p) > len(prefix) { rel := strings.TrimPrefix(p, prefix) skip := strings.Contains(rel, w.lts.Delimiter()) if skip { continue } } if strings.HasPrefix(p, prefix) && p > startWith { fpaths = append(fpaths, p) } } return fpaths } func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan objectstorage.ObjectInfo { objectCh := make(chan objectstorage.ObjectInfo, 1) prefix = w.toStorageDataPath(prefix) startWith = w.toStorageDataPath(startWith) w.changes.Lock() if !w.changes.initialized { w.changes.Unlock() objectCh <- objectstorage.ObjectInfo{Err: errors.Errorf("wal changes not ready")} return objectCh } changesList := w.changesList(w.changes.pathsOrdered, prefix, startWith, recursive) deletedChangesMap := w.changes.getDeletesMap() w.changes.Unlock() ci := 0 go func(objectCh chan<- objectstorage.ObjectInfo) { defer close(objectCh) for object := range w.lts.List(prefix, startWith, recursive, doneCh) { if object.Err != nil { objectCh <- object return } object.Path = w.fromStorageDataPath(object.Path) for ci < len(changesList) { p := changesList[ci] if p < object.Path { //w.log.Infof("using path from changelist: %q", p) select { // Send object content. case objectCh <- objectstorage.ObjectInfo{Path: p}: // If receives done from the caller, return here. case <-doneCh: return } ci++ } else if p == object.Path { ci++ break } else { break } } if _, ok := deletedChangesMap[object.Path]; ok { continue } //w.log.Infof("using path from objectstorage: %q", object.Path) select { // Send object content. case objectCh <- object: // If receives done from the caller, return here. case <-doneCh: return } } for ci < len(changesList) { //w.log.Infof("using path from changelist: %q", changesList[ci]) objectCh <- objectstorage.ObjectInfo{ Path: changesList[ci], } ci++ } }(objectCh) return objectCh } func (w *WalManager) HasLtsWal(walseq string) (bool, error) { _, err := w.lts.Stat(w.storageWalStatusFile(walseq) + ".committed") if err == objectstorage.ErrNotExist { return false, nil } if err != nil { return false, err } return true, nil } func (w *WalManager) ReadWal(walseq string) (io.ReadCloser, error) { return w.lts.ReadObject(w.storageWalStatusFile(walseq) + ".committed") } func (w *WalManager) ReadWalData(walFileID string) (io.ReadCloser, error) { return w.lts.ReadObject(w.storageWalDataFile(walFileID)) } type WalFile struct { WalSequence string Err error Committed bool Checkpointed bool } func (w *WalManager) ListLtsWals(start string) <-chan *WalFile { walCh := make(chan *WalFile, 1) go func() { doneCh := make(chan struct{}) defer close(doneCh) defer close(walCh) curWal := &WalFile{} var startPath string if start != "" { startPath = w.storageWalStatusFile(start) } for object := range w.lts.List(path.Join(w.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) { if object.Err != nil { walCh <- &WalFile{ Err: object.Err, } return } name := path.Base(object.Path) ext := path.Ext(name) walSequence := strings.TrimSuffix(name, ext) // wal file refers to another wal, so return the current one if curWal.WalSequence != walSequence { // if this happen something is wrong on the lts if !curWal.Committed && curWal.Checkpointed { walCh <- &WalFile{ Err: errors.Errorf("wal is checkpointed but not committed. this should never happen"), } return } if curWal.WalSequence != "" { // skip not committed wals if curWal.Committed { walCh <- curWal } } curWal = &WalFile{ WalSequence: walSequence, } } if ext == ".committed" { curWal.Committed = true } if ext == ".checkpointed" { curWal.Checkpointed = true } } if curWal.WalSequence != "" { walCh <- curWal } }() return walCh } type ListEtcdWalsElement struct { WalData *WalData Err error } func (w *WalManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement { walCh := make(chan *ListEtcdWalsElement, 1) go func() { defer close(walCh) var continuation *etcd.ListPagedContinuation for { listResp, err := w.e.ListPaged(ctx, etcdWalsDir, revision, 10, continuation) if err != nil { walCh <- &ListEtcdWalsElement{ Err: err, } return } resp := listResp.Resp continuation = listResp.Continuation for _, kv := range resp.Kvs { var walData *WalData err := json.Unmarshal(kv.Value, &walData) walCh <- &ListEtcdWalsElement{ WalData: walData, Err: err, } } if !listResp.HasMore { break } } }() return walCh } // FirstAvailableWalData returns the first (the one with smaller sequence) wal // and returns it (or nil if not available) and the etcd revision at the time of // the operation func (w *WalManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64, error) { // list waldata and just get the first if available listResp, err := w.e.ListPaged(ctx, etcdWalsDir, 0, 1, nil) if err != nil { return nil, 0, err } resp := listResp.Resp revision := resp.Header.Revision if len(resp.Kvs) == 0 { return nil, revision, nil } var walData *WalData if err := json.Unmarshal(resp.Kvs[0].Value, &walData); err != nil { return nil, 0, err } return walData, revision, nil } func (w *WalManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error) { resp, err := w.e.Get(ctx, etcdLastCommittedStorageWalSeqKey, 0) if err != nil && err != etcd.ErrKeyNotFound { return "", 0, err } if err == etcd.ErrKeyNotFound { return "", 0, errors.Errorf("no last committedstorage wal on etcd") } lastCommittedStorageWal := string(resp.Kvs[0].Value) revision := resp.Header.Revision return lastCommittedStorageWal, revision, nil } type WatchElement struct { Revision int64 WalData *WalData ChangeGroupsRevisions changeGroupsRevisions Err error } func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement { walCh := make(chan *WatchElement, 1) // TODO(sgotti) if the etcd cluster goes down, watch won't return an error but // wait until it comes back. We have to find a way to detect when the cluster // is down and report an error so our clients can react (i.e. a readdb could // mark itself as not in sync) wctx := etcdclientv3.WithRequireLeader(ctx) wch := w.e.Watch(wctx, etcdWalBaseDir+"/", revision) go func() { defer close(walCh) for wresp := range wch { we := &WatchElement{ChangeGroupsRevisions: make(changeGroupsRevisions)} send := false if wresp.Canceled { err := wresp.Err() switch err { case etcdclientv3rpc.ErrCompacted: we.Err = ErrCompacted default: we.Err = err } walCh <- we return } we.Revision = wresp.Header.Revision for _, ev := range wresp.Events { key := string(ev.Kv.Key) switch { case strings.HasPrefix(key, etcdWalsDir+"/"): send = true switch ev.Type { case mvccpb.PUT: var walData *WalData if err := json.Unmarshal(ev.Kv.Value, &walData); err != nil { we.Err = wresp.Err() walCh <- we return } we.WalData = walData } case strings.HasPrefix(key, etcdChangeGroupsDir+"/"): send = true switch ev.Type { case mvccpb.PUT: changeGroup := path.Base(string(ev.Kv.Key)) we.ChangeGroupsRevisions[changeGroup] = ev.Kv.ModRevision case mvccpb.DELETE: changeGroup := path.Base(string(ev.Kv.Key)) we.ChangeGroupsRevisions[changeGroup] = 0 } case key == etcdPingKey: send = true default: continue } } if send { walCh <- we } } }() return walCh } // WriteWal writes the provided actions in a wal file. The wal will be marked as // "committed" on etcd if the provided group changes aren't changed in the // meantime or a optimistic concurrency error will be returned and the wal won't // be committed // // TODO(sgotti) save inside the wal file also the previous committed wal to // handle possible lts list operation eventual consistency gaps (list won't // report a wal at seq X but a wal at X+n, if this kind of eventual consistency // ever exists) func (w *WalManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error) { return w.WriteWalAdditionalOps(ctx, actions, cgt, nil, nil) } func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, cmp []etcdclientv3.Cmp, then []etcdclientv3.Op) (*ChangeGroupsUpdateToken, error) { if len(actions) == 0 { return nil, errors.Errorf("cannot write wal: actions is empty") } walSequence, err := sequence.IncSequence(ctx, w.e, etcdWalSeqKey) if err != nil { return nil, err } resp, err := w.e.Get(ctx, etcdWalsDataKey, 0) if err != nil { return nil, err } var walsData WalsData if err := json.Unmarshal(resp.Kvs[0].Value, &walsData); err != nil { return nil, err } walsData.Revision = resp.Kvs[0].ModRevision walDataFileID := uuid.NewV4().String() walDataFilePath := w.storageWalDataFile(walDataFileID) walKey := etcdWalKey(walSequence.String()) var buf bytes.Buffer for _, action := range actions { actionj, err := json.Marshal(action) if err != nil { return nil, err } if _, err := buf.Write(actionj); err != nil { return nil, err } } if err := w.lts.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil { return nil, err } w.log.Debugf("wrote wal file: %s", walDataFilePath) walsData.LastCommittedWalSequence = walSequence.String() walData := &WalData{ WalSequence: walSequence.String(), WalDataFileID: walDataFileID, WalStatus: WalStatusCommitted, } walsDataj, err := json.Marshal(walsData) if err != nil { return nil, err } walDataj, err := json.Marshal(walData) if err != nil { return nil, err } if cmp == nil { cmp = []etcdclientv3.Cmp{} } if then == nil { then = []etcdclientv3.Op{} } getWalsData := etcdclientv3.OpGet(etcdWalsDataKey) getWal := etcdclientv3.OpGet(walKey) //w.log.Infof("cgt: %s", util.Dump(cgt)) if cgt != nil { for cgName, cgRev := range cgt.ChangeGroupsRevisions { cgKey := path.Join(etcdChangeGroupsDir, cgName) if cgRev > 0 { cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(cgKey), "=", cgRev)) } else { cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(cgKey), "=", 0)) } then = append(then, etcdclientv3.OpPut(cgKey, "")) } if cgt.CurRevision > 0 { cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(etcdChangeGroupMinRevisionKey), "<", cgt.CurRevision+etcdChangeGroupMinRevisionRange)) } } cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(etcdWalsDataKey), "=", walsData.Revision)) cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.Version(walKey), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj))) then = append(then, etcdclientv3.OpPut(walKey, string(walDataj))) // This will only succeed if no one else have concurrently updated the walsData // TODO(sgotti) retry if it failed due to concurrency errors txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...).Else(getWalsData, getWal) tresp, err := txn.Commit() if err != nil { return nil, etcd.FromEtcdError(err) } if !tresp.Succeeded { walsDataRev := tresp.Responses[0].GetResponseRange().Kvs[0].ModRevision walDataCreateRev := tresp.Responses[0].GetResponseRange().Kvs[0].CreateRevision // TODO(sgotti) If the tx failed due to walsdata already updated we could retry if walsDataRev == walsData.Revision && walDataCreateRev == 0 { return nil, errors.Errorf("failed to write committed wal: wals groups already updated") } return nil, ErrConcurrency } ncgt := &ChangeGroupsUpdateToken{CurRevision: tresp.Header.Revision, ChangeGroupsRevisions: make(changeGroupsRevisions)} if cgt != nil { for cgName := range cgt.ChangeGroupsRevisions { ncgt.ChangeGroupsRevisions[cgName] = tresp.Header.Revision } } // try to commit storage right now if err := w.sync(ctx); err != nil { w.log.Errorf("wal sync error: %+v", err) } return ncgt, nil } func (w *WalManager) syncLoop(ctx context.Context) { for { w.log.Debugf("syncer") if err := w.sync(ctx); err != nil { w.log.Errorf("syncer error: %+v", err) } select { case <-ctx.Done(): return default: } time.Sleep(5 * time.Second) } } func (w *WalManager) sync(ctx context.Context) error { session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err } defer session.Close() m := concurrency.NewMutex(session, etcdSyncLockKey) if err := m.Lock(ctx); err != nil { return err } defer m.Unlock(ctx) resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0) if err != nil { return err } for _, kv := range resp.Kvs { var walData WalData if err := json.Unmarshal(kv.Value, &walData); err != nil { return err } // wals must be committed and checkpointed in order. // TODO(sgotti) this could be optimized by parallelizing writes of wals that don't have common change groups switch walData.WalStatus { case WalStatusCommitted: walFilePath := w.storageWalStatusFile(walData.WalSequence) w.log.Debugf("syncing committed wal to storage") header := &WalHeader{ WalDataFileID: walData.WalDataFileID, ChangeGroups: walData.ChangeGroups, PreviousWalSequence: walData.PreviousWalSequence, } headerj, err := json.Marshal(header) if err != nil { return err } walFileCommittedPath := walFilePath + ".committed" if err := w.lts.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil { return err } w.log.Debugf("updating wal to state %q", WalStatusCommittedStorage) walData.WalStatus = WalStatusCommittedStorage walDataj, err := json.Marshal(walData) if err != nil { return err } cmp := []etcdclientv3.Cmp{} then := []etcdclientv3.Op{} cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.ModRevision(string(kv.Key)), "=", kv.ModRevision)) then = append(then, etcdclientv3.OpPut(string(kv.Key), string(walDataj))) then = append(then, etcdclientv3.OpPut(string(etcdLastCommittedStorageWalSeqKey), string(walData.WalSequence))) // This will only succeed if the no one else have concurrently updated the wal keys in etcd txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) } if !tresp.Succeeded { return errors.Errorf("failed to write committedstorage wal: concurrent update") } case WalStatusCheckpointed: walFilePath := w.storageWalStatusFile(walData.WalSequence) w.log.Debugf("checkpointing committed wal to storage") walFileCheckpointedPath := walFilePath + ".checkpointed" if err := w.lts.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil { return err } } } return nil } func (w *WalManager) checkpointLoop(ctx context.Context) { for { w.log.Debugf("checkpointer") if err := w.checkpoint(ctx); err != nil { w.log.Errorf("checkpoint error: %v", err) } select { case <-ctx.Done(): return default: } time.Sleep(2 * time.Second) } } func (w *WalManager) checkpoint(ctx context.Context) error { session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err } defer session.Close() m := concurrency.NewMutex(session, etcdCheckpointLockKey) if err := m.Lock(ctx); err != nil { return err } defer m.Unlock(ctx) resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0) if err != nil { return err } for _, kv := range resp.Kvs { var walData WalData if err := json.Unmarshal(kv.Value, &walData); err != nil { return err } if walData.WalStatus == WalStatusCommitted { w.log.Warnf("wal %s not yet committed storage", walData.WalSequence) break } if walData.WalStatus == WalStatusCheckpointed { continue } walFilePath := w.storageWalDataFile(walData.WalDataFileID) w.log.Debugf("checkpointing wal: %q", walData.WalSequence) walFile, err := w.lts.ReadObject(walFilePath) if err != nil { return err } dec := json.NewDecoder(walFile) for { var action *Action err := dec.Decode(&action) if err == io.EOF { // all done break } if err != nil { walFile.Close() return err } if err := w.checkpointAction(ctx, action); err != nil { walFile.Close() return err } } walFile.Close() w.log.Debugf("updating wal to state %q", WalStatusCheckpointed) walData.WalStatus = WalStatusCheckpointed walDataj, err := json.Marshal(walData) if err != nil { return err } if _, err := w.e.AtomicPut(ctx, string(kv.Key), walDataj, kv.ModRevision, nil); err != nil { return err } } return nil } func (w *WalManager) checkpointAction(ctx context.Context, action *Action) error { dataPath := w.dataToPathFunc(action.DataType, action.ID) if dataPath == "" { return nil } path := w.toStorageDataPath(dataPath) switch action.ActionType { case ActionTypePut: w.log.Debugf("writing file: %q", path) if err := w.lts.WriteObject(path, bytes.NewReader(action.Data)); err != nil { return err } case ActionTypeDelete: w.log.Debugf("deleting file: %q", path) if err := w.lts.DeleteObject(path); err != nil && err != objectstorage.ErrNotExist { return err } } return nil } func (w *WalManager) walCleanerLoop(ctx context.Context) { for { w.log.Debugf("walcleaner") if err := w.walCleaner(ctx); err != nil { w.log.Errorf("walcleaner error: %v", err) } select { case <-ctx.Done(): return default: } time.Sleep(2 * time.Second) } } // walCleaner will clean already checkpointed wals from etcd // it must always keep at least one wal that is needed for resync operations // from clients func (w *WalManager) walCleaner(ctx context.Context) error { session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err } defer session.Close() m := concurrency.NewMutex(session, etcdWalCleanerLockKey) if err := m.Lock(ctx); err != nil { return err } defer m.Unlock(ctx) resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0) if err != nil { return err } if len(resp.Kvs) <= w.etcdWalsKeepNum { return nil } removeCount := len(resp.Kvs) - w.etcdWalsKeepNum for _, kv := range resp.Kvs { var walData WalData if err := json.Unmarshal(kv.Value, &walData); err != nil { return err } if walData.WalStatus != WalStatusCheckpointed { break } // TODO(sgotti) check that the objectstorage returns the wal actions as checkpointed. // With eventual consistent object storages like S3 we shouldn't remove a wal // file from etcd (and so from the cache) until we are sure there're no // eventual consistency issues. The difficult part is how to check them and be // sure that no objects with old data will be returned? Is it enough to read // it back or the result could just be luckily correct but another client may // arrive to a differnt S3 server that is not yet in sync? w.log.Infof("removing wal %q from etcd", walData.WalSequence) if _, err := w.e.AtomicDelete(ctx, string(kv.Key), kv.ModRevision); err != nil { return err } removeCount-- if removeCount == 0 { return nil } } return nil } func (w *WalManager) compactChangeGroupsLoop(ctx context.Context) { for { if err := w.compactChangeGroups(ctx); err != nil { w.log.Errorf("err: %+v", err) } select { case <-ctx.Done(): return default: } time.Sleep(1 * time.Second) } } func (w *WalManager) compactChangeGroups(ctx context.Context) error { resp, err := w.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey) if err != nil { return err } revision := resp.Kvs[0].ModRevision // first update minrevision cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(etcdChangeGroupMinRevisionKey), "=", revision) then := etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, "") txn := w.e.Client().Txn(ctx).If(cmp).Then(then) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) } if !tresp.Succeeded { return errors.Errorf("failed to update change group min revision key due to concurrent update") } revision = tresp.Header.Revision // then remove all the groups keys with modrevision < minrevision resp, err = w.e.List(ctx, etcdChangeGroupsDir, "", 0) if err != nil { return err } for _, kv := range resp.Kvs { if kv.ModRevision < revision-etcdChangeGroupMinRevisionRange { cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(string(kv.Key)), "=", kv.ModRevision) then := etcdclientv3.OpDelete(string(kv.Key)) txn := w.e.Client().Txn(ctx).If(cmp).Then(then) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) } if !tresp.Succeeded { w.log.Errorf("failed to update change group min revision key due to concurrent update") } } } return nil } // etcdPingerLoop periodically updates a key. // This is used by watchers to inform the client of the current revision // this is needed since if other users are updating other unwatched keys on // etcd we won't be notified, not updating the known revisions and thus all the // walWrites will fails since the provided changegrouptoken will have an old // revision // TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress??? func (w *WalManager) etcdPingerLoop(ctx context.Context) { for { if err := w.etcdPinger(ctx); err != nil { w.log.Errorf("err: %+v", err) } select { case <-ctx.Done(): return default: } time.Sleep(1 * time.Second) } } func (w *WalManager) etcdPinger(ctx context.Context) error { if _, err := w.e.Put(ctx, etcdPingKey, []byte{}, nil); err != nil { return err } return nil } func (w *WalManager) InitEtcd(ctx context.Context) error { writeWal := func(wal *WalFile) error { w.log.Infof("wal seq: %s", wal.WalSequence) walFile, err := w.lts.ReadObject(w.storageWalStatusFile(wal.WalSequence) + ".committed") if err != nil { return err } dec := json.NewDecoder(walFile) var header *WalHeader if err = dec.Decode(&header); err != nil && err != io.EOF { walFile.Close() return err } walFile.Close() walData := &WalData{ WalSequence: wal.WalSequence, WalDataFileID: header.WalDataFileID, WalStatus: WalStatusCommitted, ChangeGroups: header.ChangeGroups, } if wal.Checkpointed { walData.WalStatus = WalStatusCheckpointed } walDataj, err := json.Marshal(walData) if err != nil { return err } cmp := []etcdclientv3.Cmp{} then := []etcdclientv3.Op{} // only add if it doesn't exist cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalKey(wal.WalSequence)), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdWalKey(wal.WalSequence), string(walDataj))) txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) } if !tresp.Succeeded { return errors.Errorf("failed to sync etcd: wal %q already written", wal.WalSequence) } return nil } // Create changegroup min revision if it doesn't exists cmp := []etcdclientv3.Cmp{} then := []etcdclientv3.Op{} cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdChangeGroupMinRevisionKey), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, "")) txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...) if _, err := txn.Commit(); err != nil { return etcd.FromEtcdError(err) } _, err := w.e.Get(ctx, etcdWalsDataKey, 0) if err != nil && err != etcd.ErrKeyNotFound { return err } if err == nil { return nil } w.log.Infof("no data found in etcd, initializing") // walsdata not found in etcd // if there're some wals in the lts this means etcd has been reset. // So take all the wals in committed or checkpointed state starting from the // first not checkpointed wal and put them in etcd lastCommittedStorageWalsRing := ring.New(100) lastCommittedStorageWalElem := lastCommittedStorageWalsRing lastCommittedStorageWalSequence := "" wroteWals := 0 for wal := range w.ListLtsWals("") { w.log.Infof("wal: %s", wal) if wal.Err != nil { return wal.Err } lastCommittedStorageWalElem.Value = wal lastCommittedStorageWalElem = lastCommittedStorageWalElem.Next() lastCommittedStorageWalSequence = wal.WalSequence if wal.Checkpointed { continue } if err := writeWal(wal); err != nil { return err } wroteWals++ } // if no wal has been written (because all are checkpointed), write at least // the ones in the ring if wroteWals == 0 { var err error lastCommittedStorageWalsRing.Do(func(e interface{}) { if e == nil { return } wal := e.(*WalFile) err = writeWal(wal) if err != nil { return } lastCommittedStorageWalSequence = wal.WalSequence }) if err != nil { return err } } walsData := &WalsData{ LastCommittedWalSequence: lastCommittedStorageWalSequence, } walsDataj, err := json.Marshal(walsData) if err != nil { return err } // save walsdata and lastcommittedstoragewalseq only after writing all the // wals in etcd // in this way if something fails while adding wals to etcd it'll be retried // since waldata doesn't exists cmp = []etcdclientv3.Cmp{} then = []etcdclientv3.Op{} cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj))) then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence)) txn = w.e.Client().Txn(ctx).If(cmp...).Then(then...) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) } if !tresp.Succeeded { return errors.Errorf("failed to sync etcd: waldata already written") } return nil } type CheckpointFunc func(action *Action) error type DataToPathFunc func(dataType string, id string) string func NoOpDataToPath(dataType string, id string) string { return "" } type WalManagerConfig struct { BasePath string E *etcd.Store Lts *objectstorage.ObjStorage EtcdWalsKeepNum int CheckpointFunc CheckpointFunc DataToPathFunc DataToPathFunc } type WalManager struct { basePath string log *zap.SugaredLogger e *etcd.Store lts *objectstorage.ObjStorage changes *WalChanges etcdWalsKeepNum int checkpointFunc CheckpointFunc dataToPathFunc DataToPathFunc } func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConfig) (*WalManager, error) { if conf.EtcdWalsKeepNum == 0 { conf.EtcdWalsKeepNum = DefaultEtcdWalsKeepNum } if conf.EtcdWalsKeepNum < 1 { return nil, errors.New("etcdWalsKeepNum must be greater than 0") } dataToPathFunc := conf.DataToPathFunc if dataToPathFunc == nil { dataToPathFunc = NoOpDataToPath } w := &WalManager{ basePath: conf.BasePath, log: logger.Sugar(), e: conf.E, lts: conf.Lts, etcdWalsKeepNum: conf.EtcdWalsKeepNum, changes: NewWalChanges(), checkpointFunc: conf.CheckpointFunc, dataToPathFunc: dataToPathFunc, } // add trailing slash the basepath if w.basePath != "" && !strings.HasSuffix(w.basePath, "/") { w.basePath = w.basePath + "/" } return w, nil } func (w *WalManager) Run(ctx context.Context, readyCh chan struct{}) error { for { err := w.InitEtcd(ctx) if err == nil { break } w.log.Errorf("failed to initialize etcd: %+v", err) time.Sleep(1 * time.Second) } readyCh <- struct{}{} go w.watcherLoop(ctx) go w.syncLoop(ctx) go w.checkpointLoop(ctx) go w.walCleanerLoop(ctx) go w.compactChangeGroupsLoop(ctx) go w.etcdPingerLoop(ctx) select { case <-ctx.Done(): w.log.Infof("walmanager exiting") return nil } }