diff --git a/internal/wal/changes.go b/internal/datamanager/changes.go similarity index 53% rename from internal/wal/changes.go rename to internal/datamanager/changes.go index de31741..b2c8e77 100644 --- a/internal/wal/changes.go +++ b/internal/datamanager/changes.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package datamanager import ( "context" @@ -20,7 +20,6 @@ import ( "fmt" "io" "path" - "sort" "strings" "sync" "time" @@ -32,11 +31,12 @@ import ( "go.etcd.io/etcd/mvcc/mvccpb" ) +// TODO(sgotti) rewrite this to use a sqlite local cache + type WalChanges struct { actions map[string][]*Action - puts map[string]string - deletes map[string]string - pathsOrdered []string + puts map[string]map[string]string // map[dataType]map[id] + deletes map[string]map[string]string walSeq string revision int64 changeGroupsRevisions changeGroupsRevisions @@ -44,13 +44,20 @@ type WalChanges struct { sync.Mutex } -func NewWalChanges() *WalChanges { - return &WalChanges{ +func NewWalChanges(dataTypes []string) *WalChanges { + changes := &WalChanges{ actions: make(map[string][]*Action), - puts: make(map[string]string), - deletes: make(map[string]string), + puts: make(map[string]map[string]string), + deletes: make(map[string]map[string]string), changeGroupsRevisions: make(changeGroupsRevisions), } + + for _, dataType := range dataTypes { + changes.puts[dataType] = make(map[string]string) + changes.deletes[dataType] = make(map[string]string) + } + + return changes } func (c *WalChanges) String() string { @@ -69,8 +76,8 @@ func (c *WalChanges) curWalSeq() string { return c.walSeq } -func (c *WalChanges) getPut(p string) (string, bool) { - walseq, ok := c.puts[p] +func (c *WalChanges) getPut(dataType, id string) (string, bool) { + walseq, ok := c.puts[dataType][id] return walseq, ok } @@ -87,30 +94,30 @@ func (c *WalChanges) getDelete(p string) bool { return ok } -func (c *WalChanges) addPut(p, walseq string, revision int64) { - delete(c.deletes, p) - c.puts[p] = walseq +func (c *WalChanges) addPut(dataType, id, walseq string, revision int64) { + delete(c.deletes[dataType], id) + c.puts[dataType][id] = walseq c.walSeq = walseq c.revision = revision } -func (c *WalChanges) removePut(p string, revision int64) { - delete(c.puts, p) +func (c *WalChanges) removePut(dataType, id string, revision int64) { + delete(c.puts[dataType], id) c.revision = revision } -func (c *WalChanges) addDelete(p, walseq string, revision int64) { - delete(c.puts, p) - c.deletes[p] = walseq +func (c *WalChanges) addDelete(dataType, id, walseq string, revision int64) { + delete(c.puts[dataType], id) + c.deletes[dataType][id] = walseq c.walSeq = walseq c.revision = revision } -func (c *WalChanges) removeDelete(p string, revision int64) { - delete(c.deletes, p) +func (c *WalChanges) removeDelete(dataType, id string, revision int64) { + delete(c.deletes[dataType], id) c.revision = revision } @@ -137,28 +144,18 @@ func (c *WalChanges) removeChangeGroup(cgName string) { delete(c.changeGroupsRevisions, cgName) } -func (c *WalChanges) updatePathsOrdered() { - c.pathsOrdered = make([]string, len(c.puts)) - i := 0 - for p := range c.puts { - c.pathsOrdered[i] = p - i++ - } - sort.Sort(sort.StringSlice(c.pathsOrdered)) -} +func (d *DataManager) applyWalChanges(ctx context.Context, walData *WalData, revision int64) error { + walDataFilePath := d.storageWalDataFile(walData.WalDataFileID) -func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revision int64) error { - walDataFilePath := w.storageWalDataFile(walData.WalDataFileID) - - walDataFile, err := w.ost.ReadObject(walDataFilePath) + walDataFile, err := d.ost.ReadObject(walDataFilePath) if err != nil { return errors.Wrapf(err, "failed to read waldata %q", walDataFilePath) } defer walDataFile.Close() dec := json.NewDecoder(walDataFile) - w.changes.Lock() - defer w.changes.Unlock() + d.changes.Lock() + defer d.changes.Unlock() for { var action *Action @@ -171,48 +168,42 @@ func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revi return errors.Wrapf(err, "failed to decode wal file") } - w.applyWalChangesAction(ctx, action, walData.WalSequence, revision) + d.applyWalChangesAction(ctx, action, walData.WalSequence, revision) } - w.changes.updatePathsOrdered() - return nil } -func (w *WalManager) applyWalChangesAction(ctx context.Context, action *Action, walSequence string, revision int64) { - dataPath := w.dataToPathFunc(action.DataType, action.ID) - if dataPath == "" { - return - } +func (d *DataManager) applyWalChangesAction(ctx context.Context, action *Action, walSequence string, revision int64) { switch action.ActionType { case ActionTypePut: - w.changes.addPut(dataPath, walSequence, revision) + d.changes.addPut(action.DataType, action.ID, walSequence, revision) case ActionTypeDelete: - w.changes.addDelete(dataPath, walSequence, revision) + d.changes.addDelete(action.DataType, action.ID, walSequence, revision) } - if w.changes.actions[walSequence] == nil { - w.changes.actions[walSequence] = []*Action{} + if d.changes.actions[walSequence] == nil { + d.changes.actions[walSequence] = []*Action{} } - w.changes.actions[walSequence] = append(w.changes.actions[walSequence], action) + d.changes.actions[walSequence] = append(d.changes.actions[walSequence], action) } -func (w *WalManager) watcherLoop(ctx context.Context) error { +func (d *DataManager) watcherLoop(ctx context.Context) error { for { - initialized := w.changes.initialized + initialized := d.changes.initialized if !initialized { - if err := w.initializeChanges(ctx); err != nil { - w.log.Errorf("watcher err: %+v", err) + if err := d.initializeChanges(ctx); err != nil { + d.log.Errorf("watcher err: %+v", err) } } else { - if err := w.watcher(ctx); err != nil { - w.log.Errorf("watcher err: %+v", err) + if err := d.watcher(ctx); err != nil { + d.log.Errorf("watcher err: %+v", err) } } select { case <-ctx.Done(): - w.log.Infof("watcher exiting") + d.log.Infof("watcher exiting") return nil default: } @@ -221,11 +212,11 @@ func (w *WalManager) watcherLoop(ctx context.Context) error { } } -func (w *WalManager) initializeChanges(ctx context.Context) error { +func (d *DataManager) initializeChanges(ctx context.Context) error { var revision int64 var continuation *etcd.ListPagedContinuation for { - listResp, err := w.e.ListPaged(ctx, etcdWalsDir+"/", 0, 10, continuation) + listResp, err := d.e.ListPaged(ctx, etcdWalsDir+"/", 0, 10, continuation) if err != nil { return err } @@ -239,7 +230,7 @@ func (w *WalManager) initializeChanges(ctx context.Context) error { if err := json.Unmarshal(kv.Value, &walData); err != nil { return err } - if err := w.applyWalChanges(ctx, walData, revision); err != nil { + if err := d.applyWalChanges(ctx, walData, revision); err != nil { return err } } @@ -251,7 +242,7 @@ func (w *WalManager) initializeChanges(ctx context.Context) error { continuation = nil // use the same revision for { - listResp, err := w.e.ListPaged(ctx, etcdChangeGroupsDir+"/", 0, 10, continuation) + listResp, err := d.e.ListPaged(ctx, etcdChangeGroupsDir+"/", 0, 10, continuation) if err != nil { return err } @@ -259,40 +250,40 @@ func (w *WalManager) initializeChanges(ctx context.Context) error { continuation = listResp.Continuation for _, kv := range resp.Kvs { - w.changes.Lock() + d.changes.Lock() changeGroup := path.Base(string(kv.Key)) - w.changes.putChangeGroup(changeGroup, kv.ModRevision) - w.changes.Unlock() + d.changes.putChangeGroup(changeGroup, kv.ModRevision) + d.changes.Unlock() } if !listResp.HasMore { break } } - w.changes.Lock() - w.changes.revision = revision - w.changes.initialized = true - w.changes.Unlock() + d.changes.Lock() + d.changes.revision = revision + d.changes.initialized = true + d.changes.Unlock() return nil } -func (w *WalManager) watcher(ctx context.Context) error { - w.changes.Lock() - revision := w.changes.curRevision() - w.changes.Unlock() +func (d *DataManager) watcher(ctx context.Context) error { + d.changes.Lock() + revision := d.changes.curRevision() + d.changes.Unlock() wctx, cancel := context.WithCancel(ctx) defer cancel() - wch := w.e.Watch(wctx, etcdWalBaseDir+"/", revision+1) + wch := d.e.Watch(wctx, etcdWalBaseDir+"/", revision+1) for wresp := range wch { if wresp.Canceled { err := wresp.Err() if err == etcdclientv3rpc.ErrCompacted { - w.log.Errorf("required events already compacted, reinitializing watcher changes") - w.changes.Lock() - w.changes.initialized = false - w.changes.Unlock() + d.log.Errorf("required events already compacted, reinitializing watcher changes") + d.changes.Lock() + d.changes.initialized = false + d.changes.Unlock() } return errors.Wrapf(err, "watch error") } @@ -312,56 +303,66 @@ func (w *WalManager) watcher(ctx context.Context) error { if walData.WalStatus != WalStatusCommitted { continue } - if err := w.applyWalChanges(ctx, walData, revision); err != nil { + if err := d.applyWalChanges(ctx, walData, revision); err != nil { return err } case mvccpb.DELETE: walseq := path.Base(string(key)) - w.changes.Lock() - putsToDelete := []string{} - deletesToDelete := []string{} - for p, pwalseq := range w.changes.puts { - if pwalseq == walseq { - putsToDelete = append(putsToDelete, p) + d.changes.Lock() + putsToDelete := map[string][]string{} + deletesToDelete := map[string][]string{} + for _, dataType := range d.dataTypes { + putsToDelete[dataType] = []string{} + deletesToDelete[dataType] = []string{} + } + for _, dataType := range d.dataTypes { + for p, pwalseq := range d.changes.puts[dataType] { + if pwalseq == walseq { + putsToDelete[dataType] = append(putsToDelete[dataType], p) + } } } - for p, pwalseq := range w.changes.deletes { - if pwalseq == walseq { - deletesToDelete = append(deletesToDelete, p) + for _, dataType := range d.dataTypes { + for id, pwalseq := range d.changes.deletes[dataType] { + if pwalseq == walseq { + deletesToDelete[dataType] = append(deletesToDelete[dataType], id) + } } } - for _, p := range putsToDelete { - w.changes.removePut(p, revision) + for dataType, ids := range putsToDelete { + for _, id := range ids { + d.changes.removePut(dataType, id, revision) + } } - for _, p := range deletesToDelete { - w.changes.removeDelete(p, revision) + for dataType, ids := range putsToDelete { + for _, id := range ids { + d.changes.removeDelete(dataType, id, revision) + } } - delete(w.changes.actions, walseq) + delete(d.changes.actions, walseq) - w.changes.updatePathsOrdered() - - w.changes.Unlock() + d.changes.Unlock() } case strings.HasPrefix(key, etcdChangeGroupsDir+"/"): switch ev.Type { case mvccpb.PUT: - w.changes.Lock() + d.changes.Lock() changeGroup := strings.TrimPrefix(string(ev.Kv.Key), etcdChangeGroupsDir+"/") - w.changes.putChangeGroup(changeGroup, ev.Kv.ModRevision) - w.changes.Unlock() + d.changes.putChangeGroup(changeGroup, ev.Kv.ModRevision) + d.changes.Unlock() case mvccpb.DELETE: - w.changes.Lock() + d.changes.Lock() changeGroup := strings.TrimPrefix(string(ev.Kv.Key), etcdChangeGroupsDir+"/") - w.changes.removeChangeGroup(changeGroup) - w.changes.Unlock() + d.changes.removeChangeGroup(changeGroup) + d.changes.Unlock() } case key == etcdPingKey: - w.changes.Lock() - w.changes.putRevision(wresp.Header.Revision) - w.changes.Unlock() + d.changes.Lock() + d.changes.putRevision(wresp.Header.Revision) + d.changes.Unlock() } } } diff --git a/internal/datamanager/data.go b/internal/datamanager/data.go new file mode 100644 index 0000000..d100843 --- /dev/null +++ b/internal/datamanager/data.go @@ -0,0 +1,300 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package datamanager + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "strings" + + "github.com/pkg/errors" + "github.com/sorintlab/agola/internal/objectstorage" + "github.com/sorintlab/agola/internal/sequence" +) + +type DataStatus struct { + DataSequence string `json:"data_sequence,omitempty"` + WalSequence string `json:"wal_sequence,omitempty"` + Files map[string][]string `json:"files,omitempty"` +} + +type DataFileIndex struct { + Index map[string]int `json:"index,omitempty"` +} + +type DataEntry struct { + ID string `json:"id,omitempty"` + DataType string `json:"data_type,omitempty"` + Data []byte `json:"data,omitempty"` +} + +func dataStatusPath(sequence string) string { + return fmt.Sprintf("%s/%s.status", storageDataDir, sequence) +} + +func dataFileIndexPath(datatype, sequence string) string { + return fmt.Sprintf("%s/%s/%s.index", storageDataDir, datatype, sequence) +} + +func dataFilePath(datatype, sequence string) string { + return fmt.Sprintf("%s/%s/%s.data", storageDataDir, datatype, sequence) +} + +// TODO(sgotti) +// split/merge data files at max N bytes (i.e 16MiB) so we'll rewrite only files +// with changed data + +func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error { + dataSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey) + if err != nil { + return err + } + + for _, dataType := range d.dataTypes { + if err := d.writeDataType(ctx, wals, dataType, dataSequence.String()); err != nil { + return err + } + } + + var lastWalSequence string + for _, walData := range wals { + lastWalSequence = walData.WalSequence + } + + dataStatus := &DataStatus{ + DataSequence: dataSequence.String(), + WalSequence: lastWalSequence, + Files: make(map[string][]string), + } + for _, dataType := range d.dataTypes { + dataStatus.Files[dataType] = []string{dataFilePath(dataType, dataSequence.String())} + } + + dataStatusj, err := json.Marshal(dataStatus) + if err != nil { + return err + } + if err := d.ost.WriteObject(dataStatusPath(dataSequence.String()), bytes.NewReader(dataStatusj)); err != nil { + return err + } + + return nil +} + +func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, datatype, dataSequence string) error { + curDataStatus, err := d.GetLastDataStatus() + if err != nil && err != objectstorage.ErrNotExist { + return err + } + + dataEntriesMap := map[string]*DataEntry{} + if err != objectstorage.ErrNotExist { + curDataSequence := curDataStatus.DataSequence + + oldDataf, err := d.ost.ReadObject(dataFilePath(datatype, curDataSequence)) + if err != nil && err != objectstorage.ErrNotExist { + return err + } + if err != objectstorage.ErrNotExist { + dec := json.NewDecoder(oldDataf) + for { + var de *DataEntry + + err := dec.Decode(&de) + if err == io.EOF { + // all done + break + } + if err != nil { + oldDataf.Close() + return err + } + dataEntriesMap[de.ID] = de + } + oldDataf.Close() + } + } + + for _, walData := range wals { + walFilef, err := d.ReadWal(walData.WalSequence) + if err != nil { + return err + } + dec := json.NewDecoder(walFilef) + var header *WalHeader + if err = dec.Decode(&header); err != nil && err != io.EOF { + walFilef.Close() + return err + } + walFilef.Close() + + walFile, err := d.ReadWalData(header.WalDataFileID) + if err != nil { + return errors.Wrapf(err, "cannot read wal data file %q", header.WalDataFileID) + } + defer walFile.Close() + + dec = json.NewDecoder(walFile) + for { + var action *Action + + err := dec.Decode(&action) + if err == io.EOF { + // all done + break + } + if err != nil { + return errors.Wrapf(err, "failed to decode wal file") + } + if action.DataType != datatype { + continue + } + + switch action.ActionType { + case ActionTypePut: + de := &DataEntry{ + ID: action.ID, + DataType: action.DataType, + Data: action.Data, + } + dataEntriesMap[de.ID] = de + case ActionTypeDelete: + delete(dataEntriesMap, action.ID) + } + } + } + + dataEntries := []*DataEntry{} + for _, de := range dataEntriesMap { + dataEntries = append(dataEntries, de) + } + + dataFileIndex := &DataFileIndex{ + Index: make(map[string]int), + } + + var buf bytes.Buffer + pos := 0 + for _, de := range dataEntries { + dataFileIndex.Index[de.ID] = pos + + dataEntryj, err := json.Marshal(de) + if err != nil { + return err + } + if _, err := buf.Write(dataEntryj); err != nil { + return err + } + + pos += len(dataEntryj) + } + if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf); err != nil { + return err + } + + dataFileIndexj, err := json.Marshal(dataFileIndex) + if err != nil { + return err + } + if err := d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj)); err != nil { + return err + } + + return nil +} + +func (d *DataManager) Read(dataType, id string) (io.Reader, error) { + curDataStatus, err := d.GetLastDataStatus() + if err != nil { + return nil, err + } + dataSequence := curDataStatus.DataSequence + + dataFileIndexf, err := d.ost.ReadObject(dataFileIndexPath(dataType, dataSequence)) + if err != nil { + return nil, err + } + var dataFileIndex *DataFileIndex + dec := json.NewDecoder(dataFileIndexf) + err = dec.Decode(&dataFileIndex) + if err != nil { + dataFileIndexf.Close() + return nil, errors.WithStack(err) + } + dataFileIndexf.Close() + + pos, ok := dataFileIndex.Index[id] + if !ok { + return nil, objectstorage.ErrNotExist + } + + dataf, err := d.ost.ReadObject(dataFilePath(dataType, dataSequence)) + if err != nil { + return nil, errors.WithStack(err) + } + if _, err := dataf.Seek(int64(pos), io.SeekStart); err != nil { + dataf.Close() + return nil, errors.WithStack(err) + } + var de *DataEntry + dec = json.NewDecoder(dataf) + if err := dec.Decode(&de); err != nil { + dataf.Close() + return nil, err + } + dataf.Close() + + return bytes.NewReader(de.Data), nil +} + +func (d *DataManager) GetLastDataStatusPath() (string, error) { + doneCh := make(chan struct{}) + defer close(doneCh) + + var dataStatusPath string + for object := range d.ost.List(storageDataDir+"/", "", false, doneCh) { + if object.Err != nil { + return "", object.Err + } + if strings.HasSuffix(object.Path, ".status") { + dataStatusPath = object.Path + } + } + if dataStatusPath == "" { + return "", objectstorage.ErrNotExist + } + + return dataStatusPath, nil +} + +func (d *DataManager) GetLastDataStatus() (*DataStatus, error) { + dataStatusPath, err := d.GetLastDataStatusPath() + if err != nil { + return nil, err + } + + dataStatusf, err := d.ost.ReadObject(dataStatusPath) + if err != nil { + return nil, err + } + defer dataStatusf.Close() + var dataStatus *DataStatus + dec := json.NewDecoder(dataStatusf) + + return dataStatus, dec.Decode(&dataStatus) +} diff --git a/internal/datamanager/datamanager.go b/internal/datamanager/datamanager.go new file mode 100644 index 0000000..ad06710 --- /dev/null +++ b/internal/datamanager/datamanager.go @@ -0,0 +1,164 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package datamanager + +import ( + "context" + "path" + "strings" + "time" + + "github.com/sorintlab/agola/internal/etcd" + "github.com/sorintlab/agola/internal/objectstorage" + + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// TODO(sgotti) handle etcd unwanted changes: +// * Etcd cluster rebuild: we cannot rely on etcd header ClusterID since it could be the same as it's generated using the listen urls. We should add our own clusterid key and use it. +// * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one + +// Storage paths +// wals/{walSeq} +// +// Etcd paths +// wals/{walSeq} + +const ( + DefaultCheckpointInterval = 1 * time.Minute + DefaultEtcdWalsKeepNum = 100 + DefaultMinCheckpointWalsNum = 100 +) + +var ( + ErrCompacted = errors.New("required revision has been compacted") + ErrConcurrency = errors.New("wal concurrency error: change groups already updated") +) + +var ( + // Storage paths. Always use path (not filepath) to use the "/" separator + storageDataDir = "data" + + storageWalsDir = "wals" + storageWalsStatusDir = path.Join(storageWalsDir, "status") + storageWalsDataDir = path.Join(storageWalsDir, "data") + + // etcd paths. Always use path (not filepath) to use the "/" separator + etcdWalBaseDir = "datamanager" + etcdWalsDir = path.Join(etcdWalBaseDir, "wals") + etcdWalsDataKey = path.Join(etcdWalBaseDir, "walsdata") + etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq") + etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq") + + etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock") + etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") + etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock") + + etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups") + etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev") + + etcdPingKey = path.Join(etcdWalBaseDir, "ping") +) + +const ( + etcdChangeGroupMinRevisionRange = 1000 +) + +type DataManagerConfig struct { + BasePath string + E *etcd.Store + OST *objectstorage.ObjStorage + DataTypes []string + EtcdWalsKeepNum int + CheckpointInterval time.Duration + // MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint + MinCheckpointWalsNum int +} + +type DataManager struct { + basePath string + log *zap.SugaredLogger + e *etcd.Store + ost *objectstorage.ObjStorage + changes *WalChanges + dataTypes []string + etcdWalsKeepNum int + checkpointInterval time.Duration + minCheckpointWalsNum int +} + +func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error) { + if conf.EtcdWalsKeepNum == 0 { + conf.EtcdWalsKeepNum = DefaultEtcdWalsKeepNum + } + if conf.EtcdWalsKeepNum < 1 { + return nil, errors.New("etcdWalsKeepNum must be greater than 0") + } + if conf.CheckpointInterval == 0 { + conf.CheckpointInterval = DefaultCheckpointInterval + } + if conf.MinCheckpointWalsNum == 0 { + conf.MinCheckpointWalsNum = DefaultMinCheckpointWalsNum + } + if conf.MinCheckpointWalsNum < 1 { + return nil, errors.New("minCheckpointWalsNum must be greater than 0") + } + + d := &DataManager{ + basePath: conf.BasePath, + log: logger.Sugar(), + e: conf.E, + ost: conf.OST, + changes: NewWalChanges(conf.DataTypes), + dataTypes: conf.DataTypes, + etcdWalsKeepNum: conf.EtcdWalsKeepNum, + checkpointInterval: conf.CheckpointInterval, + minCheckpointWalsNum: conf.MinCheckpointWalsNum, + } + + // add trailing slash the basepath + if d.basePath != "" && !strings.HasSuffix(d.basePath, "/") { + d.basePath = d.basePath + "/" + } + + return d, nil +} + +func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error { + for { + err := d.InitEtcd(ctx) + if err == nil { + break + } + d.log.Errorf("failed to initialize etcd: %+v", err) + time.Sleep(1 * time.Second) + } + + readyCh <- struct{}{} + + go d.watcherLoop(ctx) + go d.syncLoop(ctx) + go d.checkpointLoop(ctx) + go d.walCleanerLoop(ctx) + go d.compactChangeGroupsLoop(ctx) + go d.etcdPingerLoop(ctx) + + select { + case <-ctx.Done(): + d.log.Infof("walmanager exiting") + return nil + } +} diff --git a/internal/wal/wal_test.go b/internal/datamanager/datamanager_test.go similarity index 54% rename from internal/wal/wal_test.go rename to internal/datamanager/datamanager_test.go index 8eb5a09..28a6060 100644 --- a/internal/wal/wal_test.go +++ b/internal/datamanager/datamanager_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package datamanager import ( "context" @@ -26,7 +26,6 @@ import ( "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/testutil" - "github.com/google/go-cmp/cmp" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -82,22 +81,24 @@ func TestEtcdReset(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - walConfig := &WalManagerConfig{ + dmConfig := &DataManagerConfig{ BasePath: "basepath", E: tetcd.TestEtcd.Store, OST: objectstorage.NewObjStorage(ost, "/"), EtcdWalsKeepNum: 10, + DataTypes: []string{"datatype01"}, } - wal, err := NewWalManager(ctx, logger, walConfig) - walReadyCh := make(chan struct{}) + dm, err := NewDataManager(ctx, logger, dmConfig) + dmReadyCh := make(chan struct{}) - t.Logf("starting wal") - go wal.Run(ctx, walReadyCh) - <-walReadyCh + t.Logf("starting datamanager") + go dm.Run(ctx, dmReadyCh) + <-dmReadyCh actions := []*Action{ { ActionType: ActionTypePut, + DataType: "datatype01", Data: []byte("{}"), }, } @@ -107,7 +108,7 @@ func TestEtcdReset(t *testing.T) { objectID := fmt.Sprintf("object%02d", i) expectedObjects = append(expectedObjects, objectID) actions[0].ID = objectID - if _, err := wal.WriteWal(ctx, actions, nil); err != nil { + if _, err := dm.WriteWal(ctx, actions, nil); err != nil { t.Fatalf("unexpected err: %v", err) } } @@ -115,7 +116,7 @@ func TestEtcdReset(t *testing.T) { // wait for wal to be committed storage time.Sleep(5 * time.Second) - t.Logf("stopping wal") + t.Logf("stopping datamanager") cancel() t.Logf("stopping etcd") @@ -133,35 +134,29 @@ func TestEtcdReset(t *testing.T) { defer shutdownEtcd(tetcd) ctx, cancel = context.WithCancel(context.Background()) - walConfig = &WalManagerConfig{ + defer cancel() + dmConfig = &DataManagerConfig{ BasePath: "basepath", E: tetcd.TestEtcd.Store, OST: objectstorage.NewObjStorage(ost, "/"), EtcdWalsKeepNum: 10, + DataTypes: []string{"datatype01"}, } - wal, err = NewWalManager(ctx, logger, walConfig) - walReadyCh = make(chan struct{}) + dm, err = NewDataManager(ctx, logger, dmConfig) + dmReadyCh = make(chan struct{}) t.Logf("starting wal") - go wal.Run(ctx, walReadyCh) - <-walReadyCh + go dm.Run(ctx, dmReadyCh) + <-dmReadyCh time.Sleep(5 * time.Second) - curObjects := []string{} - doneCh := make(chan struct{}) - for object := range wal.List("", "", true, doneCh) { - t.Logf("path: %q", object.Path) - if object.Err != nil { - t.Fatalf("unexpected err: %v", object.Err) + for i := 0; i < 20; i++ { + objectID := fmt.Sprintf("object%02d", i) + _, _, err = dm.ReadObject("datatype01", objectID, nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) } - curObjects = append(curObjects, object.Path) - } - close(doneCh) - t.Logf("curObjects: %s", curObjects) - - if diff := cmp.Diff(expectedObjects, curObjects); diff != "" { - t.Error(diff) } } @@ -185,61 +180,63 @@ func TestConcurrentUpdate(t *testing.T) { t.Fatalf("unexpected err: %v", err) } - walConfig := &WalManagerConfig{ + dmConfig := &DataManagerConfig{ E: tetcd.TestEtcd.Store, OST: objectstorage.NewObjStorage(ost, "/"), EtcdWalsKeepNum: 10, + DataTypes: []string{"datatype01"}, } - wal, err := NewWalManager(ctx, logger, walConfig) + dm, err := NewDataManager(ctx, logger, dmConfig) actions := []*Action{ { ActionType: ActionTypePut, - ID: "/object01", + ID: "object01", + DataType: "datatype01", Data: []byte("{}"), }, } - walReadyCh := make(chan struct{}) - go wal.Run(ctx, walReadyCh) - <-walReadyCh + dmReadyCh := make(chan struct{}) + go dm.Run(ctx, dmReadyCh) + <-dmReadyCh time.Sleep(5 * time.Second) cgNames := []string{"changegroup01", "changegroup02"} - cgt, err := wal.GetChangeGroupsUpdateToken(cgNames) + cgt, err := dm.GetChangeGroupsUpdateToken(cgNames) if err != nil { t.Fatalf("unexpected err: %v", err) } // populate with a wal - cgt, err = wal.WriteWal(ctx, actions, cgt) + cgt, err = dm.WriteWal(ctx, actions, cgt) if err != nil { t.Fatalf("unexpected err: %v", err) } // this must work successfully oldcgt := cgt - cgt, err = wal.WriteWal(ctx, actions, cgt) + cgt, err = dm.WriteWal(ctx, actions, cgt) if err != nil { t.Fatalf("unexpected err: %v", err) } // this must fail since we are using the old cgt - _, err = wal.WriteWal(ctx, actions, oldcgt) + _, err = dm.WriteWal(ctx, actions, oldcgt) if err != ErrConcurrency { t.Fatalf("expected err: %v, got %v", ErrConcurrency, err) } oldcgt = cgt // this must work successfully - cgt, err = wal.WriteWal(ctx, actions, cgt) + cgt, err = dm.WriteWal(ctx, actions, cgt) if err != nil { t.Fatalf("unexpected err: %v", err) } // this must fail since we are using the old cgt - _, err = wal.WriteWal(ctx, actions, oldcgt) + _, err = dm.WriteWal(ctx, actions, oldcgt) if err != ErrConcurrency { t.Fatalf("expected err: %v, got %v", ErrConcurrency, err) } @@ -266,39 +263,155 @@ func TestWalCleaner(t *testing.T) { } walKeepNum := 10 - walConfig := &WalManagerConfig{ - E: tetcd.TestEtcd.Store, - OST: objectstorage.NewObjStorage(ost, "/"), - EtcdWalsKeepNum: walKeepNum, + dmConfig := &DataManagerConfig{ + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + EtcdWalsKeepNum: walKeepNum, + DataTypes: []string{"datatype01"}, + MinCheckpointWalsNum: 1, } - wal, err := NewWalManager(ctx, logger, walConfig) + dm, err := NewDataManager(ctx, logger, dmConfig) actions := []*Action{ { ActionType: ActionTypePut, - ID: "/object01", + ID: "object01", + DataType: "datatype01", Data: []byte("{}"), }, } - walReadyCh := make(chan struct{}) - go wal.Run(ctx, walReadyCh) - <-walReadyCh + dmReadyCh := make(chan struct{}) + go dm.Run(ctx, dmReadyCh) + <-dmReadyCh for i := 0; i < 20; i++ { - if _, err := wal.WriteWal(ctx, actions, nil); err != nil { + if _, err := dm.WriteWal(ctx, actions, nil); err != nil { t.Fatalf("unexpected err: %v", err) } } - // wait for walCleaner to complete - time.Sleep(5 * time.Second) + dm.checkpoint(ctx) + dm.walCleaner(ctx) walsCount := 0 - for range wal.ListEtcdWals(ctx, 0) { + for range dm.ListEtcdWals(ctx, 0) { walsCount++ } if walsCount != walKeepNum { t.Fatalf("expected %d wals in etcd, got %d wals", walKeepNum, walsCount) } } + +func TestReadObject(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + ctx := context.Background() + + ostDir, err := ioutil.TempDir(dir, "ost") + ost, err := objectstorage.NewPosixStorage(ostDir) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + dmConfig := &DataManagerConfig{ + E: tetcd.TestEtcd.Store, + OST: objectstorage.NewObjStorage(ost, "/"), + // remove almost all wals to see that they are removed also from changes + EtcdWalsKeepNum: 1, + DataTypes: []string{"datatype01"}, + } + dm, err := NewDataManager(ctx, logger, dmConfig) + + actions := []*Action{} + for i := 0; i < 20; i++ { + actions = append(actions, &Action{ + ActionType: ActionTypePut, + ID: fmt.Sprintf("object%d", i), + DataType: "datatype01", + Data: []byte(fmt.Sprintf(`{ "ID": "%d" }`, i)), + }) + } + + dmReadyCh := make(chan struct{}) + go dm.Run(ctx, dmReadyCh) + <-dmReadyCh + + time.Sleep(5 * time.Second) + + // populate with a wal + _, err = dm.WriteWal(ctx, actions, nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // wait for the event to be read + time.Sleep(500 * time.Millisecond) + // should read it + _, _, err = dm.ReadObject("datatype01", "object1", nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + _, _, err = dm.ReadObject("datatype01", "object19", nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + actions = []*Action{} + for i := 0; i < 10; i++ { + actions = append(actions, &Action{ + ActionType: ActionTypeDelete, + ID: fmt.Sprintf("object%d", i), + DataType: "datatype01", + }) + } + + _, err = dm.WriteWal(ctx, actions, nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // wait for the event to be read + time.Sleep(500 * time.Millisecond) + + // test read from changes (since not checkpoint yet) + + // should not exists + _, _, err = dm.ReadObject("datatype01", "object1", nil) + if err != objectstorage.ErrNotExist { + t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) + } + // should exist + _, _, err = dm.ReadObject("datatype01", "object19", nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + // do a checkpoint and wal clean + dm.checkpoint(ctx) + dm.walCleaner(ctx) + + // wait for the event to be read + time.Sleep(500 * time.Millisecond) + + // test read from data + + // should not exists + _, _, err = dm.ReadObject("datatype01", "object1", nil) + if err != objectstorage.ErrNotExist { + t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err) + } + // should exist + _, _, err = dm.ReadObject("datatype01", "object19", nil) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } +} diff --git a/internal/wal/wal.go b/internal/datamanager/wal.go similarity index 58% rename from internal/wal/wal.go rename to internal/datamanager/wal.go index 3daab37..5692fb2 100644 --- a/internal/wal/wal.go +++ b/internal/datamanager/wal.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package wal +package datamanager import ( "bytes" @@ -35,70 +35,14 @@ import ( "go.etcd.io/etcd/clientv3/concurrency" etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/mvcc/mvccpb" - "go.uber.org/zap" ) -// TODO(sgotti) handle etcd unwanted changes: -// * Etcd cluster rebuild: we cannot rely on etcd header ClusterID since it could be the same as it's generated using the listen urls. We should add our own clusterid key and use it. -// * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one - -// Storage paths -// wals/{walSeq} -// -// Etcd paths -// wals/{walSeq} - -const ( - DefaultEtcdWalsKeepNum = 100 -) - -var ( - ErrCompacted = errors.New("required revision has been compacted") - ErrConcurrency = errors.New("wal concurrency error: change groups already updated") -) - -var ( - // Storage paths. Always use path (not filepath) to use the "/" separator - storageObjectsPrefix = "data/" - storageWalsDir = "wals" - storageWalsStatusDir = path.Join(storageWalsDir, "status") - storageWalsDataDir = path.Join(storageWalsDir, "data") - - // etcd paths. Always use path (not filepath) to use the "/" separator - etcdWalBaseDir = "walmanager" - etcdWalsDir = path.Join(etcdWalBaseDir, "wals") - etcdWalsDataKey = path.Join(etcdWalBaseDir, "walsdata") - etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq") - etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq") - - etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock") - etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock") - etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock") - - etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups") - etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev") - - etcdPingKey = path.Join(etcdWalBaseDir, "ping") -) - -const ( - etcdChangeGroupMinRevisionRange = 1000 -) - -func (w *WalManager) toStorageDataPath(path string) string { - return w.basePath + storageObjectsPrefix + path +func (d *DataManager) storageWalStatusFile(walSeq string) string { + return path.Join(d.basePath, storageWalsStatusDir, walSeq) } -func (w *WalManager) fromStorageDataPath(path string) string { - return strings.TrimPrefix(path, w.basePath+storageObjectsPrefix) -} - -func (w *WalManager) storageWalStatusFile(walSeq string) string { - return path.Join(w.basePath, storageWalsStatusDir, walSeq) -} - -func (w *WalManager) storageWalDataFile(walFileID string) string { - return path.Join(w.basePath, storageWalsDataDir, walFileID) +func (d *DataManager) storageWalDataFile(walFileID string) string { + return path.Join(d.basePath, storageWalsDataDir, walFileID) } func etcdWalKey(walSeq string) string { @@ -122,7 +66,6 @@ type Action struct { type WalHeader struct { WalDataFileID string PreviousWalSequence string - ChangeGroups map[string]int64 } type WalStatus string @@ -146,7 +89,9 @@ type WalData struct { WalStatus WalStatus WalSequence string PreviousWalSequence string - ChangeGroups map[string]int64 + + // internal values not saved + Revision int64 `json:"-"` } type ChangeGroupsUpdateToken struct { @@ -156,85 +101,53 @@ type ChangeGroupsUpdateToken struct { type changeGroupsRevisions map[string]int64 -func (w *WalManager) ChangesCurrentRevision() (int64, error) { - w.changes.Lock() - defer w.changes.Unlock() - if !w.changes.initialized { - return 0, errors.Errorf("wal changes not ready") - } - return w.changes.revision, nil -} - -func (w *WalManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error) { - w.changes.Lock() - defer w.changes.Unlock() - if !w.changes.initialized { +func (d *DataManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error) { + d.changes.Lock() + defer d.changes.Unlock() + if !d.changes.initialized { return nil, errors.Errorf("wal changes not ready") } - revision := w.changes.curRevision() - cgr := w.changes.getChangeGroups(cgNames) + revision := d.changes.curRevision() + cgr := d.changes.getChangeGroups(cgNames) return &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil } -func (w *WalManager) MergeChangeGroupsUpdateTokens(cgts []*ChangeGroupsUpdateToken) *ChangeGroupsUpdateToken { - mcgt := &ChangeGroupsUpdateToken{ChangeGroupsRevisions: make(changeGroupsRevisions)} - for _, cgt := range cgts { - // keep the lower curRevision - if cgt.CurRevision != 0 && cgt.CurRevision < mcgt.CurRevision { - mcgt.CurRevision = cgt.CurRevision - } - // keep the lower changegroup revision - for cgName, cgRev := range cgt.ChangeGroupsRevisions { - if mr, ok := mcgt.ChangeGroupsRevisions[cgName]; ok { - if cgRev < mr { - mcgt.ChangeGroupsRevisions[cgName] = cgRev - } - } else { - mcgt.ChangeGroupsRevisions[cgName] = cgRev - } - } - } - - return mcgt -} - -func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error) { - w.changes.Lock() - if !w.changes.initialized { - w.changes.Unlock() +func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error) { + d.changes.Lock() + if !d.changes.initialized { + d.changes.Unlock() return nil, nil, errors.Errorf("wal changes not ready") } - walseq, ok := w.changes.getPut(p) - revision := w.changes.curRevision() - cgr := w.changes.getChangeGroups(cgNames) - actions := w.changes.actions[walseq] - w.changes.Unlock() + walseq, ok := d.changes.getPut(dataType, id) + revision := d.changes.curRevision() + cgr := d.changes.getChangeGroups(cgNames) + actions := d.changes.actions[walseq] + d.changes.Unlock() cgt := &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr} if ok { for _, action := range actions { if action.ActionType == ActionTypePut { - dataPath := w.dataToPathFunc(action.DataType, action.ID) - if dataPath == p { - w.log.Debugf("reading file from wal: %q", dataPath) + if action.DataType == dataType && action.ID == id { + d.log.Debugf("reading datatype %q, id %q from wal: %q", dataType, id) return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil } } } - return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq) + return nil, nil, errors.Errorf("no datatype %q, id %q in wal %s", dataType, id, walseq) } - f, err := w.ost.ReadObject(w.toStorageDataPath(p)) - return f, cgt, err + f, err := d.Read(dataType, id) + return ioutil.NopCloser(f), cgt, err } -func (w *WalManager) changesList(paths []string, prefix, startWith string, recursive bool) []string { +func (d *DataManager) changesList(paths []string, prefix, startWith string, recursive bool) []string { fpaths := []string{} for _, p := range paths { if !recursive && len(p) > len(prefix) { rel := strings.TrimPrefix(p, prefix) - skip := strings.Contains(rel, w.ost.Delimiter()) + skip := strings.Contains(rel, d.ost.Delimiter()) if skip { continue } @@ -247,79 +160,8 @@ func (w *WalManager) changesList(paths []string, prefix, startWith string, recur return fpaths } -func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan objectstorage.ObjectInfo { - objectCh := make(chan objectstorage.ObjectInfo, 1) - - prefix = w.toStorageDataPath(prefix) - startWith = w.toStorageDataPath(startWith) - - w.changes.Lock() - if !w.changes.initialized { - w.changes.Unlock() - objectCh <- objectstorage.ObjectInfo{Err: errors.Errorf("wal changes not ready")} - return objectCh - } - changesList := w.changesList(w.changes.pathsOrdered, prefix, startWith, recursive) - deletedChangesMap := w.changes.getDeletesMap() - w.changes.Unlock() - - ci := 0 - go func(objectCh chan<- objectstorage.ObjectInfo) { - defer close(objectCh) - for object := range w.ost.List(prefix, startWith, recursive, doneCh) { - if object.Err != nil { - objectCh <- object - return - } - object.Path = w.fromStorageDataPath(object.Path) - - for ci < len(changesList) { - p := changesList[ci] - if p < object.Path { - //w.log.Infof("using path from changelist: %q", p) - select { - // Send object content. - case objectCh <- objectstorage.ObjectInfo{Path: p}: - // If receives done from the caller, return here. - case <-doneCh: - return - } - ci++ - } else if p == object.Path { - ci++ - break - } else { - break - } - } - - if _, ok := deletedChangesMap[object.Path]; ok { - continue - } - - //w.log.Infof("using path from objectstorage: %q", object.Path) - select { - // Send object content. - case objectCh <- object: - // If receives done from the caller, return here. - case <-doneCh: - return - } - } - for ci < len(changesList) { - //w.log.Infof("using path from changelist: %q", changesList[ci]) - objectCh <- objectstorage.ObjectInfo{ - Path: changesList[ci], - } - ci++ - } - }(objectCh) - - return objectCh -} - -func (w *WalManager) HasOSTWal(walseq string) (bool, error) { - _, err := w.ost.Stat(w.storageWalStatusFile(walseq) + ".committed") +func (d *DataManager) HasOSTWal(walseq string) (bool, error) { + _, err := d.ost.Stat(d.storageWalStatusFile(walseq) + ".committed") if err == objectstorage.ErrNotExist { return false, nil } @@ -329,12 +171,12 @@ func (w *WalManager) HasOSTWal(walseq string) (bool, error) { return true, nil } -func (w *WalManager) ReadWal(walseq string) (io.ReadCloser, error) { - return w.ost.ReadObject(w.storageWalStatusFile(walseq) + ".committed") +func (d *DataManager) ReadWal(walseq string) (io.ReadCloser, error) { + return d.ost.ReadObject(d.storageWalStatusFile(walseq) + ".committed") } -func (w *WalManager) ReadWalData(walFileID string) (io.ReadCloser, error) { - return w.ost.ReadObject(w.storageWalDataFile(walFileID)) +func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error) { + return d.ost.ReadObject(d.storageWalDataFile(walFileID)) } type WalFile struct { @@ -344,7 +186,7 @@ type WalFile struct { Checkpointed bool } -func (w *WalManager) ListOSTWals(start string) <-chan *WalFile { +func (d *DataManager) ListOSTWals(start string) <-chan *WalFile { walCh := make(chan *WalFile, 1) go func() { @@ -355,10 +197,10 @@ func (w *WalManager) ListOSTWals(start string) <-chan *WalFile { curWal := &WalFile{} var startPath string if start != "" { - startPath = w.storageWalStatusFile(start) + startPath = d.storageWalStatusFile(start) } - for object := range w.ost.List(path.Join(w.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) { + for object := range d.ost.List(path.Join(d.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) { if object.Err != nil { walCh <- &WalFile{ Err: object.Err, @@ -411,14 +253,14 @@ type ListEtcdWalsElement struct { Err error } -func (w *WalManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement { +func (d *DataManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement { walCh := make(chan *ListEtcdWalsElement, 1) go func() { defer close(walCh) var continuation *etcd.ListPagedContinuation for { - listResp, err := w.e.ListPaged(ctx, etcdWalsDir, revision, 10, continuation) + listResp, err := d.e.ListPaged(ctx, etcdWalsDir, revision, 10, continuation) if err != nil { walCh <- &ListEtcdWalsElement{ Err: err, @@ -448,9 +290,9 @@ func (w *WalManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *L // FirstAvailableWalData returns the first (the one with smaller sequence) wal // and returns it (or nil if not available) and the etcd revision at the time of // the operation -func (w *WalManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64, error) { +func (d *DataManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64, error) { // list waldata and just get the first if available - listResp, err := w.e.ListPaged(ctx, etcdWalsDir, 0, 1, nil) + listResp, err := d.e.ListPaged(ctx, etcdWalsDir, 0, 1, nil) if err != nil { return nil, 0, err } @@ -469,8 +311,8 @@ func (w *WalManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64 return walData, revision, nil } -func (w *WalManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error) { - resp, err := w.e.Get(ctx, etcdLastCommittedStorageWalSeqKey, 0) +func (d *DataManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error) { + resp, err := d.e.Get(ctx, etcdLastCommittedStorageWalSeqKey, 0) if err != nil && err != etcd.ErrKeyNotFound { return "", 0, err } @@ -491,7 +333,7 @@ type WatchElement struct { Err error } -func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement { +func (d *DataManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement { walCh := make(chan *WatchElement, 1) // TODO(sgotti) if the etcd cluster goes down, watch won't return an error but @@ -499,7 +341,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle // is down and report an error so our clients can react (i.e. a readdb could // mark itself as not in sync) wctx := etcdclientv3.WithRequireLeader(ctx) - wch := w.e.Watch(wctx, etcdWalBaseDir+"/", revision) + wch := d.e.Watch(wctx, etcdWalBaseDir+"/", revision) go func() { defer close(walCh) @@ -577,21 +419,21 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle // handle possible objectstorage list operation eventual consistency gaps (list // won't report a wal at seq X but a wal at X+n, if this kind of eventual // consistency ever exists) -func (w *WalManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error) { - return w.WriteWalAdditionalOps(ctx, actions, cgt, nil, nil) +func (d *DataManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error) { + return d.WriteWalAdditionalOps(ctx, actions, cgt, nil, nil) } -func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, cmp []etcdclientv3.Cmp, then []etcdclientv3.Op) (*ChangeGroupsUpdateToken, error) { +func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, cmp []etcdclientv3.Cmp, then []etcdclientv3.Op) (*ChangeGroupsUpdateToken, error) { if len(actions) == 0 { return nil, errors.Errorf("cannot write wal: actions is empty") } - walSequence, err := sequence.IncSequence(ctx, w.e, etcdWalSeqKey) + walSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey) if err != nil { return nil, err } - resp, err := w.e.Get(ctx, etcdWalsDataKey, 0) + resp, err := d.e.Get(ctx, etcdWalsDataKey, 0) if err != nil { return nil, err } @@ -603,7 +445,7 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio walsData.Revision = resp.Kvs[0].ModRevision walDataFileID := uuid.NewV4().String() - walDataFilePath := w.storageWalDataFile(walDataFileID) + walDataFilePath := d.storageWalDataFile(walDataFileID) walKey := etcdWalKey(walSequence.String()) var buf bytes.Buffer @@ -616,10 +458,10 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio return nil, err } } - if err := w.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil { + if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil { return nil, err } - w.log.Debugf("wrote wal file: %s", walDataFilePath) + d.log.Debugf("wrote wal file: %s", walDataFilePath) walsData.LastCommittedWalSequence = walSequence.String() @@ -673,7 +515,7 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio // This will only succeed if no one else have concurrently updated the walsData // TODO(sgotti) retry if it failed due to concurrency errors - txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...).Else(getWalsData, getWal) + txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...).Else(getWalsData, getWal) tresp, err := txn.Commit() if err != nil { return nil, etcd.FromEtcdError(err) @@ -697,18 +539,18 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio } // try to commit storage right now - if err := w.sync(ctx); err != nil { - w.log.Errorf("wal sync error: %+v", err) + if err := d.sync(ctx); err != nil { + d.log.Errorf("wal sync error: %+v", err) } return ncgt, nil } -func (w *WalManager) syncLoop(ctx context.Context) { +func (d *DataManager) syncLoop(ctx context.Context) { for { - w.log.Debugf("syncer") - if err := w.sync(ctx); err != nil { - w.log.Errorf("syncer error: %+v", err) + d.log.Debugf("syncer") + if err := d.sync(ctx); err != nil { + d.log.Errorf("syncer error: %+v", err) } select { @@ -721,8 +563,8 @@ func (w *WalManager) syncLoop(ctx context.Context) { } } -func (w *WalManager) sync(ctx context.Context) error { - session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) +func (d *DataManager) sync(ctx context.Context) error { + session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err } @@ -735,7 +577,7 @@ func (w *WalManager) sync(ctx context.Context) error { } defer m.Unlock(ctx) - resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0) + resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0) if err != nil { return err } @@ -748,11 +590,10 @@ func (w *WalManager) sync(ctx context.Context) error { // TODO(sgotti) this could be optimized by parallelizing writes of wals that don't have common change groups switch walData.WalStatus { case WalStatusCommitted: - walFilePath := w.storageWalStatusFile(walData.WalSequence) - w.log.Debugf("syncing committed wal to storage") + walFilePath := d.storageWalStatusFile(walData.WalSequence) + d.log.Debugf("syncing committed wal to storage") header := &WalHeader{ WalDataFileID: walData.WalDataFileID, - ChangeGroups: walData.ChangeGroups, PreviousWalSequence: walData.PreviousWalSequence, } headerj, err := json.Marshal(header) @@ -761,11 +602,11 @@ func (w *WalManager) sync(ctx context.Context) error { } walFileCommittedPath := walFilePath + ".committed" - if err := w.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil { + if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil { return err } - w.log.Debugf("updating wal to state %q", WalStatusCommittedStorage) + d.log.Debugf("updating wal to state %q", WalStatusCommittedStorage) walData.WalStatus = WalStatusCommittedStorage walDataj, err := json.Marshal(walData) if err != nil { @@ -779,7 +620,7 @@ func (w *WalManager) sync(ctx context.Context) error { then = append(then, etcdclientv3.OpPut(string(etcdLastCommittedStorageWalSeqKey), string(walData.WalSequence))) // This will only succeed if the no one else have concurrently updated the wal keys in etcd - txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...) + txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) @@ -788,10 +629,10 @@ func (w *WalManager) sync(ctx context.Context) error { return errors.Errorf("failed to write committedstorage wal: concurrent update") } case WalStatusCheckpointed: - walFilePath := w.storageWalStatusFile(walData.WalSequence) - w.log.Debugf("checkpointing committed wal to storage") + walFilePath := d.storageWalStatusFile(walData.WalSequence) + d.log.Debugf("checkpointing committed wal to storage") walFileCheckpointedPath := walFilePath + ".checkpointed" - if err := w.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil { + if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil { return err } } @@ -799,11 +640,11 @@ func (w *WalManager) sync(ctx context.Context) error { return nil } -func (w *WalManager) checkpointLoop(ctx context.Context) { +func (d *DataManager) checkpointLoop(ctx context.Context) { for { - w.log.Debugf("checkpointer") - if err := w.checkpoint(ctx); err != nil { - w.log.Errorf("checkpoint error: %v", err) + d.log.Debugf("checkpointer") + if err := d.checkpoint(ctx); err != nil { + d.log.Errorf("checkpoint error: %v", err) } select { @@ -812,12 +653,12 @@ func (w *WalManager) checkpointLoop(ctx context.Context) { default: } - time.Sleep(2 * time.Second) + time.Sleep(d.checkpointInterval) } } -func (w *WalManager) checkpoint(ctx context.Context) error { - session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) +func (d *DataManager) checkpoint(ctx context.Context) error { + session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err } @@ -830,79 +671,44 @@ func (w *WalManager) checkpoint(ctx context.Context) error { } defer m.Unlock(ctx) - resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0) + resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0) if err != nil { return err } + walsData := []*WalData{} for _, kv := range resp.Kvs { - var walData WalData + var walData *WalData if err := json.Unmarshal(kv.Value, &walData); err != nil { return err } + walData.Revision = kv.ModRevision + if walData.WalStatus == WalStatusCommitted { - w.log.Warnf("wal %s not yet committed storage", walData.WalSequence) + d.log.Warnf("wal %s not yet committed storage", walData.WalSequence) break } if walData.WalStatus == WalStatusCheckpointed { continue } - walFilePath := w.storageWalDataFile(walData.WalDataFileID) - w.log.Debugf("checkpointing wal: %q", walData.WalSequence) + walsData = append(walsData, walData) + } + if len(walsData) < d.minCheckpointWalsNum { + return nil + } - walFile, err := w.ost.ReadObject(walFilePath) - if err != nil { - return err - } - dec := json.NewDecoder(walFile) - for { - var action *Action + if err := d.writeData(ctx, walsData); err != nil { + return errors.Wrapf(err, "checkpoint function error") + } - err := dec.Decode(&action) - if err == io.EOF { - // all done - break - } - if err != nil { - walFile.Close() - return err - } - - if err := w.checkpointAction(ctx, action); err != nil { - walFile.Close() - return err - } - } - walFile.Close() - - w.log.Debugf("updating wal to state %q", WalStatusCheckpointed) + for _, walData := range walsData { + d.log.Debugf("updating wal to state %q", WalStatusCheckpointed) walData.WalStatus = WalStatusCheckpointed walDataj, err := json.Marshal(walData) if err != nil { return err } - if _, err := w.e.AtomicPut(ctx, string(kv.Key), walDataj, kv.ModRevision, nil); err != nil { - return err - } - } - return nil -} - -func (w *WalManager) checkpointAction(ctx context.Context, action *Action) error { - dataPath := w.dataToPathFunc(action.DataType, action.ID) - if dataPath == "" { - return nil - } - path := w.toStorageDataPath(dataPath) - switch action.ActionType { - case ActionTypePut: - w.log.Debugf("writing file: %q", path) - if err := w.ost.WriteObject(path, bytes.NewReader(action.Data)); err != nil { - return err - } - - case ActionTypeDelete: - w.log.Debugf("deleting file: %q", path) - if err := w.ost.DeleteObject(path); err != nil && err != objectstorage.ErrNotExist { + walKey := etcdWalKey(walData.WalSequence) + if _, err := d.e.AtomicPut(ctx, walKey, walDataj, walData.Revision, nil); err != nil { return err } } @@ -910,11 +716,11 @@ func (w *WalManager) checkpointAction(ctx context.Context, action *Action) error return nil } -func (w *WalManager) walCleanerLoop(ctx context.Context) { +func (d *DataManager) walCleanerLoop(ctx context.Context) { for { - w.log.Debugf("walcleaner") - if err := w.walCleaner(ctx); err != nil { - w.log.Errorf("walcleaner error: %v", err) + d.log.Debugf("walcleaner") + if err := d.walCleaner(ctx); err != nil { + d.log.Errorf("walcleaner error: %v", err) } select { @@ -930,8 +736,8 @@ func (w *WalManager) walCleanerLoop(ctx context.Context) { // walCleaner will clean already checkpointed wals from etcd // it must always keep at least one wal that is needed for resync operations // from clients -func (w *WalManager) walCleaner(ctx context.Context) error { - session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) +func (d *DataManager) walCleaner(ctx context.Context) error { + session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) if err != nil { return err } @@ -944,14 +750,14 @@ func (w *WalManager) walCleaner(ctx context.Context) error { } defer m.Unlock(ctx) - resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0) + resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0) if err != nil { return err } - if len(resp.Kvs) <= w.etcdWalsKeepNum { + if len(resp.Kvs) <= d.etcdWalsKeepNum { return nil } - removeCount := len(resp.Kvs) - w.etcdWalsKeepNum + removeCount := len(resp.Kvs) - d.etcdWalsKeepNum for _, kv := range resp.Kvs { var walData WalData @@ -969,8 +775,8 @@ func (w *WalManager) walCleaner(ctx context.Context) error { // sure that no objects with old data will be returned? Is it enough to read // it back or the result could just be luckily correct but another client may // arrive to a differnt S3 server that is not yet in sync? - w.log.Infof("removing wal %q from etcd", walData.WalSequence) - if _, err := w.e.AtomicDelete(ctx, string(kv.Key), kv.ModRevision); err != nil { + d.log.Infof("removing wal %q from etcd", walData.WalSequence) + if _, err := d.e.AtomicDelete(ctx, string(kv.Key), kv.ModRevision); err != nil { return err } @@ -983,10 +789,10 @@ func (w *WalManager) walCleaner(ctx context.Context) error { return nil } -func (w *WalManager) compactChangeGroupsLoop(ctx context.Context) { +func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) { for { - if err := w.compactChangeGroups(ctx); err != nil { - w.log.Errorf("err: %+v", err) + if err := d.compactChangeGroups(ctx); err != nil { + d.log.Errorf("err: %+v", err) } select { @@ -999,8 +805,8 @@ func (w *WalManager) compactChangeGroupsLoop(ctx context.Context) { } } -func (w *WalManager) compactChangeGroups(ctx context.Context) error { - resp, err := w.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey) +func (d *DataManager) compactChangeGroups(ctx context.Context) error { + resp, err := d.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey) if err != nil { return err } @@ -1010,7 +816,7 @@ func (w *WalManager) compactChangeGroups(ctx context.Context) error { // first update minrevision cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(etcdChangeGroupMinRevisionKey), "=", revision) then := etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, "") - txn := w.e.Client().Txn(ctx).If(cmp).Then(then) + txn := d.e.Client().Txn(ctx).If(cmp).Then(then) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) @@ -1022,7 +828,7 @@ func (w *WalManager) compactChangeGroups(ctx context.Context) error { revision = tresp.Header.Revision // then remove all the groups keys with modrevision < minrevision - resp, err = w.e.List(ctx, etcdChangeGroupsDir, "", 0) + resp, err = d.e.List(ctx, etcdChangeGroupsDir, "", 0) if err != nil { return err } @@ -1030,13 +836,13 @@ func (w *WalManager) compactChangeGroups(ctx context.Context) error { if kv.ModRevision < revision-etcdChangeGroupMinRevisionRange { cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(string(kv.Key)), "=", kv.ModRevision) then := etcdclientv3.OpDelete(string(kv.Key)) - txn := w.e.Client().Txn(ctx).If(cmp).Then(then) + txn := d.e.Client().Txn(ctx).If(cmp).Then(then) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) } if !tresp.Succeeded { - w.log.Errorf("failed to update change group min revision key due to concurrent update") + d.log.Errorf("failed to update change group min revision key due to concurrent update") } } } @@ -1050,10 +856,10 @@ func (w *WalManager) compactChangeGroups(ctx context.Context) error { // walWrites will fails since the provided changegrouptoken will have an old // revision // TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress??? -func (w *WalManager) etcdPingerLoop(ctx context.Context) { +func (d *DataManager) etcdPingerLoop(ctx context.Context) { for { - if err := w.etcdPinger(ctx); err != nil { - w.log.Errorf("err: %+v", err) + if err := d.etcdPinger(ctx); err != nil { + d.log.Errorf("err: %+v", err) } select { @@ -1066,17 +872,16 @@ func (w *WalManager) etcdPingerLoop(ctx context.Context) { } } -func (w *WalManager) etcdPinger(ctx context.Context) error { - if _, err := w.e.Put(ctx, etcdPingKey, []byte{}, nil); err != nil { +func (d *DataManager) etcdPinger(ctx context.Context) error { + if _, err := d.e.Put(ctx, etcdPingKey, []byte{}, nil); err != nil { return err } return nil } -func (w *WalManager) InitEtcd(ctx context.Context) error { +func (d *DataManager) InitEtcd(ctx context.Context) error { writeWal := func(wal *WalFile) error { - w.log.Infof("wal seq: %s", wal.WalSequence) - walFile, err := w.ost.ReadObject(w.storageWalStatusFile(wal.WalSequence) + ".committed") + walFile, err := d.ost.ReadObject(d.storageWalStatusFile(wal.WalSequence) + ".committed") if err != nil { return err } @@ -1092,7 +897,6 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { WalSequence: wal.WalSequence, WalDataFileID: header.WalDataFileID, WalStatus: WalStatusCommitted, - ChangeGroups: header.ChangeGroups, } if wal.Checkpointed { walData.WalStatus = WalStatusCheckpointed @@ -1107,7 +911,7 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { // only add if it doesn't exist cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalKey(wal.WalSequence)), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdWalKey(wal.WalSequence), string(walDataj))) - txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...) + txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) @@ -1124,12 +928,12 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdChangeGroupMinRevisionKey), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, "")) - txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...) + txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...) if _, err := txn.Commit(); err != nil { return etcd.FromEtcdError(err) } - _, err := w.e.Get(ctx, etcdWalsDataKey, 0) + _, err := d.e.Get(ctx, etcdWalsDataKey, 0) if err != nil && err != etcd.ErrKeyNotFound { return err } @@ -1137,7 +941,7 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { return nil } - w.log.Infof("no data found in etcd, initializing") + d.log.Infof("no data found in etcd, initializing") // walsdata not found in etcd @@ -1148,8 +952,8 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { lastCommittedStorageWalElem := lastCommittedStorageWalsRing lastCommittedStorageWalSequence := "" wroteWals := 0 - for wal := range w.ListOSTWals("") { - w.log.Infof("wal: %s", wal) + for wal := range d.ListOSTWals("") { + d.log.Debugf("wal: %s", wal) if wal.Err != nil { return wal.Err } @@ -1205,7 +1009,7 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj))) then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence)) - txn = w.e.Client().Txn(ctx).If(cmp...).Then(then...) + txn = d.e.Client().Txn(ctx).If(cmp...).Then(then...) tresp, err := txn.Commit() if err != nil { return etcd.FromEtcdError(err) @@ -1216,89 +1020,3 @@ func (w *WalManager) InitEtcd(ctx context.Context) error { return nil } - -type CheckpointFunc func(action *Action) error - -type DataToPathFunc func(dataType string, id string) string - -func NoOpDataToPath(dataType string, id string) string { - return "" -} - -type WalManagerConfig struct { - BasePath string - E *etcd.Store - OST *objectstorage.ObjStorage - EtcdWalsKeepNum int - CheckpointFunc CheckpointFunc - DataToPathFunc DataToPathFunc -} - -type WalManager struct { - basePath string - log *zap.SugaredLogger - e *etcd.Store - ost *objectstorage.ObjStorage - changes *WalChanges - etcdWalsKeepNum int - checkpointFunc CheckpointFunc - dataToPathFunc DataToPathFunc -} - -func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConfig) (*WalManager, error) { - if conf.EtcdWalsKeepNum == 0 { - conf.EtcdWalsKeepNum = DefaultEtcdWalsKeepNum - } - if conf.EtcdWalsKeepNum < 1 { - return nil, errors.New("etcdWalsKeepNum must be greater than 0") - } - - dataToPathFunc := conf.DataToPathFunc - if dataToPathFunc == nil { - dataToPathFunc = NoOpDataToPath - } - - w := &WalManager{ - basePath: conf.BasePath, - log: logger.Sugar(), - e: conf.E, - ost: conf.OST, - etcdWalsKeepNum: conf.EtcdWalsKeepNum, - changes: NewWalChanges(), - checkpointFunc: conf.CheckpointFunc, - dataToPathFunc: dataToPathFunc, - } - - // add trailing slash the basepath - if w.basePath != "" && !strings.HasSuffix(w.basePath, "/") { - w.basePath = w.basePath + "/" - } - - return w, nil -} - -func (w *WalManager) Run(ctx context.Context, readyCh chan struct{}) error { - for { - err := w.InitEtcd(ctx) - if err == nil { - break - } - w.log.Errorf("failed to initialize etcd: %+v", err) - time.Sleep(1 * time.Second) - } - - readyCh <- struct{}{} - - go w.watcherLoop(ctx) - go w.syncLoop(ctx) - go w.checkpointLoop(ctx) - go w.walCleanerLoop(ctx) - go w.compactChangeGroupsLoop(ctx) - go w.etcdPingerLoop(ctx) - - select { - case <-ctx.Done(): - w.log.Infof("walmanager exiting") - return nil - } -} diff --git a/internal/objectstorage/objectstorage.go b/internal/objectstorage/objectstorage.go index 8e34370..69ebd5b 100644 --- a/internal/objectstorage/objectstorage.go +++ b/internal/objectstorage/objectstorage.go @@ -26,9 +26,15 @@ import ( var ErrNotExist = errors.New("does not exist") +type ReadSeekCloser interface { + io.Reader + io.Seeker + io.Closer +} + type Storage interface { Stat(filepath string) (*ObjectInfo, error) - ReadObject(filepath string) (io.ReadCloser, error) + ReadObject(filepath string) (ReadSeekCloser, error) WriteObject(filepath string, data io.Reader) error DeleteObject(filepath string) error List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo diff --git a/internal/objectstorage/posix.go b/internal/objectstorage/posix.go index 36a1baf..aa51be1 100644 --- a/internal/objectstorage/posix.go +++ b/internal/objectstorage/posix.go @@ -251,7 +251,7 @@ func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) { return &ObjectInfo{Path: p, LastModified: fi.ModTime()}, nil } -func (s *PosixStorage) ReadObject(p string) (io.ReadCloser, error) { +func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) { fspath, err := s.fsPath(p) if err != nil { return nil, err diff --git a/internal/objectstorage/s3.go b/internal/objectstorage/s3.go index 5ffdf5c..1be8f81 100644 --- a/internal/objectstorage/s3.go +++ b/internal/objectstorage/s3.go @@ -73,7 +73,7 @@ func (s *S3Storage) Stat(p string) (*ObjectInfo, error) { return &ObjectInfo{Path: p, LastModified: oi.LastModified}, nil } -func (s *S3Storage) ReadObject(filepath string) (io.ReadCloser, error) { +func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) { if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil { merr := minio.ToErrorResponse(err) if merr.StatusCode == http.StatusNotFound { diff --git a/internal/services/configstore/command/command.go b/internal/services/configstore/command/command.go index 2ffaf2e..905e8d3 100644 --- a/internal/services/configstore/command/command.go +++ b/internal/services/configstore/command/command.go @@ -19,11 +19,11 @@ import ( "encoding/json" "path" + "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/services/configstore/readdb" "github.com/sorintlab/agola/internal/services/types" "github.com/sorintlab/agola/internal/util" - "github.com/sorintlab/agola/internal/wal" "github.com/pkg/errors" uuid "github.com/satori/go.uuid" @@ -33,14 +33,14 @@ import ( type CommandHandler struct { log *zap.SugaredLogger readDB *readdb.ReadDB - wal *wal.WalManager + dm *datamanager.DataManager } -func NewCommandHandler(logger *zap.Logger, readDB *readdb.ReadDB, wal *wal.WalManager) *CommandHandler { +func NewCommandHandler(logger *zap.Logger, readDB *readdb.ReadDB, dm *datamanager.DataManager) *CommandHandler { return &CommandHandler{ log: logger.Sugar(), readDB: readDB, - wal: wal, + dm: dm, } } @@ -52,7 +52,7 @@ func (s *CommandHandler) CreateProjectGroup(ctx context.Context, projectGroup *t return nil, util.NewErrBadRequest(errors.Errorf("project group parent id required")) } - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -106,16 +106,16 @@ func (s *CommandHandler) CreateProjectGroup(ctx context.Context, projectGroup *t if err != nil { return nil, errors.Wrapf(err, "failed to marshal projectGroup") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeProjectGroup), ID: projectGroup.ID, Data: pcj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return projectGroup, err } @@ -127,7 +127,7 @@ func (s *CommandHandler) CreateProject(ctx context.Context, project *types.Proje return nil, util.NewErrBadRequest(errors.Errorf("project parent id required")) } - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -182,23 +182,23 @@ func (s *CommandHandler) CreateProject(ctx context.Context, project *types.Proje if err != nil { return nil, errors.Wrapf(err, "failed to marshal project") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeProject), ID: project.ID, Data: pcj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return project, err } func (s *CommandHandler) DeleteProject(ctx context.Context, projectRef string) error { var project *types.Project - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -230,15 +230,15 @@ func (s *CommandHandler) DeleteProject(ctx context.Context, projectRef string) e } // TODO(sgotti) delete project secrets/variables - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypeDelete, + ActionType: datamanager.ActionTypeDelete, DataType: string(types.ConfigTypeProject), ID: project.ID, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return err } @@ -253,7 +253,7 @@ func (s *CommandHandler) CreateUser(ctx context.Context, req *CreateUserRequest) return nil, util.NewErrBadRequest(errors.Errorf("user name required")) } - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken cgNames := []string{req.UserName} var rs *types.RemoteSource @@ -335,29 +335,29 @@ func (s *CommandHandler) CreateUser(ctx context.Context, req *CreateUserRequest) return nil, errors.Wrapf(err, "failed to marshal project group") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeUser), ID: user.ID, Data: userj, }, { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeProjectGroup), ID: pg.ID, Data: pgj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return user, err } func (s *CommandHandler) DeleteUser(ctx context.Context, userName string) error { var user *types.User - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken cgNames := []string{user.UserName} // must do all the check in a single transaction to avoid concurrent changes @@ -382,9 +382,9 @@ func (s *CommandHandler) DeleteUser(ctx context.Context, userName string) error return err } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypeDelete, + ActionType: datamanager.ActionTypeDelete, DataType: string(types.ConfigTypeUser), ID: user.ID, }, @@ -392,7 +392,7 @@ func (s *CommandHandler) DeleteUser(ctx context.Context, userName string) error // changegroup is the username (and in future the email) to ensure no // concurrent user creation/modification using the same name - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return err } @@ -417,7 +417,7 @@ func (s *CommandHandler) CreateUserLA(ctx context.Context, req *CreateUserLARequ var user *types.User var rs *types.RemoteSource - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -477,16 +477,16 @@ func (s *CommandHandler) CreateUserLA(ctx context.Context, req *CreateUserLARequ if err != nil { return nil, errors.Wrapf(err, "failed to marshal user") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeUser), ID: user.ID, Data: userj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return la, err } @@ -500,7 +500,7 @@ func (s *CommandHandler) DeleteUserLA(ctx context.Context, userName, laID string var user *types.User - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -536,16 +536,16 @@ func (s *CommandHandler) DeleteUserLA(ctx context.Context, userName, laID string if err != nil { return errors.Wrapf(err, "failed to marshal user") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeUser), ID: user.ID, Data: userj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return err } @@ -567,7 +567,7 @@ func (s *CommandHandler) UpdateUserLA(ctx context.Context, req *UpdateUserLARequ var user *types.User var rs *types.RemoteSource - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -616,16 +616,16 @@ func (s *CommandHandler) UpdateUserLA(ctx context.Context, req *UpdateUserLARequ if err != nil { return nil, errors.Wrapf(err, "failed to marshal user") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeUser), ID: user.ID, Data: userj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return la, err } @@ -636,7 +636,7 @@ func (s *CommandHandler) CreateUserToken(ctx context.Context, userName, tokenNam var user *types.User - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -677,9 +677,9 @@ func (s *CommandHandler) CreateUserToken(ctx context.Context, userName, tokenNam if err != nil { return "", errors.Wrapf(err, "failed to marshal user") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeUser), ID: user.ID, Data: userj, @@ -687,7 +687,7 @@ func (s *CommandHandler) CreateUserToken(ctx context.Context, userName, tokenNam } // changegroup is the userid - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return token, err } @@ -701,7 +701,7 @@ func (s *CommandHandler) DeleteUserToken(ctx context.Context, userName, tokenNam var user *types.User - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -737,16 +737,16 @@ func (s *CommandHandler) DeleteUserToken(ctx context.Context, userName, tokenNam if err != nil { return errors.Wrapf(err, "failed to marshal user") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeUser), ID: user.ID, Data: userj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return err } @@ -755,7 +755,7 @@ func (s *CommandHandler) CreateRemoteSource(ctx context.Context, remoteSource *t return nil, util.NewErrBadRequest(errors.Errorf("remotesource name required")) } - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken cgNames := []string{remoteSource.Name} // must do all the check in a single transaction to avoid concurrent changes @@ -786,23 +786,23 @@ func (s *CommandHandler) CreateRemoteSource(ctx context.Context, remoteSource *t if err != nil { return nil, errors.Wrapf(err, "failed to marshal remotesource") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeRemoteSource), ID: remoteSource.ID, Data: rsj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return remoteSource, err } func (s *CommandHandler) DeleteRemoteSource(ctx context.Context, remoteSourceName string) error { var remoteSource *types.RemoteSource - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken cgNames := []string{remoteSource.ID} // must do all the check in a single transaction to avoid concurrent changes @@ -827,16 +827,16 @@ func (s *CommandHandler) DeleteRemoteSource(ctx context.Context, remoteSourceNam return err } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypeDelete, + ActionType: datamanager.ActionTypeDelete, DataType: string(types.ConfigTypeRemoteSource), ID: remoteSource.ID, }, } // changegroup is all the remote sources - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return err } @@ -845,7 +845,7 @@ func (s *CommandHandler) CreateOrg(ctx context.Context, org *types.Organization) return nil, util.NewErrBadRequest(errors.Errorf("org name required")) } - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken cgNames := []string{org.Name} // must do all the check in a single transaction to avoid concurrent changes @@ -887,22 +887,22 @@ func (s *CommandHandler) CreateOrg(ctx context.Context, org *types.Organization) if err != nil { return nil, errors.Wrapf(err, "failed to marshal project group") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeOrg), ID: org.ID, Data: orgj, }, { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeProjectGroup), ID: pg.ID, Data: pgj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return org, err } @@ -910,7 +910,7 @@ func (s *CommandHandler) DeleteOrg(ctx context.Context, orgName string) error { var org *types.Organization var projects []*types.Project - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken cgNames := []string{orgName} // must do all the check in a single transaction to avoid concurrent changes @@ -936,23 +936,23 @@ func (s *CommandHandler) DeleteOrg(ctx context.Context, orgName string) error { return err } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypeDelete, + ActionType: datamanager.ActionTypeDelete, DataType: string(types.ConfigTypeOrg), ID: org.ID, }, } // delete all org projects for _, project := range projects { - actions = append(actions, &wal.Action{ - ActionType: wal.ActionTypeDelete, + actions = append(actions, &datamanager.Action{ + ActionType: datamanager.ActionTypeDelete, DataType: string(types.ConfigTypeProject), ID: project.ID, }) } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return err } @@ -979,7 +979,7 @@ func (s *CommandHandler) CreateSecret(ctx context.Context, secret *types.Secret) return nil, util.NewErrBadRequest(errors.Errorf("invalid secret parent type %q", secret.Parent.Type)) } - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken cgNames := []string{secret.Name} // must do all the check in a single transaction to avoid concurrent changes @@ -1017,23 +1017,23 @@ func (s *CommandHandler) CreateSecret(ctx context.Context, secret *types.Secret) if err != nil { return nil, errors.Wrapf(err, "failed to marshal secret") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeSecret), ID: secret.ID, Data: secretj, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return secret, err } func (s *CommandHandler) DeleteSecret(ctx context.Context, parentType types.ConfigType, parentRef, secretName string) error { var secret *types.Secret - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -1064,15 +1064,15 @@ func (s *CommandHandler) DeleteSecret(ctx context.Context, parentType types.Conf return err } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypeDelete, + ActionType: datamanager.ActionTypeDelete, DataType: string(types.ConfigTypeSecret), ID: secret.ID, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return err } @@ -1093,7 +1093,7 @@ func (s *CommandHandler) CreateVariable(ctx context.Context, variable *types.Var return nil, util.NewErrBadRequest(errors.Errorf("invalid variable parent type %q", variable.Parent.Type)) } - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken cgNames := []string{variable.Name} // must do all the check in a single transaction to avoid concurrent changes @@ -1131,23 +1131,23 @@ func (s *CommandHandler) CreateVariable(ctx context.Context, variable *types.Var if err != nil { return nil, errors.Wrapf(err, "failed to marshal variable") } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypePut, + ActionType: datamanager.ActionTypePut, DataType: string(types.ConfigTypeVariable), ID: variable.ID, Data: variablej, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return variable, err } func (s *CommandHandler) DeleteVariable(ctx context.Context, parentType types.ConfigType, parentRef, variableName string) error { var variable *types.Variable - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken // must do all the check in a single transaction to avoid concurrent changes err := s.readDB.Do(func(tx *db.Tx) error { @@ -1177,14 +1177,14 @@ func (s *CommandHandler) DeleteVariable(ctx context.Context, parentType types.Co return err } - actions := []*wal.Action{ + actions := []*datamanager.Action{ { - ActionType: wal.ActionTypeDelete, + ActionType: datamanager.ActionTypeDelete, DataType: string(types.ConfigTypeVariable), ID: variable.ID, }, } - _, err = s.wal.WriteWal(ctx, actions, cgt) + _, err = s.dm.WriteWal(ctx, actions, cgt) return err } diff --git a/internal/services/configstore/common/common.go b/internal/services/configstore/common/common.go index 24fe2d7..fc75b52 100644 --- a/internal/services/configstore/common/common.go +++ b/internal/services/configstore/common/common.go @@ -15,103 +15,14 @@ package common import ( - "fmt" "net/url" - "path" "strings" - - "github.com/sorintlab/agola/internal/services/types" -) - -var ( - // Storage paths. Always use path (not filepath) to use the "/" separator - StorageDataDir = "data" - StorageUsersDir = path.Join(StorageDataDir, "users") - StorageOrgsDir = path.Join(StorageDataDir, "orgs") - StorageProjectsDir = path.Join(StorageDataDir, "projects") - StorageProjectGroupsDir = path.Join(StorageDataDir, "projectgroups") - StorageRemoteSourcesDir = path.Join(StorageDataDir, "remotesources") - StorageSecretsDir = path.Join(StorageDataDir, "secrets") - StorageVariablesDir = path.Join(StorageDataDir, "variables") ) const ( etcdWalsMinRevisionRange = 100 ) -func StorageUserFile(userID string) string { - return path.Join(StorageUsersDir, userID) -} - -func StorageOrgFile(orgID string) string { - return path.Join(StorageOrgsDir, orgID) -} - -func StorageProjectGroupFile(projectGroupID string) string { - return path.Join(StorageProjectGroupsDir, projectGroupID) -} - -func StorageProjectFile(projectID string) string { - return path.Join(StorageProjectsDir, projectID) -} - -func StorageRemoteSourceFile(userID string) string { - return path.Join(StorageRemoteSourcesDir, userID) -} - -func StorageSecretFile(secretID string) string { - return path.Join(StorageSecretsDir, secretID) -} - -func StorageVariableFile(variableID string) string { - return path.Join(StorageVariablesDir, variableID) -} - -func PathToTypeID(p string) (types.ConfigType, string) { - var configType types.ConfigType - switch path.Dir(p) { - case StorageUsersDir: - configType = types.ConfigTypeUser - case StorageOrgsDir: - configType = types.ConfigTypeOrg - case StorageProjectGroupsDir: - configType = types.ConfigTypeProjectGroup - case StorageProjectsDir: - configType = types.ConfigTypeProject - case StorageRemoteSourcesDir: - configType = types.ConfigTypeRemoteSource - case StorageSecretsDir: - configType = types.ConfigTypeSecret - case StorageVariablesDir: - configType = types.ConfigTypeVariable - default: - panic(fmt.Errorf("cannot determine configtype for path: %q", p)) - } - - return configType, path.Base(p) -} - -func DataToPathFunc(dataType string, id string) string { - switch types.ConfigType(dataType) { - case types.ConfigTypeUser: - return StorageUserFile(id) - case types.ConfigTypeOrg: - return StorageOrgFile(id) - case types.ConfigTypeProjectGroup: - return StorageProjectGroupFile(id) - case types.ConfigTypeProject: - return StorageProjectFile(id) - case types.ConfigTypeRemoteSource: - return StorageRemoteSourceFile(id) - case types.ConfigTypeSecret: - return StorageSecretFile(id) - case types.ConfigTypeVariable: - return StorageVariableFile(id) - } - - panic(fmt.Errorf("unknown data type %q", dataType)) -} - type RefType int const ( diff --git a/internal/services/configstore/configstore.go b/internal/services/configstore/configstore.go index 997f733..34de300 100644 --- a/internal/services/configstore/configstore.go +++ b/internal/services/configstore/configstore.go @@ -21,16 +21,16 @@ import ( "path/filepath" scommon "github.com/sorintlab/agola/internal/common" + "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/configstore/api" "github.com/sorintlab/agola/internal/services/configstore/command" - "github.com/sorintlab/agola/internal/services/configstore/common" "github.com/sorintlab/agola/internal/services/configstore/readdb" + "github.com/sorintlab/agola/internal/services/types" "github.com/sorintlab/agola/internal/util" - "github.com/sorintlab/agola/internal/wal" ghandlers "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -45,7 +45,7 @@ var log = logger.Sugar() type ConfigStore struct { c *config.ConfigStore e *etcd.Store - wal *wal.WalManager + dm *datamanager.DataManager readDB *readdb.ReadDB ost *objectstorage.ObjStorage ch *command.CommandHandler @@ -72,24 +72,32 @@ func NewConfigStore(ctx context.Context, c *config.ConfigStore) (*ConfigStore, e ost: ost, } - walConf := &wal.WalManagerConfig{ - E: e, - OST: ost, - DataToPathFunc: common.DataToPathFunc, + dmConf := &datamanager.DataManagerConfig{ + E: e, + OST: ost, + DataTypes: []string{ + string(types.ConfigTypeUser), + string(types.ConfigTypeOrg), + string(types.ConfigTypeProjectGroup), + string(types.ConfigTypeProject), + string(types.ConfigTypeRemoteSource), + string(types.ConfigTypeSecret), + string(types.ConfigTypeVariable), + }, } - wal, err := wal.NewWalManager(ctx, logger, walConf) + dm, err := datamanager.NewDataManager(ctx, logger, dmConf) if err != nil { return nil, err } - readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, wal) + readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm) if err != nil { return nil, err } - cs.wal = wal + cs.dm = dm cs.readDB = readDB - ch := command.NewCommandHandler(logger, readDB, wal) + ch := command.NewCommandHandler(logger, readDB, dm) cs.ch = ch return cs, nil @@ -97,12 +105,12 @@ func NewConfigStore(ctx context.Context, c *config.ConfigStore) (*ConfigStore, e func (s *ConfigStore) Run(ctx context.Context) error { errCh := make(chan error) - walReadyCh := make(chan struct{}) + dmReadyCh := make(chan struct{}) - go func() { errCh <- s.wal.Run(ctx, walReadyCh) }() + go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }() - // wait for wal to be ready - <-walReadyCh + // wait for dm to be ready + <-dmReadyCh go func() { errCh <- s.readDB.Run(ctx) }() diff --git a/internal/services/configstore/readdb/readdb.go b/internal/services/configstore/readdb/readdb.go index 1161dd4..192d2ff 100644 --- a/internal/services/configstore/readdb/readdb.go +++ b/internal/services/configstore/readdb/readdb.go @@ -19,20 +19,18 @@ import ( "database/sql" "encoding/json" "io" - "io/ioutil" "os" "path/filepath" "sync" "time" + "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/sequence" - "github.com/sorintlab/agola/internal/services/configstore/common" "github.com/sorintlab/agola/internal/services/types" "github.com/sorintlab/agola/internal/util" - "github.com/sorintlab/agola/internal/wal" sq "github.com/Masterminds/squirrel" "github.com/pkg/errors" @@ -59,13 +57,13 @@ type ReadDB struct { e *etcd.Store rdb *db.DB ost *objectstorage.ObjStorage - wal *wal.WalManager + dm *datamanager.DataManager Initialized bool initMutex sync.Mutex } -func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, wal *wal.WalManager) (*ReadDB, error) { +func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) (*ReadDB, error) { if err := os.MkdirAll(dataDir, 0770); err != nil { return nil, err } @@ -85,7 +83,7 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd. rdb: rdb, e: e, ost: ost, - wal: wal, + dm: dm, } return readDB, nil @@ -125,65 +123,44 @@ func (r *ReadDB) ResetDB() error { return nil } -func (r *ReadDB) SyncFromFiles() (string, error) { - doneCh := make(chan struct{}) - defer close(doneCh) - - var lastCheckpointedWal string - // Get last checkpointed wal from lts - for wal := range r.wal.ListOSTWals("") { - if wal.Err != nil { - return "", wal.Err +func (r *ReadDB) SyncFromDump() (string, error) { + dumpIndex, err := r.dm.GetLastDataStatus() + if err != nil && err != objectstorage.ErrNotExist { + return "", errors.WithStack(err) + } + if err == objectstorage.ErrNotExist { + return "", nil + } + for dataType, files := range dumpIndex.Files { + dumpf, err := r.ost.ReadObject(files[0]) + if err != nil { + return "", errors.WithStack(err) } - if wal.Checkpointed { - lastCheckpointedWal = wal.WalSequence + dumpEntries := []*datamanager.DataEntry{} + dec := json.NewDecoder(dumpf) + for { + var de *datamanager.DataEntry + + err := dec.Decode(&de) + if err == io.EOF { + // all done + break + } + if err != nil { + dumpf.Close() + return "", err + } + dumpEntries = append(dumpEntries, de) } - } + dumpf.Close() - doneCh = make(chan struct{}) - haveConfigFiles := false - for object := range r.wal.List(common.StorageDataDir, "", true, doneCh) { - if object.Err != nil { - close(doneCh) - return "", object.Err - } - - haveConfigFiles = true - break - } - close(doneCh) - - if lastCheckpointedWal == "" && haveConfigFiles { - return "", errors.Errorf("no last checkpointed wal in lts but the storage has config files. This should never happen!") - } - - if !haveConfigFiles { - return lastCheckpointedWal, nil - } - - insertfunc := func(objs []string) error { - err := r.rdb.Do(func(tx *db.Tx) error { - for _, obj := range objs { - f, _, err := r.wal.ReadObject(obj, nil) - if err != nil { - if err == objectstorage.ErrNotExist { - r.log.Warnf("object %s disappeared, ignoring", obj) - } - return err - } - data, err := ioutil.ReadAll(f) - if err != nil { - f.Close() - return err - } - f.Close() - - configType, id := common.PathToTypeID(obj) - action := &wal.Action{ - ActionType: wal.ActionTypePut, - DataType: string(configType), - ID: id, - Data: data, + err = r.rdb.Do(func(tx *db.Tx) error { + for _, de := range dumpEntries { + action := &datamanager.Action{ + ActionType: datamanager.ActionTypePut, + ID: de.ID, + DataType: dataType, + Data: de.Data, } if err := r.applyAction(tx, action); err != nil { return err @@ -191,59 +168,24 @@ func (r *ReadDB) SyncFromFiles() (string, error) { } return nil }) - return err - } - - objs := []string{} - count := 0 - doneCh = make(chan struct{}) - defer close(doneCh) - - // file may have changed in the meantime (due to checkpointing) but we don't - // need to have a consistent snapshot since we'll apply all the wals and handle - // them - for object := range r.wal.List(common.StorageDataDir, "", true, doneCh) { - if object.Err != nil { - return "", object.Err - } - - objs = append(objs, object.Path) - - if count > 100 { - if err := insertfunc(objs); err != nil { - return "", err - } - count = 0 - objs = []string{} - } else { - count++ + if err != nil { + return "", err } } - if err := insertfunc(objs); err != nil { - return "", err - } - // save the wal sequence of the last checkpointed wal before syncing from files - err := r.rdb.Do(func(tx *db.Tx) error { - return r.insertCommittedWalSequence(tx, lastCheckpointedWal) - }) - if err != nil { - return "", err - } - - return lastCheckpointedWal, nil + return dumpIndex.WalSequence, nil } func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { - insertfunc := func(walFiles []*wal.WalFile) error { + insertfunc := func(walFiles []*datamanager.WalFile) error { err := r.rdb.Do(func(tx *db.Tx) error { for _, walFile := range walFiles { - walFilef, err := r.wal.ReadWal(walFile.WalSequence) + walFilef, err := r.dm.ReadWal(walFile.WalSequence) if err != nil { return err } dec := json.NewDecoder(walFilef) - var header *wal.WalHeader + var header *datamanager.WalHeader if err = dec.Decode(&header); err != nil && err != io.EOF { walFilef.Close() return err @@ -262,13 +204,13 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { } lastWalSeq := startWalSeq - walFiles := []*wal.WalFile{} + walFiles := []*datamanager.WalFile{} count := 0 doneCh := make(chan struct{}) defer close(doneCh) - for walFile := range r.wal.ListOSTWals(startWalSeq) { + for walFile := range r.dm.ListOSTWals(startWalSeq) { if walFile.Err != nil { return "", walFile.Err } @@ -281,7 +223,7 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { return "", err } count = 0 - walFiles = []*wal.WalFile{} + walFiles = []*datamanager.WalFile{} } else { count++ } @@ -308,7 +250,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { return err } - lastCommittedStorageWal, _, err := r.wal.LastCommittedStorageWal(ctx) + lastCommittedStorageWal, _, err := r.dm.LastCommittedStorageWal(ctx) if err != nil { return err } @@ -318,7 +260,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { doFullSync = true r.log.Warn("no startWalSeq in db, doing a full sync") } else { - ok, err := r.wal.HasOSTWal(curWalSeq) + ok, err := r.dm.HasOSTWal(curWalSeq) if err != nil { return err } @@ -349,15 +291,15 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { } if doFullSync { - r.log.Infof("doing a full sync from lts files") + r.log.Infof("doing a full sync from dump") if err := r.ResetDB(); err != nil { return err } var err error - curWalSeq, err = r.SyncFromFiles() + curWalSeq, err = r.SyncFromDump() if err != nil { - return err + return errors.WithStack(err) } } @@ -377,7 +319,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { // from wals on objectstorage is >= // if not (this happens when syncFromWals takes some time and in the meantime // many new wals are written, the next sync should be faster and able to continue - firstAvailableWalData, revision, err := r.wal.FirstAvailableWalData(ctx) + firstAvailableWalData, revision, err := r.dm.FirstAvailableWalData(ctx) if err != nil { return errors.Wrap(err, "failed to get first available wal data") } @@ -401,14 +343,14 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { } // use the same revision as previous operation - for walElement := range r.wal.ListEtcdWals(ctx, revision) { + for walElement := range r.dm.ListEtcdWals(ctx, revision) { if walElement.Err != nil { return err } if walElement.WalData.WalSequence <= curWalSeq { continue } - //if walElement.WalData.WalStatus == wal.WalStatusCommittedStorage { + //if walElement.WalData.WalStatus == datamanager.WalStatusCommittedStorage { if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil { return err @@ -416,7 +358,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error { //} //// update readdb only when the wal has been committed to objectstorage - //if walElement.WalData.WalStatus != wal.WalStatusCommittedStorage { + //if walElement.WalData.WalStatus != datamanager.WalStatusCommittedStorage { // return nil //} @@ -494,12 +436,12 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { wctx, cancel := context.WithCancel(ctx) defer cancel() r.log.Infof("revision: %d", revision) - wch := r.wal.Watch(wctx, revision+1) + wch := r.dm.Watch(wctx, revision+1) for we := range wch { r.log.Debugf("we: %s", util.Dump(we)) if we.Err != nil { err := we.Err - if err == wal.ErrCompacted { + if err == datamanager.ErrCompacted { r.log.Warnf("required events already compacted, reinitializing readdb") r.Initialized = false return nil @@ -558,7 +500,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error { return nil } -func (r *ReadDB) handleEvent(tx *db.Tx, we *wal.WatchElement) error { +func (r *ReadDB) handleEvent(tx *db.Tx, we *datamanager.WatchElement) error { //r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) //key := string(ev.Kv.Key) @@ -568,7 +510,7 @@ func (r *ReadDB) handleEvent(tx *db.Tx, we *wal.WatchElement) error { return nil } -func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error { +func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error { // update readdb only when the wal has been committed to objectstorage //if we.WalData.WalStatus != wal.WalStatusCommittedStorage { // return nil @@ -594,7 +536,7 @@ func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error { } func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error { - walFile, err := r.wal.ReadWalData(walDataFileID) + walFile, err := r.dm.ReadWalData(walDataFileID) if err != nil { return errors.Wrapf(err, "cannot read wal data file %q", walDataFileID) } @@ -602,7 +544,7 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error { dec := json.NewDecoder(walFile) for { - var action *wal.Action + var action *datamanager.Action err := dec.Decode(&action) if err == io.EOF { @@ -621,9 +563,9 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error { return nil } -func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { +func (r *ReadDB) applyAction(tx *db.Tx, action *datamanager.Action) error { switch action.ActionType { - case wal.ActionTypePut: + case datamanager.ActionTypePut: switch types.ConfigType(action.DataType) { case types.ConfigTypeUser: if err := r.insertUser(tx, action.Data); err != nil { @@ -655,7 +597,7 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { } } - case wal.ActionTypeDelete: + case datamanager.ActionTypeDelete: switch types.ConfigType(action.DataType) { case types.ConfigTypeUser: r.log.Debugf("deleting user with id: %s", action.ID) @@ -799,7 +741,7 @@ func (r *ReadDB) insertChangeGroupRevision(tx *db.Tx, changegroup string, revisi return nil } -func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*wal.ChangeGroupsUpdateToken, error) { +func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*datamanager.ChangeGroupsUpdateToken, error) { s := changegrouprevisionSelect.Where(sq.Eq{"id": groups}) q, args, err := s.ToSql() r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) @@ -823,7 +765,7 @@ func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*wal.C } } - return &wal.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil + return &datamanager.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil } func fetchChangeGroupsRevision(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) { diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/scheduler/api/api.go index c636001..5db400c 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/scheduler/api/api.go @@ -32,7 +32,7 @@ import ( "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" - "github.com/sorintlab/agola/internal/wal" + "github.com/sorintlab/agola/internal/datamanager" "github.com/gorilla/mux" "github.com/pkg/errors" @@ -102,15 +102,15 @@ type LogsHandler struct { log *zap.SugaredLogger e *etcd.Store ost *objectstorage.ObjStorage - wal *wal.WalManager + dm *datamanager.DataManager } -func NewLogsHandler(logger *zap.Logger, e *etcd.Store, ost *objectstorage.ObjStorage, wal *wal.WalManager) *LogsHandler { +func NewLogsHandler(logger *zap.Logger, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *LogsHandler { return &LogsHandler{ log: logger.Sugar(), e: e, ost: ost, - wal: wal, + dm: dm, } } @@ -178,7 +178,7 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow, stream bool) (error, bool) { - r, err := store.GetRunEtcdOrOST(ctx, h.e, h.wal, runID) + r, err := store.GetRunEtcdOrOST(ctx, h.e, h.dm, runID) if err != nil { return err, true } @@ -340,15 +340,15 @@ type RunResponse struct { type RunHandler struct { log *zap.SugaredLogger e *etcd.Store - wal *wal.WalManager + dm *datamanager.DataManager readDB *readdb.ReadDB } -func NewRunHandler(logger *zap.Logger, e *etcd.Store, wal *wal.WalManager, readDB *readdb.ReadDB) *RunHandler { +func NewRunHandler(logger *zap.Logger, e *etcd.Store, dm *datamanager.DataManager, readDB *readdb.ReadDB) *RunHandler { return &RunHandler{ log: logger.Sugar(), e: e, - wal: wal, + dm: dm, readDB: readDB, } } @@ -364,7 +364,7 @@ func (h *RunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } if run == nil { - run, err = store.OSTGetRun(h.wal, runID) + run, err = store.OSTGetRun(h.dm, runID) if err != nil && err != objectstorage.ErrNotExist { http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -375,7 +375,7 @@ func (h *RunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - rc, err := store.OSTGetRunConfig(h.wal, run.ID) + rc, err := store.OSTGetRunConfig(h.dm, run.ID) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/internal/services/runservice/scheduler/command/command.go b/internal/services/runservice/scheduler/command/command.go index 347f1ec..e8fe910 100644 --- a/internal/services/runservice/scheduler/command/command.go +++ b/internal/services/runservice/scheduler/command/command.go @@ -20,6 +20,7 @@ import ( "path" "time" + "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" @@ -30,7 +31,6 @@ import ( "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" - "github.com/sorintlab/agola/internal/wal" "github.com/pkg/errors" "go.uber.org/zap" @@ -41,16 +41,16 @@ type CommandHandler struct { e *etcd.Store readDB *readdb.ReadDB ost *objectstorage.ObjStorage - wal *wal.WalManager + dm *datamanager.DataManager } -func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, wal *wal.WalManager) *CommandHandler { +func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *CommandHandler { return &CommandHandler{ log: logger.Sugar(), e: e, readDB: readDB, ost: ost, - wal: wal, + dm: dm, } } @@ -218,12 +218,12 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest) // fetch the existing runconfig and run s.log.Infof("creating run from existing run") - rc, err := store.OSTGetRunConfig(s.wal, req.RunID) + rc, err := store.OSTGetRunConfig(s.dm, req.RunID) if err != nil { return nil, util.NewErrBadRequest(errors.Wrapf(err, "runconfig %q doens't exist", req.RunID)) } - run, err := store.GetRunEtcdOrOST(ctx, s.e, s.wal, req.RunID) + run, err := store.GetRunEtcdOrOST(ctx, s.e, s.dm, req.RunID) if err != nil { return nil, err } @@ -379,7 +379,7 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg run.EnqueueTime = util.TimePtr(time.Now()) - actions := []*wal.Action{} + actions := []*datamanager.Action{} // persist group counter rca, err := store.OSTUpdateRunCounterAction(ctx, c, run.Group) @@ -395,7 +395,7 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg } actions = append(actions, rca) - if _, err = s.wal.WriteWal(ctx, actions, cgt); err != nil { + if _, err = s.dm.WriteWal(ctx, actions, cgt); err != nil { return err } @@ -531,7 +531,7 @@ func (s *CommandHandler) DeleteExecutor(ctx context.Context, executorID string) return nil } -func (s *CommandHandler) getRunCounter(group string) (uint64, *wal.ChangeGroupsUpdateToken, error) { +func (s *CommandHandler) getRunCounter(group string) (uint64, *datamanager.ChangeGroupsUpdateToken, error) { // use the first group dir after the root pl := util.PathList(group) if len(pl) < 2 { @@ -539,7 +539,7 @@ func (s *CommandHandler) getRunCounter(group string) (uint64, *wal.ChangeGroupsU } var c uint64 - var cgt *wal.ChangeGroupsUpdateToken + var cgt *datamanager.ChangeGroupsUpdateToken err := s.readDB.Do(func(tx *db.Tx) error { var err error c, err = s.readDB.GetRunCounterOST(tx, pl[1]) diff --git a/internal/services/runservice/scheduler/common/common.go b/internal/services/runservice/scheduler/common/common.go index 01fca49..cc19844 100644 --- a/internal/services/runservice/scheduler/common/common.go +++ b/internal/services/runservice/scheduler/common/common.go @@ -15,7 +15,6 @@ package common import ( - "fmt" "path" ) @@ -75,18 +74,6 @@ const ( etcdWalsMinRevisionRange = 100 ) -func StorageRunFile(runID string) string { - return path.Join(StorageRunsDir, runID) -} - -func StorageRunConfigFile(runID string) string { - return path.Join(StorageRunsConfigDir, runID) -} - -func StorageRunCounterFile(group string) string { - return path.Join(StorageCountersDir, group) -} - type DataType string const ( @@ -94,16 +81,3 @@ const ( DataTypeRunConfig DataType = "runconfig" DataTypeRunCounter DataType = "runcounter" ) - -func DataToPathFunc(dataType string, id string) string { - switch DataType(dataType) { - case DataTypeRun: - return StorageRunFile(id) - case DataTypeRunConfig: - return StorageRunConfigFile(id) - case DataTypeRunCounter: - return StorageRunCounterFile(id) - } - - panic(fmt.Errorf("unknown data type %q", dataType)) -} diff --git a/internal/services/runservice/scheduler/readdb/readdb.go b/internal/services/runservice/scheduler/readdb/readdb.go index 6a007fa..6dd6c02 100644 --- a/internal/services/runservice/scheduler/readdb/readdb.go +++ b/internal/services/runservice/scheduler/readdb/readdb.go @@ -28,6 +28,7 @@ import ( "sync" "time" + "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" @@ -36,7 +37,6 @@ import ( "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" - "github.com/sorintlab/agola/internal/wal" "go.uber.org/zap" sq "github.com/Masterminds/squirrel" @@ -94,7 +94,7 @@ type ReadDB struct { e *etcd.Store rdb *db.DB ost *objectstorage.ObjStorage - wal *wal.WalManager + dm *datamanager.DataManager Initialized bool initLock sync.Mutex @@ -108,7 +108,7 @@ type ReadDB struct { dbWriteLock sync.Mutex } -func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, wal *wal.WalManager) (*ReadDB, error) { +func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) (*ReadDB, error) { if err := os.MkdirAll(dataDir, 0770); err != nil { return nil, err } @@ -127,7 +127,7 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd. e: e, dataDir: dataDir, ost: ost, - wal: wal, + dm: dm, rdb: rdb, } @@ -451,7 +451,7 @@ func (r *ReadDB) handleRunEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdcl // TODO(sgotti) this is here just to avoid a window where the run is not in // run table and in the run_os table but should be changed/removed when we'll // implement run removal - run, err := store.OSTGetRun(r.wal, runID) + run, err := store.OSTGetRun(r.dm, runID) if err != nil { return err } @@ -516,7 +516,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { return err } - lastCommittedStorageWal, _, err := r.wal.LastCommittedStorageWal(ctx) + lastCommittedStorageWal, _, err := r.dm.LastCommittedStorageWal(ctx) if err != nil { return err } @@ -526,7 +526,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { doFullSync = true r.log.Warn("no startWalSeq in db, doing a full sync") } else { - ok, err := r.wal.HasOSTWal(curWalSeq) + ok, err := r.dm.HasOSTWal(curWalSeq) if err != nil { return err } @@ -557,7 +557,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { } if doFullSync { - r.log.Infof("doing a full sync from objectstorage files") + r.log.Infof("doing a full sync from dump") if err := r.ResetDB(); err != nil { return err } @@ -585,7 +585,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { // from wals on objectstorage is >= // if not (this happens when syncFromWals takes some time and in the meantime // many new wals are written, the next sync should be faster and able to continue - firstAvailableWalData, revision, err := r.wal.FirstAvailableWalData(ctx) + firstAvailableWalData, revision, err := r.dm.FirstAvailableWalData(ctx) if err != nil { return errors.Wrap(err, "failed to get first available wal data") } @@ -609,7 +609,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { } // use the same revision as previous operation - for walElement := range r.wal.ListEtcdWals(ctx, revision) { + for walElement := range r.dm.ListEtcdWals(ctx, revision) { if walElement.Err != nil { return err } @@ -634,134 +634,68 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error { } func (r *ReadDB) SyncFromDump() (string, error) { - type indexHeader struct { - LastWalSequence string + dumpIndex, err := r.dm.GetLastDataStatus() + if err != nil && err != objectstorage.ErrNotExist { + return "", errors.WithStack(err) } - type indexData struct { - DataType string - Data json.RawMessage + if err == objectstorage.ErrNotExist { + return "", nil } + for dataType, files := range dumpIndex.Files { + dumpf, err := r.ost.ReadObject(files[0]) + if err != nil { + return "", errors.WithStack(err) + } + dumpEntries := []*datamanager.DataEntry{} + dec := json.NewDecoder(dumpf) + for { + var de *datamanager.DataEntry - type indexDataRun struct { - ID string - Phase types.RunPhase - Group string - } + err := dec.Decode(&de) + if err == io.EOF { + // all done + break + } + if err != nil { + dumpf.Close() + return "", err + } + dumpEntries = append(dumpEntries, de) + } + dumpf.Close() - type indexDataRunCounter struct { - Group string - Counter uint64 - } - - var iheader *indexHeader - insertfunc := func(ids []*indexData) error { - err := r.rdb.Do(func(tx *db.Tx) error { - for _, id := range ids { - switch common.DataType(id.DataType) { - case common.DataTypeRun: - var ir *indexDataRun - if err := json.Unmarshal(id.Data, &ir); err != nil { - return err - } - run := &types.Run{ - ID: ir.ID, - Group: ir.Group, - Phase: ir.Phase, - } - r.log.Infof("inserting run %q", run.ID) - if err := r.insertRunOST(tx, run, []byte{}); err != nil { - return err - } - case common.DataTypeRunCounter: - var irc *indexDataRunCounter - if err := json.Unmarshal(id.Data, &irc); err != nil { - return err - } - r.log.Infof("inserting run counter %q, c: %d", irc.Group, irc.Counter) - if err := r.insertRunCounterOST(tx, irc.Group, irc.Counter); err != nil { - return err - } + err = r.rdb.Do(func(tx *db.Tx) error { + for _, de := range dumpEntries { + action := &datamanager.Action{ + ActionType: datamanager.ActionTypePut, + ID: de.ID, + DataType: dataType, + Data: de.Data, + } + if err := r.applyAction(tx, action); err != nil { + return err } } return nil }) - return err - } - - doneCh := make(chan struct{}) - defer close(doneCh) - - // get last dump - var dumpPath string - for object := range r.ost.List(path.Join(common.StorageRunsIndexesDir)+"/", "", true, doneCh) { - if object.Err != nil { - return "", object.Err - } - r.log.Infof("path: %s", object.Path) - - dumpPath = object.Path - } - if dumpPath == "" { - return "", nil - } - - f, err := r.ost.ReadObject(dumpPath) - if err != nil { - if err == objectstorage.ErrNotExist { - r.log.Warnf("object %s disappeared, ignoring", dumpPath) - } - return "", err - } - defer f.Close() - - dec := json.NewDecoder(f) - - if err := dec.Decode(&iheader); err != nil { - return "", err - } - count := 0 - ids := make([]*indexData, 0, paginationSize) - for { - var id *indexData - - err := dec.Decode(&id) - if err == io.EOF { - // all done - break - } if err != nil { - f.Close() return "", err } - ids = append(ids, id) - - if count > paginationSize { - if err := insertfunc(ids); err != nil { - return "", err - } - count = 0 - ids = make([]*indexData, 0, paginationSize) - } else { - count++ - } - } - if err := insertfunc(ids); err != nil { - return "", err } - return iheader.LastWalSequence, nil + return dumpIndex.WalSequence, nil } func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { - insertfunc := func(walFiles []*wal.WalFile) error { + insertfunc := func(walFiles []*datamanager.WalFile) error { err := r.rdb.Do(func(tx *db.Tx) error { for _, walFile := range walFiles { - walFilef, err := r.wal.ReadWal(walFile.WalSequence) + walFilef, err := r.dm.ReadWal(walFile.WalSequence) if err != nil { return err } dec := json.NewDecoder(walFilef) - var header *wal.WalHeader + var header *datamanager.WalHeader if err = dec.Decode(&header); err != nil && err != io.EOF { walFilef.Close() return err @@ -780,13 +714,13 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { } lastWalSeq := startWalSeq - walFiles := []*wal.WalFile{} + walFiles := []*datamanager.WalFile{} count := 0 doneCh := make(chan struct{}) defer close(doneCh) - for walFile := range r.wal.ListOSTWals(startWalSeq) { + for walFile := range r.dm.ListOSTWals(startWalSeq) { if walFile.Err != nil { return "", walFile.Err } @@ -799,7 +733,7 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) { return "", err } count = 0 - walFiles = []*wal.WalFile{} + walFiles = []*datamanager.WalFile{} } else { count++ } @@ -831,12 +765,12 @@ func (r *ReadDB) handleEventsOST(ctx context.Context) error { wctx, cancel := context.WithCancel(ctx) defer cancel() r.log.Infof("revision: %d", revision) - wch := r.wal.Watch(wctx, revision+1) + wch := r.dm.Watch(wctx, revision+1) for we := range wch { r.log.Debugf("we: %s", util.Dump(we)) if we.Err != nil { err := we.Err - if err == wal.ErrCompacted { + if err == datamanager.ErrCompacted { r.log.Warnf("required events already compacted, reinitializing readdb") r.Initialized = false return nil @@ -897,7 +831,7 @@ func (r *ReadDB) handleEventsOST(ctx context.Context) error { } func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error { - walFile, err := r.wal.ReadWalData(walDataFileID) + walFile, err := r.dm.ReadWalData(walDataFileID) if err != nil { return errors.Wrapf(err, "cannot read wal data file %q", walDataFileID) } @@ -905,7 +839,7 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error { dec := json.NewDecoder(walFile) for { - var action *wal.Action + var action *datamanager.Action err := dec.Decode(&action) if err == io.EOF { @@ -924,10 +858,10 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error { return nil } -func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { +func (r *ReadDB) applyAction(tx *db.Tx, action *datamanager.Action) error { r.log.Infof("action: dataType: %s, ID: %s", action.DataType, action.ID) switch action.ActionType { - case wal.ActionTypePut: + case datamanager.ActionTypePut: switch action.DataType { case string(common.DataTypeRun): var run *types.Run @@ -948,7 +882,7 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { } } - case wal.ActionTypeDelete: + case datamanager.ActionTypeDelete: switch action.DataType { case string(common.DataTypeRun): case string(common.DataTypeRunCounter): @@ -958,7 +892,7 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error { return nil } -func (r *ReadDB) handleEventOST(tx *db.Tx, we *wal.WatchElement) error { +func (r *ReadDB) handleEventOST(tx *db.Tx, we *datamanager.WatchElement) error { //r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) //key := string(ev.Kv.Key) @@ -968,7 +902,7 @@ func (r *ReadDB) handleEventOST(tx *db.Tx, we *wal.WatchElement) error { return nil } -func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error { +func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error { for cgName, cgRev := range we.ChangeGroupsRevisions { if err := r.insertChangeGroupRevisionOST(tx, cgName, cgRev); err != nil { return err @@ -977,7 +911,7 @@ func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error { if we.WalData != nil { // update readdb only when the wal has been committed to objectstorage - if we.WalData.WalStatus != wal.WalStatusCommitted { + if we.WalData.WalStatus != datamanager.WalStatusCommitted { return nil } @@ -1252,7 +1186,7 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [ } // get run from objectstorage - run, err := store.OSTGetRun(r.wal, runID) + run, err := store.OSTGetRun(r.dm, runID) if err != nil { return nil, errors.WithStack(err) } @@ -1498,7 +1432,7 @@ func (r *ReadDB) insertChangeGroupRevisionOST(tx *db.Tx, changegroup string, rev return nil } -func (r *ReadDB) GetChangeGroupsUpdateTokensOST(tx *db.Tx, groups []string) (*wal.ChangeGroupsUpdateToken, error) { +func (r *ReadDB) GetChangeGroupsUpdateTokensOST(tx *db.Tx, groups []string) (*datamanager.ChangeGroupsUpdateToken, error) { s := changegrouprevisionOSTSelect.Where(sq.Eq{"id": groups}) q, args, err := s.ToSql() r.log.Debugf("q: %s, args: %s", q, util.Dump(args)) @@ -1522,7 +1456,7 @@ func (r *ReadDB) GetChangeGroupsUpdateTokensOST(tx *db.Tx, groups []string) (*wa } } - return &wal.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil + return &datamanager.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil } func fetchChangeGroupsRevisionOST(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) { diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler/scheduler.go index 5b1af8f..969b488 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler/scheduler.go @@ -22,13 +22,11 @@ import ( "fmt" "net/http" "os" - "path" "path/filepath" - "strconv" "time" scommon "github.com/sorintlab/agola/internal/common" - "github.com/sorintlab/agola/internal/db" + "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/objectstorage" @@ -41,7 +39,6 @@ import ( "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" - "github.com/sorintlab/agola/internal/wal" ghandlers "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -613,7 +610,7 @@ func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.Exec if err != nil { return err } - rc, err := store.OSTGetRunConfig(s.wal, r.ID) + rc, err := store.OSTGetRunConfig(s.dm, r.ID) if err != nil { return errors.Wrapf(err, "cannot get run config %q", r.ID) } @@ -1163,7 +1160,7 @@ func (s *Scheduler) runsScheduler(ctx context.Context) error { func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error { log.Debugf("runScheduler") - rc, err := store.OSTGetRunConfig(s.wal, r.ID) + rc, err := store.OSTGetRunConfig(s.dm, r.ID) if err != nil { return errors.Wrapf(err, "cannot get run config %q", r.ID) } @@ -1271,9 +1268,9 @@ func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error { return err } - actions := append([]*wal.Action{ra}) + actions := append([]*datamanager.Action{ra}) - if _, err = s.wal.WriteWal(ctx, actions, nil); err != nil { + if _, err = s.dm.WriteWal(ctx, actions, nil); err != nil { return err } @@ -1285,197 +1282,6 @@ func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error { return nil } -func (s *Scheduler) dumpOSTLoop(ctx context.Context) { - for { - log.Debugf("objectstorage dump loop") - - // TODO(sgotti) create new dump only after N files - if err := s.dumpOST(ctx); err != nil { - log.Errorf("err: %+v", err) - } - - select { - case <-ctx.Done(): - return - default: - } - - time.Sleep(10 * time.Second) - } -} - -func (s *Scheduler) dumpOST(ctx context.Context) error { - type indexHeader struct { - LastWalSequence string - } - type indexData struct { - DataType string - Data interface{} - } - - type indexDataRun struct { - ID string - Group string - Phase types.RunPhase - } - type indexDataRunCounter struct { - Group string - Counter uint64 - } - - indexDir := strconv.FormatInt(time.Now().UnixNano(), 10) - - var lastWalSequence string - err := s.readDB.Do(func(tx *db.Tx) error { - var err error - lastWalSequence, err = s.readDB.GetCommittedWalSequenceOST(tx) - return err - }) - if err != nil { - return err - } - - data := []byte{} - iheader := &indexHeader{LastWalSequence: lastWalSequence} - ihj, err := json.Marshal(iheader) - if err != nil { - return err - } - data = append(data, ihj...) - - var lastRunID string - stop := false - for { - err := s.readDB.Do(func(tx *db.Tx) error { - var err error - lruns, err := s.readDB.GetRunsFilteredOST(tx, nil, false, nil, lastRunID, 1000, types.SortOrderDesc) - if err != nil { - return err - } - if len(lruns) == 0 { - stop = true - } else { - lastRunID = lruns[len(lruns)-1].ID - } - for _, run := range lruns { - id := &indexData{DataType: string(common.DataTypeRun), Data: indexDataRun{ID: run.ID, Group: run.GroupPath, Phase: types.RunPhase(run.Phase)}} - idj, err := json.Marshal(id) - if err != nil { - return err - } - data = append(data, idj...) - } - return nil - }) - if err != nil { - return err - } - if stop { - break - } - } - - var lastGroup string - stop = false - for { - err := s.readDB.Do(func(tx *db.Tx) error { - var err error - counters, err := s.readDB.GetRunCountersOST(tx, lastGroup, 1000) - if err != nil { - return err - } - if len(counters) == 0 { - stop = true - } else { - lastGroup = counters[len(counters)-1].Group - } - for _, counter := range counters { - id := &indexData{DataType: string(common.DataTypeRunCounter), Data: indexDataRunCounter{Group: counter.Group, Counter: counter.Counter}} - idj, err := json.Marshal(id) - if err != nil { - return err - } - data = append(data, idj...) - } - return nil - }) - if err != nil { - return err - } - if stop { - break - } - } - - index := path.Join(common.StorageRunsIndexesDir, indexDir, "all") - - if err = s.ost.WriteObject(index, bytes.NewReader(data)); err != nil { - return err - } - - return nil -} - -func (s *Scheduler) dumpOSTCleanerLoop(ctx context.Context) { - for { - log.Infof("objectstorage dump cleaner loop") - - if err := s.dumpOSTCleaner(ctx); err != nil { - log.Errorf("err: %+v", err) - } - - select { - case <-ctx.Done(): - return - default: - } - - time.Sleep(10 * time.Second) - } -} - -func (s *Scheduler) dumpOSTCleaner(ctx context.Context) error { - type indexData struct { - ID string - Group string - Phase types.RunPhase - } - - // collect all old indexes - objects := []string{} - doneCh := make(chan struct{}) - defer close(doneCh) - var indexPath string - for object := range s.ost.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) { - if object.Err != nil { - return object.Err - } - - h := util.PathList(object.Path) - if len(h) < 2 { - return errors.Errorf("wrong index dir path %q", object.Path) - } - curIndexPath := object.Path - if curIndexPath > indexPath { - if indexPath != "" { - objects = append(objects, indexPath) - } - indexPath = curIndexPath - } else { - objects = append(objects, curIndexPath) - } - } - - for _, object := range objects { - if err := s.ost.DeleteObject(object); err != nil { - log.Errorf("object: %s, err: %v", object, err) - return err - } - } - - return nil -} - func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) { for { if err := s.cacheCleaner(ctx, cacheExpireInterval); err != nil { @@ -1561,7 +1367,7 @@ type Scheduler struct { c *config.RunServiceScheduler e *etcd.Store ost *objectstorage.ObjStorage - wal *wal.WalManager + dm *datamanager.DataManager readDB *readdb.ReadDB ch *command.CommandHandler } @@ -1586,24 +1392,28 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule ost: ost, } - walConf := &wal.WalManagerConfig{ - E: e, - OST: ost, - DataToPathFunc: common.DataToPathFunc, + dmConf := &datamanager.DataManagerConfig{ + E: e, + OST: ost, + DataTypes: []string{ + string(common.DataTypeRun), + string(common.DataTypeRunConfig), + string(common.DataTypeRunCounter), + }, } - wal, err := wal.NewWalManager(ctx, logger, walConf) + dm, err := datamanager.NewDataManager(ctx, logger, dmConf) if err != nil { return nil, err } - s.wal = wal + s.dm = dm - readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, wal) + readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm) if err != nil { return nil, err } s.readDB = readDB - ch := command.NewCommandHandler(logger, e, readDB, ost, wal) + ch := command.NewCommandHandler(logger, e, readDB, ost, dm) s.ch = ch return s, nil @@ -1626,12 +1436,12 @@ func (s *Scheduler) InitEtcd(ctx context.Context) error { func (s *Scheduler) Run(ctx context.Context) error { errCh := make(chan error) - walReadyCh := make(chan struct{}) + dmReadyCh := make(chan struct{}) - go func() { errCh <- s.wal.Run(ctx, walReadyCh) }() + go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }() - // wait for wal to be ready - <-walReadyCh + // wait for dm to be ready + <-dmReadyCh for { err := s.InitEtcd(ctx) @@ -1668,9 +1478,9 @@ func (s *Scheduler) Run(ctx context.Context) error { // api from clients executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ch) - logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.wal) + logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm) - runHandler := api.NewRunHandler(logger, s.e, s.wal, s.readDB) + runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB) runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ch) runsHandler := api.NewRunsHandler(logger, s.readDB) runActionsHandler := api.NewRunActionsHandler(logger, s.ch) @@ -1714,8 +1524,6 @@ func (s *Scheduler) Run(ctx context.Context) error { go s.runTasksUpdaterLoop(ctx) go s.fetcherLoop(ctx) go s.finishedRunsArchiverLoop(ctx) - go s.dumpOSTLoop(ctx) - go s.dumpOSTCleanerLoop(ctx) go s.compactChangeGroupsLoop(ctx) go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) go s.executorTaskUpdateHandler(ctx, ch) diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/scheduler/store/store.go index 25e46c9..8de0a32 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/scheduler/store/store.go @@ -22,12 +22,12 @@ import ( "reflect" "strings" + "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" - "github.com/sorintlab/agola/internal/wal" "github.com/pkg/errors" etcdclientv3 "go.etcd.io/etcd/clientv3" @@ -85,16 +85,7 @@ func OSTSubGroupTypes(group string) []string { return sg } -func OSTRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string { - paths := []string{} - subGroups := OSTSubGroups(group) - for _, subGroup := range subGroups { - paths = append(paths, common.StorageRunCounterFile(subGroup)) - } - return paths -} - -func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wal.Action, error) { +func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*datamanager.Action, error) { // use the first group dir after the root pl := util.PathList(group) if len(pl) < 2 { @@ -106,8 +97,8 @@ func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wa return nil, err } - action := &wal.Action{ - ActionType: wal.ActionTypePut, + action := &datamanager.Action{ + ActionType: datamanager.ActionTypePut, DataType: string(common.DataTypeRunCounter), ID: pl[1], Data: cj, @@ -145,9 +136,8 @@ func OSTCacheKey(p string) string { return strings.TrimSuffix(base, path.Ext(base)) } -func OSTGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig, error) { - runConfigPath := common.StorageRunConfigFile(runConfigID) - rcf, _, err := wal.ReadObject(runConfigPath, nil) +func OSTGetRunConfig(dm *datamanager.DataManager, runConfigID string) (*types.RunConfig, error) { + rcf, _, err := dm.ReadObject(string(common.DataTypeRunConfig), runConfigID, nil) if err != nil { return nil, err } @@ -161,14 +151,14 @@ func OSTGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig, return rc, nil } -func OSTSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) { +func OSTSaveRunConfigAction(rc *types.RunConfig) (*datamanager.Action, error) { rcj, err := json.Marshal(rc) if err != nil { return nil, err } - action := &wal.Action{ - ActionType: wal.ActionTypePut, + action := &datamanager.Action{ + ActionType: datamanager.ActionTypePut, DataType: string(common.DataTypeRunConfig), ID: rc.ID, Data: rcj, @@ -177,10 +167,8 @@ func OSTSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) { return action, nil } -func OSTGetRun(wal *wal.WalManager, runID string) (*types.Run, error) { - runPath := common.StorageRunFile(runID) - rf, _, err := wal.ReadObject(runPath, nil) - +func OSTGetRun(dm *datamanager.DataManager, runID string) (*types.Run, error) { + rf, _, err := dm.ReadObject(string(common.DataTypeRun), runID, nil) if err != nil { return nil, err } @@ -194,14 +182,14 @@ func OSTGetRun(wal *wal.WalManager, runID string) (*types.Run, error) { return r, nil } -func OSTSaveRunAction(r *types.Run) (*wal.Action, error) { +func OSTSaveRunAction(r *types.Run) (*datamanager.Action, error) { rj, err := json.Marshal(r) if err != nil { return nil, err } - action := &wal.Action{ - ActionType: wal.ActionTypePut, + action := &datamanager.Action{ + ActionType: datamanager.ActionTypePut, DataType: string(common.DataTypeRun), ID: r.ID, Data: rj, @@ -501,13 +489,13 @@ func GetRuns(ctx context.Context, e *etcd.Store) ([]*types.Run, error) { return runs, nil } -func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, wal *wal.WalManager, runID string) (*types.Run, error) { +func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, dm *datamanager.DataManager, runID string) (*types.Run, error) { r, _, err := GetRun(ctx, e, runID) if err != nil && err != etcd.ErrKeyNotFound { return nil, err } if r == nil { - r, err = OSTGetRun(wal, runID) + r, err = OSTGetRun(dm, runID) if err != nil && err != objectstorage.ErrNotExist { return nil, err }