wal: update and rename to datamanager

* Rename to datamanager since it handles a complete "database" backed by an
objectstorage and etcd

* Don't write every single entry as its own file; group the entries in a
single file instead. In the future, improve this by splitting the data into
multiple files with a maximum size.
This commit is contained in:
Simone Gotti 2019-04-26 16:00:03 +02:00
parent 41e333d7ec
commit 2c3e6bf9e4
18 changed files with 1172 additions and 1305 deletions

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
package datamanager
import (
"context"
@ -20,7 +20,6 @@ import (
"fmt"
"io"
"path"
"sort"
"strings"
"sync"
"time"
@ -32,11 +31,12 @@ import (
"go.etcd.io/etcd/mvcc/mvccpb"
)
// TODO(sgotti) rewrite this to use a sqlite local cache
type WalChanges struct {
actions map[string][]*Action
puts map[string]string
deletes map[string]string
pathsOrdered []string
puts map[string]map[string]string // map[dataType]map[id]
deletes map[string]map[string]string
walSeq string
revision int64
changeGroupsRevisions changeGroupsRevisions
@ -44,13 +44,20 @@ type WalChanges struct {
sync.Mutex
}
func NewWalChanges() *WalChanges {
return &WalChanges{
func NewWalChanges(dataTypes []string) *WalChanges {
changes := &WalChanges{
actions: make(map[string][]*Action),
puts: make(map[string]string),
deletes: make(map[string]string),
puts: make(map[string]map[string]string),
deletes: make(map[string]map[string]string),
changeGroupsRevisions: make(changeGroupsRevisions),
}
for _, dataType := range dataTypes {
changes.puts[dataType] = make(map[string]string)
changes.deletes[dataType] = make(map[string]string)
}
return changes
}
func (c *WalChanges) String() string {
@ -69,8 +76,8 @@ func (c *WalChanges) curWalSeq() string {
return c.walSeq
}
func (c *WalChanges) getPut(p string) (string, bool) {
walseq, ok := c.puts[p]
func (c *WalChanges) getPut(dataType, id string) (string, bool) {
walseq, ok := c.puts[dataType][id]
return walseq, ok
}
@ -87,30 +94,30 @@ func (c *WalChanges) getDelete(p string) bool {
return ok
}
func (c *WalChanges) addPut(p, walseq string, revision int64) {
delete(c.deletes, p)
c.puts[p] = walseq
func (c *WalChanges) addPut(dataType, id, walseq string, revision int64) {
delete(c.deletes[dataType], id)
c.puts[dataType][id] = walseq
c.walSeq = walseq
c.revision = revision
}
func (c *WalChanges) removePut(p string, revision int64) {
delete(c.puts, p)
func (c *WalChanges) removePut(dataType, id string, revision int64) {
delete(c.puts[dataType], id)
c.revision = revision
}
func (c *WalChanges) addDelete(p, walseq string, revision int64) {
delete(c.puts, p)
c.deletes[p] = walseq
func (c *WalChanges) addDelete(dataType, id, walseq string, revision int64) {
delete(c.puts[dataType], id)
c.deletes[dataType][id] = walseq
c.walSeq = walseq
c.revision = revision
}
func (c *WalChanges) removeDelete(p string, revision int64) {
delete(c.deletes, p)
func (c *WalChanges) removeDelete(dataType, id string, revision int64) {
delete(c.deletes[dataType], id)
c.revision = revision
}
@ -137,28 +144,18 @@ func (c *WalChanges) removeChangeGroup(cgName string) {
delete(c.changeGroupsRevisions, cgName)
}
func (c *WalChanges) updatePathsOrdered() {
c.pathsOrdered = make([]string, len(c.puts))
i := 0
for p := range c.puts {
c.pathsOrdered[i] = p
i++
}
sort.Sort(sort.StringSlice(c.pathsOrdered))
}
func (d *DataManager) applyWalChanges(ctx context.Context, walData *WalData, revision int64) error {
walDataFilePath := d.storageWalDataFile(walData.WalDataFileID)
func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revision int64) error {
walDataFilePath := w.storageWalDataFile(walData.WalDataFileID)
walDataFile, err := w.ost.ReadObject(walDataFilePath)
walDataFile, err := d.ost.ReadObject(walDataFilePath)
if err != nil {
return errors.Wrapf(err, "failed to read waldata %q", walDataFilePath)
}
defer walDataFile.Close()
dec := json.NewDecoder(walDataFile)
w.changes.Lock()
defer w.changes.Unlock()
d.changes.Lock()
defer d.changes.Unlock()
for {
var action *Action
@ -171,48 +168,42 @@ func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revi
return errors.Wrapf(err, "failed to decode wal file")
}
w.applyWalChangesAction(ctx, action, walData.WalSequence, revision)
d.applyWalChangesAction(ctx, action, walData.WalSequence, revision)
}
w.changes.updatePathsOrdered()
return nil
}
func (w *WalManager) applyWalChangesAction(ctx context.Context, action *Action, walSequence string, revision int64) {
dataPath := w.dataToPathFunc(action.DataType, action.ID)
if dataPath == "" {
return
}
func (d *DataManager) applyWalChangesAction(ctx context.Context, action *Action, walSequence string, revision int64) {
switch action.ActionType {
case ActionTypePut:
w.changes.addPut(dataPath, walSequence, revision)
d.changes.addPut(action.DataType, action.ID, walSequence, revision)
case ActionTypeDelete:
w.changes.addDelete(dataPath, walSequence, revision)
d.changes.addDelete(action.DataType, action.ID, walSequence, revision)
}
if w.changes.actions[walSequence] == nil {
w.changes.actions[walSequence] = []*Action{}
if d.changes.actions[walSequence] == nil {
d.changes.actions[walSequence] = []*Action{}
}
w.changes.actions[walSequence] = append(w.changes.actions[walSequence], action)
d.changes.actions[walSequence] = append(d.changes.actions[walSequence], action)
}
func (w *WalManager) watcherLoop(ctx context.Context) error {
func (d *DataManager) watcherLoop(ctx context.Context) error {
for {
initialized := w.changes.initialized
initialized := d.changes.initialized
if !initialized {
if err := w.initializeChanges(ctx); err != nil {
w.log.Errorf("watcher err: %+v", err)
if err := d.initializeChanges(ctx); err != nil {
d.log.Errorf("watcher err: %+v", err)
}
} else {
if err := w.watcher(ctx); err != nil {
w.log.Errorf("watcher err: %+v", err)
if err := d.watcher(ctx); err != nil {
d.log.Errorf("watcher err: %+v", err)
}
}
select {
case <-ctx.Done():
w.log.Infof("watcher exiting")
d.log.Infof("watcher exiting")
return nil
default:
}
@ -221,11 +212,11 @@ func (w *WalManager) watcherLoop(ctx context.Context) error {
}
}
func (w *WalManager) initializeChanges(ctx context.Context) error {
func (d *DataManager) initializeChanges(ctx context.Context) error {
var revision int64
var continuation *etcd.ListPagedContinuation
for {
listResp, err := w.e.ListPaged(ctx, etcdWalsDir+"/", 0, 10, continuation)
listResp, err := d.e.ListPaged(ctx, etcdWalsDir+"/", 0, 10, continuation)
if err != nil {
return err
}
@ -239,7 +230,7 @@ func (w *WalManager) initializeChanges(ctx context.Context) error {
if err := json.Unmarshal(kv.Value, &walData); err != nil {
return err
}
if err := w.applyWalChanges(ctx, walData, revision); err != nil {
if err := d.applyWalChanges(ctx, walData, revision); err != nil {
return err
}
}
@ -251,7 +242,7 @@ func (w *WalManager) initializeChanges(ctx context.Context) error {
continuation = nil
// use the same revision
for {
listResp, err := w.e.ListPaged(ctx, etcdChangeGroupsDir+"/", 0, 10, continuation)
listResp, err := d.e.ListPaged(ctx, etcdChangeGroupsDir+"/", 0, 10, continuation)
if err != nil {
return err
}
@ -259,40 +250,40 @@ func (w *WalManager) initializeChanges(ctx context.Context) error {
continuation = listResp.Continuation
for _, kv := range resp.Kvs {
w.changes.Lock()
d.changes.Lock()
changeGroup := path.Base(string(kv.Key))
w.changes.putChangeGroup(changeGroup, kv.ModRevision)
w.changes.Unlock()
d.changes.putChangeGroup(changeGroup, kv.ModRevision)
d.changes.Unlock()
}
if !listResp.HasMore {
break
}
}
w.changes.Lock()
w.changes.revision = revision
w.changes.initialized = true
w.changes.Unlock()
d.changes.Lock()
d.changes.revision = revision
d.changes.initialized = true
d.changes.Unlock()
return nil
}
func (w *WalManager) watcher(ctx context.Context) error {
w.changes.Lock()
revision := w.changes.curRevision()
w.changes.Unlock()
func (d *DataManager) watcher(ctx context.Context) error {
d.changes.Lock()
revision := d.changes.curRevision()
d.changes.Unlock()
wctx, cancel := context.WithCancel(ctx)
defer cancel()
wch := w.e.Watch(wctx, etcdWalBaseDir+"/", revision+1)
wch := d.e.Watch(wctx, etcdWalBaseDir+"/", revision+1)
for wresp := range wch {
if wresp.Canceled {
err := wresp.Err()
if err == etcdclientv3rpc.ErrCompacted {
w.log.Errorf("required events already compacted, reinitializing watcher changes")
w.changes.Lock()
w.changes.initialized = false
w.changes.Unlock()
d.log.Errorf("required events already compacted, reinitializing watcher changes")
d.changes.Lock()
d.changes.initialized = false
d.changes.Unlock()
}
return errors.Wrapf(err, "watch error")
}
@ -312,56 +303,66 @@ func (w *WalManager) watcher(ctx context.Context) error {
if walData.WalStatus != WalStatusCommitted {
continue
}
if err := w.applyWalChanges(ctx, walData, revision); err != nil {
if err := d.applyWalChanges(ctx, walData, revision); err != nil {
return err
}
case mvccpb.DELETE:
walseq := path.Base(string(key))
w.changes.Lock()
putsToDelete := []string{}
deletesToDelete := []string{}
for p, pwalseq := range w.changes.puts {
if pwalseq == walseq {
putsToDelete = append(putsToDelete, p)
d.changes.Lock()
putsToDelete := map[string][]string{}
deletesToDelete := map[string][]string{}
for _, dataType := range d.dataTypes {
putsToDelete[dataType] = []string{}
deletesToDelete[dataType] = []string{}
}
for _, dataType := range d.dataTypes {
for p, pwalseq := range d.changes.puts[dataType] {
if pwalseq == walseq {
putsToDelete[dataType] = append(putsToDelete[dataType], p)
}
}
}
for p, pwalseq := range w.changes.deletes {
if pwalseq == walseq {
deletesToDelete = append(deletesToDelete, p)
for _, dataType := range d.dataTypes {
for id, pwalseq := range d.changes.deletes[dataType] {
if pwalseq == walseq {
deletesToDelete[dataType] = append(deletesToDelete[dataType], id)
}
}
}
for _, p := range putsToDelete {
w.changes.removePut(p, revision)
for dataType, ids := range putsToDelete {
for _, id := range ids {
d.changes.removePut(dataType, id, revision)
}
}
for _, p := range deletesToDelete {
w.changes.removeDelete(p, revision)
// Drop the delete markers that were recorded by the removed wal. This must
// iterate deletesToDelete (the ids collected from d.changes.deletes above),
// not putsToDelete.
for dataType, ids := range deletesToDelete {
	for _, id := range ids {
		d.changes.removeDelete(dataType, id, revision)
	}
}
delete(w.changes.actions, walseq)
delete(d.changes.actions, walseq)
w.changes.updatePathsOrdered()
w.changes.Unlock()
d.changes.Unlock()
}
case strings.HasPrefix(key, etcdChangeGroupsDir+"/"):
switch ev.Type {
case mvccpb.PUT:
w.changes.Lock()
d.changes.Lock()
changeGroup := strings.TrimPrefix(string(ev.Kv.Key), etcdChangeGroupsDir+"/")
w.changes.putChangeGroup(changeGroup, ev.Kv.ModRevision)
w.changes.Unlock()
d.changes.putChangeGroup(changeGroup, ev.Kv.ModRevision)
d.changes.Unlock()
case mvccpb.DELETE:
w.changes.Lock()
d.changes.Lock()
changeGroup := strings.TrimPrefix(string(ev.Kv.Key), etcdChangeGroupsDir+"/")
w.changes.removeChangeGroup(changeGroup)
w.changes.Unlock()
d.changes.removeChangeGroup(changeGroup)
d.changes.Unlock()
}
case key == etcdPingKey:
w.changes.Lock()
w.changes.putRevision(wresp.Header.Revision)
w.changes.Unlock()
d.changes.Lock()
d.changes.putRevision(wresp.Header.Revision)
d.changes.Unlock()
}
}
}

View File

@ -0,0 +1,300 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package datamanager
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"github.com/pkg/errors"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/sequence"
)
// DataStatus describes one written data snapshot. writeData marshals it to
// JSON and stores it at dataStatusPath(DataSequence).
type DataStatus struct {
// DataSequence is the sequence (as a string) assigned to this snapshot.
DataSequence string `json:"data_sequence,omitempty"`
// WalSequence is the sequence of the last wal passed to writeData for this
// snapshot (empty when no wals were passed).
WalSequence string `json:"wal_sequence,omitempty"`
// Files maps each data type to the list of data file paths holding its
// entries; writeData currently stores exactly one path per data type.
Files map[string][]string `json:"files,omitempty"`
}
// DataFileIndex is the JSON index written next to a data file.
type DataFileIndex struct {
// Index maps a data entry ID to the byte offset at which its JSON encoded
// DataEntry starts inside the data file.
Index map[string]int `json:"index,omitempty"`
}
// DataEntry is a single record of a data file. Entries are created from put
// actions, copying the action ID, DataType and Data.
type DataEntry struct {
// ID identifies the entry; it is the key used in DataFileIndex.Index.
ID string `json:"id,omitempty"`
// DataType is the data type the entry belongs to.
DataType string `json:"data_type,omitempty"`
// Data is the raw entry payload.
Data []byte `json:"data,omitempty"`
}
// dataStatusPath returns the storage path of the status file for the given
// data sequence: "<storageDataDir>/<sequence>.status".
func dataStatusPath(sequence string) string {
	fileName := sequence + ".status"
	return fmt.Sprintf("%s/%s", storageDataDir, fileName)
}
// dataFileIndexPath returns the storage path of the index file for the given
// data type and data sequence: "<storageDataDir>/<datatype>/<sequence>.index".
func dataFileIndexPath(datatype, sequence string) string {
	fileName := sequence + ".index"
	return fmt.Sprintf("%s/%s/%s", storageDataDir, datatype, fileName)
}
// dataFilePath returns the storage path of the data file for the given data
// type and data sequence: "<storageDataDir>/<datatype>/<sequence>.data".
func dataFilePath(datatype, sequence string) string {
	fileName := sequence + ".data"
	return fmt.Sprintf("%s/%s/%s", storageDataDir, datatype, fileName)
}
// writeData writes a new data snapshot that includes the provided wals.
//
// It obtains a new sequence from sequence.IncSequence on etcdWalSeqKey, writes
// one data file (plus index) per configured data type via writeDataType and
// finally stores a DataStatus recording the sequence, the sequence of the
// last provided wal and the written data file paths.
//
// TODO(sgotti)
// split/merge data files at max N bytes (i.e 16MiB) so we'll rewrite only files
// with changed data
func (d *DataManager) writeData(ctx context.Context, wals []*WalData) error {
	seq, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey)
	if err != nil {
		return err
	}
	dataSeq := seq.String()

	// Write every data type first, remembering the data file path of each one.
	files := make(map[string][]string)
	for _, dt := range d.dataTypes {
		if err := d.writeDataType(ctx, wals, dt, dataSeq); err != nil {
			return err
		}
		files[dt] = []string{dataFilePath(dt, dataSeq)}
	}

	// The status records the sequence of the last wal (empty without wals).
	lastWalSeq := ""
	for i := range wals {
		lastWalSeq = wals[i].WalSequence
	}

	statusj, err := json.Marshal(&DataStatus{
		DataSequence: dataSeq,
		WalSequence:  lastWalSeq,
		Files:        files,
	})
	if err != nil {
		return err
	}

	return d.ost.WriteObject(dataStatusPath(dataSeq), bytes.NewReader(statusj))
}
// writeDataType writes the data file and the related index for a single data
// type at the given data sequence.
//
// The new content is computed starting from the entries of the data file
// referenced by the last data status (when one exists) and then applying, in
// order, the actions of the provided wals that match datatype: put actions
// add or replace an entry, delete actions remove it.
//
// Entries are written to the data file as concatenated JSON documents and the
// index maps every entry ID to the byte offset of its document.
func (d *DataManager) writeDataType(ctx context.Context, wals []*WalData, datatype, dataSequence string) error {
	curDataStatus, err := d.GetLastDataStatus()
	if err != nil && err != objectstorage.ErrNotExist {
		return err
	}

	dataEntriesMap := map[string]*DataEntry{}

	// Load the entries of the previous data file, if any.
	if err != objectstorage.ErrNotExist {
		curDataSequence := curDataStatus.DataSequence

		oldDataf, err := d.ost.ReadObject(dataFilePath(datatype, curDataSequence))
		if err != nil && err != objectstorage.ErrNotExist {
			return err
		}
		if err != objectstorage.ErrNotExist {
			dec := json.NewDecoder(oldDataf)
			for {
				var de *DataEntry

				err := dec.Decode(&de)
				if err == io.EOF {
					// all done
					break
				}
				if err != nil {
					oldDataf.Close()
					return err
				}

				dataEntriesMap[de.ID] = de
			}
			oldDataf.Close()
		}
	}

	// applyWal applies the matching actions of one wal to dataEntriesMap. It
	// is a separate function so the wal data file is closed as soon as the wal
	// has been processed instead of staying open until writeDataType returns.
	applyWal := func(walData *WalData) error {
		walFilef, err := d.ReadWal(walData.WalSequence)
		if err != nil {
			return err
		}
		var header *WalHeader
		err = json.NewDecoder(walFilef).Decode(&header)
		walFilef.Close()
		if err != nil && err != io.EOF {
			return err
		}
		// An empty wal file decodes to io.EOF and leaves the header nil:
		// report it instead of dereferencing a nil pointer.
		if header == nil {
			return errors.Errorf("wal %q has no header", walData.WalSequence)
		}

		walFile, err := d.ReadWalData(header.WalDataFileID)
		if err != nil {
			return errors.Wrapf(err, "cannot read wal data file %q", header.WalDataFileID)
		}
		defer walFile.Close()

		dec := json.NewDecoder(walFile)
		for {
			var action *Action

			err := dec.Decode(&action)
			if err == io.EOF {
				// all done
				return nil
			}
			if err != nil {
				return errors.Wrapf(err, "failed to decode wal file")
			}
			if action.DataType != datatype {
				continue
			}

			switch action.ActionType {
			case ActionTypePut:
				dataEntriesMap[action.ID] = &DataEntry{
					ID:       action.ID,
					DataType: action.DataType,
					Data:     action.Data,
				}
			case ActionTypeDelete:
				delete(dataEntriesMap, action.ID)
			}
		}
	}

	for _, walData := range wals {
		if err := applyWal(walData); err != nil {
			return err
		}
	}

	dataFileIndex := &DataFileIndex{
		Index: make(map[string]int, len(dataEntriesMap)),
	}

	// Serialize the entries one after the other, recording where each starts.
	var buf bytes.Buffer
	pos := 0
	for _, de := range dataEntriesMap {
		dataFileIndex.Index[de.ID] = pos

		dataEntryj, err := json.Marshal(de)
		if err != nil {
			return err
		}
		if _, err := buf.Write(dataEntryj); err != nil {
			return err
		}

		pos += len(dataEntryj)
	}
	if err := d.ost.WriteObject(dataFilePath(datatype, dataSequence), &buf); err != nil {
		return err
	}

	dataFileIndexj, err := json.Marshal(dataFileIndex)
	if err != nil {
		return err
	}
	return d.ost.WriteObject(dataFileIndexPath(datatype, dataSequence), bytes.NewReader(dataFileIndexj))
}
// Read returns a reader over the Data of the entry with the given id and data
// type, taken from the data file referenced by the last data status.
//
// The entry offset is looked up in the data type index file; when the id is
// not in the index objectstorage.ErrNotExist is returned.
func (d *DataManager) Read(dataType, id string) (io.Reader, error) {
	status, err := d.GetLastDataStatus()
	if err != nil {
		return nil, err
	}
	seq := status.DataSequence

	indexf, err := d.ost.ReadObject(dataFileIndexPath(dataType, seq))
	if err != nil {
		return nil, err
	}
	var index *DataFileIndex
	err = json.NewDecoder(indexf).Decode(&index)
	// the index file is not needed anymore, whatever the decode result is
	indexf.Close()
	if err != nil {
		return nil, errors.WithStack(err)
	}

	offset, found := index.Index[id]
	if !found {
		return nil, objectstorage.ErrNotExist
	}

	dataf, err := d.ost.ReadObject(dataFilePath(dataType, seq))
	if err != nil {
		return nil, errors.WithStack(err)
	}
	defer dataf.Close()

	// jump to the start of the entry and decode only that JSON document
	if _, err := dataf.Seek(int64(offset), io.SeekStart); err != nil {
		return nil, errors.WithStack(err)
	}
	var entry *DataEntry
	if err := json.NewDecoder(dataf).Decode(&entry); err != nil {
		return nil, err
	}

	return bytes.NewReader(entry.Data), nil
}
// GetLastDataStatusPath lists the objects directly under storageDataDir and
// returns the path of the last listed one whose name ends with ".status".
// It returns objectstorage.ErrNotExist when no status file is found.
//
// NOTE(review): "last" depends on the listing order of the object storage,
// presumably lexicographic so that it matches the highest sequence; confirm.
func (d *DataManager) GetLastDataStatusPath() (string, error) {
	doneCh := make(chan struct{})
	defer close(doneCh)

	lastPath := ""
	for object := range d.ost.List(storageDataDir+"/", "", false, doneCh) {
		if object.Err != nil {
			return "", object.Err
		}
		if !strings.HasSuffix(object.Path, ".status") {
			continue
		}
		lastPath = object.Path
	}

	if lastPath != "" {
		return lastPath, nil
	}
	return "", objectstorage.ErrNotExist
}
// GetLastDataStatus reads and decodes the data status stored at the path
// returned by GetLastDataStatusPath.
//
// Errors from GetLastDataStatusPath (including objectstorage.ErrNotExist when
// there is no status file yet), from reading the object and from decoding it
// are returned as is; on error the returned status is always nil.
func (d *DataManager) GetLastDataStatus() (*DataStatus, error) {
	statusPath, err := d.GetLastDataStatusPath()
	if err != nil {
		return nil, err
	}

	dataStatusf, err := d.ost.ReadObject(statusPath)
	if err != nil {
		return nil, err
	}
	defer dataStatusf.Close()

	// Decode before building the return values: the Go spec leaves the order
	// of a variable read relative to a function call in the same expression
	// list unspecified, so returning the variable together with the Decode
	// call could yield the value from before decoding.
	var dataStatus *DataStatus
	if err := json.NewDecoder(dataStatusf).Decode(&dataStatus); err != nil {
		return nil, err
	}
	return dataStatus, nil
}

View File

@ -0,0 +1,164 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package datamanager
import (
"context"
"path"
"strings"
"time"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/pkg/errors"
"go.uber.org/zap"
)
// TODO(sgotti) handle etcd unwanted changes:
// * Etcd cluster rebuild: we cannot rely on etcd header ClusterID since it could be the same as it's generated using the listen urls. We should add our own clusterid key and use it.
// * Etcd cluster restored to a previous revision: really bad, because we should detect that the revision is smaller than the current one
// Storage paths
// wals/{walSeq}
//
// Etcd paths
// wals/{walSeq}
// Defaults applied by NewDataManager when the corresponding
// DataManagerConfig field is left at its zero value.
const (
// DefaultCheckpointInterval is the default for DataManagerConfig.CheckpointInterval.
DefaultCheckpointInterval = 1 * time.Minute
// DefaultEtcdWalsKeepNum is the default for DataManagerConfig.EtcdWalsKeepNum.
DefaultEtcdWalsKeepNum = 100
// DefaultMinCheckpointWalsNum is the default for DataManagerConfig.MinCheckpointWalsNum.
DefaultMinCheckpointWalsNum = 100
)
// Sentinel errors exposed by the package.
var (
// ErrCompacted reports that the required revision has been compacted.
ErrCompacted = errors.New("required revision has been compacted")
// ErrConcurrency reports that the change groups were already updated by
// someone else, i.e. the provided change groups update token is stale.
ErrConcurrency = errors.New("wal concurrency error: change groups already updated")
)
// Locations used by the data manager in the object storage and in etcd.
var (
// Storage paths. Always use path (not filepath) to use the "/" separator
// storageDataDir holds the data status, data and data index files.
storageDataDir = "data"
storageWalsDir = "wals"
storageWalsStatusDir = path.Join(storageWalsDir, "status")
storageWalsDataDir = path.Join(storageWalsDir, "data")
// etcd paths. Always use path (not filepath) to use the "/" separator
// etcdWalBaseDir is the common prefix of every etcd key below.
etcdWalBaseDir = "datamanager"
etcdWalsDir = path.Join(etcdWalBaseDir, "wals")
etcdWalsDataKey = path.Join(etcdWalBaseDir, "walsdata")
etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq")
etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq")
etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock")
etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock")
etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock")
etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups")
etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev")
etcdPingKey = path.Join(etcdWalBaseDir, "ping")
)
const (
// NOTE(review): presumably the revision range used when compacting change
// groups (see etcdChangeGroupMinRevisionKey); its use is not visible here,
// confirm against the compaction code.
etcdChangeGroupMinRevisionRange = 1000
)
// DataManagerConfig is the configuration accepted by NewDataManager.
type DataManagerConfig struct {
// BasePath is the base path used for the wal files in the object storage.
// NewDataManager appends a trailing "/" when it is not empty and lacks one.
BasePath string
// E is the etcd store used by the data manager.
E *etcd.Store
// OST is the object storage used by the data manager.
OST *objectstorage.ObjStorage
// DataTypes lists the data types handled by the data manager.
DataTypes []string
// EtcdWalsKeepNum defaults to DefaultEtcdWalsKeepNum when zero and must be
// greater than 0.
EtcdWalsKeepNum int
// CheckpointInterval defaults to DefaultCheckpointInterval when zero.
CheckpointInterval time.Duration
// MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint
// It defaults to DefaultMinCheckpointWalsNum when zero and must be greater
// than 0.
MinCheckpointWalsNum int
}
// DataManager holds the state built by NewDataManager from a
// DataManagerConfig: the etcd store, the object storage, the in-memory wal
// changes and the tuning parameters.
type DataManager struct {
// basePath is the configured base path, normalized to end with "/" when
// not empty.
basePath string
log *zap.SugaredLogger
e *etcd.Store
ost *objectstorage.ObjStorage
// changes tracks the wal changes, initialized for the configured data types.
changes *WalChanges
dataTypes []string
etcdWalsKeepNum int
checkpointInterval time.Duration
minCheckpointWalsNum int
}
// NewDataManager creates a DataManager from the provided configuration.
//
// Zero valued EtcdWalsKeepNum, CheckpointInterval and MinCheckpointWalsNum
// are replaced, in conf itself, with their defaults. An error is returned
// when EtcdWalsKeepNum or MinCheckpointWalsNum is lower than 1.
func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error) {
	switch {
	case conf.EtcdWalsKeepNum == 0:
		conf.EtcdWalsKeepNum = DefaultEtcdWalsKeepNum
	case conf.EtcdWalsKeepNum < 1:
		return nil, errors.New("etcdWalsKeepNum must be greater than 0")
	}

	if conf.CheckpointInterval == 0 {
		conf.CheckpointInterval = DefaultCheckpointInterval
	}

	switch {
	case conf.MinCheckpointWalsNum == 0:
		conf.MinCheckpointWalsNum = DefaultMinCheckpointWalsNum
	case conf.MinCheckpointWalsNum < 1:
		return nil, errors.New("minCheckpointWalsNum must be greater than 0")
	}

	// add a trailing slash to a non empty base path
	basePath := conf.BasePath
	if basePath != "" && !strings.HasSuffix(basePath, "/") {
		basePath += "/"
	}

	return &DataManager{
		basePath:             basePath,
		log:                  logger.Sugar(),
		e:                    conf.E,
		ost:                  conf.OST,
		changes:              NewWalChanges(conf.DataTypes),
		dataTypes:            conf.DataTypes,
		etcdWalsKeepNum:      conf.EtcdWalsKeepNum,
		checkpointInterval:   conf.CheckpointInterval,
		minCheckpointWalsNum: conf.MinCheckpointWalsNum,
	}, nil
}
// Run initializes etcd, signals readiness on readyCh, starts the background
// loops (watcher, sync, checkpoint, wal cleaner, change groups compaction and
// etcd pinger) and then blocks until ctx is done.
//
// Etcd initialization is retried every second until it succeeds. Both the
// retry wait and the readiness notification stop as soon as ctx is done, so a
// cancelled Run never stays blocked. Run always returns nil.
func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
	for {
		err := d.InitEtcd(ctx)
		if err == nil {
			break
		}
		d.log.Errorf("failed to initialize etcd: %+v", err)

		// wait before retrying, but give up immediately on cancellation
		select {
		case <-ctx.Done():
			d.log.Infof("datamanager exiting")
			return nil
		case <-time.After(1 * time.Second):
		}
	}

	// don't block forever when nobody is receiving and ctx gets cancelled
	select {
	case readyCh <- struct{}{}:
	case <-ctx.Done():
		d.log.Infof("datamanager exiting")
		return nil
	}

	go d.watcherLoop(ctx)
	go d.syncLoop(ctx)
	go d.checkpointLoop(ctx)
	go d.walCleanerLoop(ctx)
	go d.compactChangeGroupsLoop(ctx)
	go d.etcdPingerLoop(ctx)

	<-ctx.Done()
	d.log.Infof("datamanager exiting")
	return nil
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
package datamanager
import (
"context"
@ -26,7 +26,6 @@ import (
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/testutil"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
@ -82,22 +81,24 @@ func TestEtcdReset(t *testing.T) {
t.Fatalf("unexpected err: %v", err)
}
walConfig := &WalManagerConfig{
dmConfig := &DataManagerConfig{
BasePath: "basepath",
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
EtcdWalsKeepNum: 10,
DataTypes: []string{"datatype01"},
}
wal, err := NewWalManager(ctx, logger, walConfig)
walReadyCh := make(chan struct{})
dm, err := NewDataManager(ctx, logger, dmConfig)
dmReadyCh := make(chan struct{})
t.Logf("starting wal")
go wal.Run(ctx, walReadyCh)
<-walReadyCh
t.Logf("starting datamanager")
go dm.Run(ctx, dmReadyCh)
<-dmReadyCh
actions := []*Action{
{
ActionType: ActionTypePut,
DataType: "datatype01",
Data: []byte("{}"),
},
}
@ -107,7 +108,7 @@ func TestEtcdReset(t *testing.T) {
objectID := fmt.Sprintf("object%02d", i)
expectedObjects = append(expectedObjects, objectID)
actions[0].ID = objectID
if _, err := wal.WriteWal(ctx, actions, nil); err != nil {
if _, err := dm.WriteWal(ctx, actions, nil); err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
@ -115,7 +116,7 @@ func TestEtcdReset(t *testing.T) {
// wait for wal to be committed to storage
time.Sleep(5 * time.Second)
t.Logf("stopping wal")
t.Logf("stopping datamanager")
cancel()
t.Logf("stopping etcd")
@ -133,35 +134,29 @@ func TestEtcdReset(t *testing.T) {
defer shutdownEtcd(tetcd)
ctx, cancel = context.WithCancel(context.Background())
walConfig = &WalManagerConfig{
defer cancel()
dmConfig = &DataManagerConfig{
BasePath: "basepath",
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
EtcdWalsKeepNum: 10,
DataTypes: []string{"datatype01"},
}
wal, err = NewWalManager(ctx, logger, walConfig)
walReadyCh = make(chan struct{})
dm, err = NewDataManager(ctx, logger, dmConfig)
dmReadyCh = make(chan struct{})
t.Logf("starting wal")
go wal.Run(ctx, walReadyCh)
<-walReadyCh
go dm.Run(ctx, dmReadyCh)
<-dmReadyCh
time.Sleep(5 * time.Second)
curObjects := []string{}
doneCh := make(chan struct{})
for object := range wal.List("", "", true, doneCh) {
t.Logf("path: %q", object.Path)
if object.Err != nil {
t.Fatalf("unexpected err: %v", object.Err)
for i := 0; i < 20; i++ {
objectID := fmt.Sprintf("object%02d", i)
_, _, err = dm.ReadObject("datatype01", objectID, nil)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
curObjects = append(curObjects, object.Path)
}
close(doneCh)
t.Logf("curObjects: %s", curObjects)
if diff := cmp.Diff(expectedObjects, curObjects); diff != "" {
t.Error(diff)
}
}
@ -185,61 +180,63 @@ func TestConcurrentUpdate(t *testing.T) {
t.Fatalf("unexpected err: %v", err)
}
walConfig := &WalManagerConfig{
dmConfig := &DataManagerConfig{
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
EtcdWalsKeepNum: 10,
DataTypes: []string{"datatype01"},
}
wal, err := NewWalManager(ctx, logger, walConfig)
dm, err := NewDataManager(ctx, logger, dmConfig)
actions := []*Action{
{
ActionType: ActionTypePut,
ID: "/object01",
ID: "object01",
DataType: "datatype01",
Data: []byte("{}"),
},
}
walReadyCh := make(chan struct{})
go wal.Run(ctx, walReadyCh)
<-walReadyCh
dmReadyCh := make(chan struct{})
go dm.Run(ctx, dmReadyCh)
<-dmReadyCh
time.Sleep(5 * time.Second)
cgNames := []string{"changegroup01", "changegroup02"}
cgt, err := wal.GetChangeGroupsUpdateToken(cgNames)
cgt, err := dm.GetChangeGroupsUpdateToken(cgNames)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// populate with a wal
cgt, err = wal.WriteWal(ctx, actions, cgt)
cgt, err = dm.WriteWal(ctx, actions, cgt)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// this must work successfully
oldcgt := cgt
cgt, err = wal.WriteWal(ctx, actions, cgt)
cgt, err = dm.WriteWal(ctx, actions, cgt)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// this must fail since we are using the old cgt
_, err = wal.WriteWal(ctx, actions, oldcgt)
_, err = dm.WriteWal(ctx, actions, oldcgt)
if err != ErrConcurrency {
t.Fatalf("expected err: %v, got %v", ErrConcurrency, err)
}
oldcgt = cgt
// this must work successfully
cgt, err = wal.WriteWal(ctx, actions, cgt)
cgt, err = dm.WriteWal(ctx, actions, cgt)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
// this must fail since we are using the old cgt
_, err = wal.WriteWal(ctx, actions, oldcgt)
_, err = dm.WriteWal(ctx, actions, oldcgt)
if err != ErrConcurrency {
t.Fatalf("expected err: %v, got %v", ErrConcurrency, err)
}
@ -266,39 +263,155 @@ func TestWalCleaner(t *testing.T) {
}
walKeepNum := 10
walConfig := &WalManagerConfig{
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
EtcdWalsKeepNum: walKeepNum,
dmConfig := &DataManagerConfig{
E: tetcd.TestEtcd.Store,
OST: objectstorage.NewObjStorage(ost, "/"),
EtcdWalsKeepNum: walKeepNum,
DataTypes: []string{"datatype01"},
MinCheckpointWalsNum: 1,
}
wal, err := NewWalManager(ctx, logger, walConfig)
dm, err := NewDataManager(ctx, logger, dmConfig)
actions := []*Action{
{
ActionType: ActionTypePut,
ID: "/object01",
ID: "object01",
DataType: "datatype01",
Data: []byte("{}"),
},
}
walReadyCh := make(chan struct{})
go wal.Run(ctx, walReadyCh)
<-walReadyCh
dmReadyCh := make(chan struct{})
go dm.Run(ctx, dmReadyCh)
<-dmReadyCh
for i := 0; i < 20; i++ {
if _, err := wal.WriteWal(ctx, actions, nil); err != nil {
if _, err := dm.WriteWal(ctx, actions, nil); err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
// wait for walCleaner to complete
time.Sleep(5 * time.Second)
dm.checkpoint(ctx)
dm.walCleaner(ctx)
walsCount := 0
for range wal.ListEtcdWals(ctx, 0) {
for range dm.ListEtcdWals(ctx, 0) {
walsCount++
}
if walsCount != walKeepNum {
t.Fatalf("expected %d wals in etcd, got %d wals", walKeepNum, walsCount)
}
}
// TestReadObject checks that ReadObject reports written and deleted objects
// correctly both before a checkpoint (reading from the wal changes) and after
// a checkpoint plus wal cleaning (reading from the data files).
func TestReadObject(t *testing.T) {
	dir, err := ioutil.TempDir("", "agola")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	defer os.RemoveAll(dir)

	etcdDir, err := ioutil.TempDir(dir, "etcd")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	tetcd := setupEtcd(t, etcdDir)
	defer shutdownEtcd(tetcd)

	// cancel the context at the end of the test so the datamanager goroutines
	// are stopped before etcd is shut down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ostDir, err := ioutil.TempDir(dir, "ost")
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	ost, err := objectstorage.NewPosixStorage(ostDir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	dmConfig := &DataManagerConfig{
		E:   tetcd.TestEtcd.Store,
		OST: objectstorage.NewObjStorage(ost, "/"),
		// remove almost all wals to see that they are removed also from changes
		EtcdWalsKeepNum: 1,
		DataTypes:       []string{"datatype01"},
	}
	dm, err := NewDataManager(ctx, logger, dmConfig)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	actions := []*Action{}
	for i := 0; i < 20; i++ {
		actions = append(actions, &Action{
			ActionType: ActionTypePut,
			ID:         fmt.Sprintf("object%d", i),
			DataType:   "datatype01",
			Data:       []byte(fmt.Sprintf(`{ "ID": "%d" }`, i)),
		})
	}

	dmReadyCh := make(chan struct{})
	go dm.Run(ctx, dmReadyCh)
	<-dmReadyCh

	time.Sleep(5 * time.Second)

	// populate with a wal
	_, err = dm.WriteWal(ctx, actions, nil)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// wait for the event to be read
	time.Sleep(500 * time.Millisecond)

	// should read it
	_, _, err = dm.ReadObject("datatype01", "object1", nil)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
	_, _, err = dm.ReadObject("datatype01", "object19", nil)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// delete the first ten objects
	actions = []*Action{}
	for i := 0; i < 10; i++ {
		actions = append(actions, &Action{
			ActionType: ActionTypeDelete,
			ID:         fmt.Sprintf("object%d", i),
			DataType:   "datatype01",
		})
	}

	_, err = dm.WriteWal(ctx, actions, nil)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// wait for the event to be read
	time.Sleep(500 * time.Millisecond)

	// test read from changes (since not checkpointed yet)

	// should not exist
	_, _, err = dm.ReadObject("datatype01", "object1", nil)
	if err != objectstorage.ErrNotExist {
		t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err)
	}
	// should exist
	_, _, err = dm.ReadObject("datatype01", "object19", nil)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	// do a checkpoint and wal clean
	dm.checkpoint(ctx)
	dm.walCleaner(ctx)

	// wait for the event to be read
	time.Sleep(500 * time.Millisecond)

	// test read from data

	// should not exist
	_, _, err = dm.ReadObject("datatype01", "object1", nil)
	if err != objectstorage.ErrNotExist {
		t.Fatalf("expected err %v, got: %v", objectstorage.ErrNotExist, err)
	}
	// should exist
	_, _, err = dm.ReadObject("datatype01", "object19", nil)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
package datamanager
import (
"bytes"
@ -35,70 +35,14 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)
// TODO(sgotti) handle etcd unwanted changes:
// * Etcd cluster rebuild: we cannot rely on etcd header ClusterID since it could be the same as it's generated using the listen urls. We should add our own clusterid key and use it.
// * Etcd cluster restored to a previous revision: really bad cause should detect that the revision is smaller than the current one
// Storage paths
// wals/{walSeq}
//
// Etcd paths
// wals/{walSeq}
const (
DefaultEtcdWalsKeepNum = 100
)
// Sentinel errors exported by this package.
var (
// ErrCompacted reports that a required revision has been compacted.
ErrCompacted = errors.New("required revision has been compacted")
// ErrConcurrency reports a wal write conflict: the change groups were
// already updated by someone else.
ErrConcurrency = errors.New("wal concurrency error: change groups already updated")
)
var (
// Storage paths. Always use path (not filepath) to use the "/" separator
// storageObjectsPrefix is appended to the base path by toStorageDataPath.
storageObjectsPrefix = "data/"
storageWalsDir = "wals"
// storageWalsStatusDir is joined with the base path and a wal sequence by
// storageWalStatusFile.
storageWalsStatusDir = path.Join(storageWalsDir, "status")
// storageWalsDataDir is joined with the base path and a wal file id by
// storageWalDataFile.
storageWalsDataDir = path.Join(storageWalsDir, "data")
// etcd paths. Always use path (not filepath) to use the "/" separator
// etcdWalBaseDir is the common parent of every key below; Watch observes
// everything under it.
etcdWalBaseDir = "walmanager"
etcdWalsDir = path.Join(etcdWalBaseDir, "wals")
etcdWalsDataKey = path.Join(etcdWalBaseDir, "walsdata")
// etcdWalSeqKey is the key handed to sequence.IncSequence when writing a wal.
etcdWalSeqKey = path.Join(etcdWalBaseDir, "walseq")
etcdLastCommittedStorageWalSeqKey = path.Join(etcdWalBaseDir, "lastcommittedstoragewalseq")
etcdSyncLockKey = path.Join(etcdWalBaseDir, "synclock")
etcdCheckpointLockKey = path.Join(etcdWalBaseDir, "checkpointlock")
etcdWalCleanerLockKey = path.Join(etcdWalBaseDir, "walcleanerlock")
etcdChangeGroupsDir = path.Join(etcdWalBaseDir, "changegroups")
etcdChangeGroupMinRevisionKey = path.Join(etcdWalBaseDir, "changegroupsminrev")
// etcdPingKey is rewritten with an empty value by etcdPinger.
etcdPingKey = path.Join(etcdWalBaseDir, "ping")
)
const (
// etcdChangeGroupMinRevisionRange is the revision window used by
// compactChangeGroups: change group keys whose ModRevision is lower than the
// current revision minus this value are deleted.
etcdChangeGroupMinRevisionRange = 1000
)
func (w *WalManager) toStorageDataPath(path string) string {
return w.basePath + storageObjectsPrefix + path
func (d *DataManager) storageWalStatusFile(walSeq string) string {
return path.Join(d.basePath, storageWalsStatusDir, walSeq)
}
func (w *WalManager) fromStorageDataPath(path string) string {
return strings.TrimPrefix(path, w.basePath+storageObjectsPrefix)
}
func (w *WalManager) storageWalStatusFile(walSeq string) string {
return path.Join(w.basePath, storageWalsStatusDir, walSeq)
}
func (w *WalManager) storageWalDataFile(walFileID string) string {
return path.Join(w.basePath, storageWalsDataDir, walFileID)
func (d *DataManager) storageWalDataFile(walFileID string) string {
return path.Join(d.basePath, storageWalsDataDir, walFileID)
}
func etcdWalKey(walSeq string) string {
@ -122,7 +66,6 @@ type Action struct {
type WalHeader struct {
WalDataFileID string
PreviousWalSequence string
ChangeGroups map[string]int64
}
type WalStatus string
@ -146,7 +89,9 @@ type WalData struct {
WalStatus WalStatus
WalSequence string
PreviousWalSequence string
ChangeGroups map[string]int64
// internal values not saved
Revision int64 `json:"-"`
}
type ChangeGroupsUpdateToken struct {
@ -156,85 +101,53 @@ type ChangeGroupsUpdateToken struct {
type changeGroupsRevisions map[string]int64
// ChangesCurrentRevision returns the revision currently recorded in the
// in-memory wal changes. It fails with "wal changes not ready" until the
// changes have been marked as initialized. The changes lock is held for the
// whole call.
func (w *WalManager) ChangesCurrentRevision() (int64, error) {
	w.changes.Lock()
	defer w.changes.Unlock()

	if w.changes.initialized {
		return w.changes.revision, nil
	}
	return 0, errors.Errorf("wal changes not ready")
}
func (w *WalManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error) {
w.changes.Lock()
defer w.changes.Unlock()
if !w.changes.initialized {
func (d *DataManager) GetChangeGroupsUpdateToken(cgNames []string) (*ChangeGroupsUpdateToken, error) {
d.changes.Lock()
defer d.changes.Unlock()
if !d.changes.initialized {
return nil, errors.Errorf("wal changes not ready")
}
revision := w.changes.curRevision()
cgr := w.changes.getChangeGroups(cgNames)
revision := d.changes.curRevision()
cgr := d.changes.getChangeGroups(cgNames)
return &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil
}
// MergeChangeGroupsUpdateTokens folds cgts into a single token.
//
// The merged CurRevision is the lowest non-zero CurRevision among the inputs
// (0 when every input is 0 or cgts is empty). For every change group name the
// merged token keeps the lowest revision seen. Nil entries in cgts are
// ignored.
func (w *WalManager) MergeChangeGroupsUpdateTokens(cgts []*ChangeGroupsUpdateToken) *ChangeGroupsUpdateToken {
	mcgt := &ChangeGroupsUpdateToken{ChangeGroupsRevisions: make(changeGroupsRevisions)}
	for _, cgt := range cgts {
		if cgt == nil {
			continue
		}

		// Keep the lower curRevision. The accumulator starts at 0, which
		// means "not set yet": without the explicit zero check no positive
		// revision would ever be lower than it and the result would stay 0.
		if cgt.CurRevision != 0 && (mcgt.CurRevision == 0 || cgt.CurRevision < mcgt.CurRevision) {
			mcgt.CurRevision = cgt.CurRevision
		}

		// Keep the lower changegroup revision.
		for cgName, cgRev := range cgt.ChangeGroupsRevisions {
			if mr, ok := mcgt.ChangeGroupsRevisions[cgName]; !ok || cgRev < mr {
				mcgt.ChangeGroupsRevisions[cgName] = cgRev
			}
		}
	}

	return mcgt
}
func (w *WalManager) ReadObject(p string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error) {
w.changes.Lock()
if !w.changes.initialized {
w.changes.Unlock()
func (d *DataManager) ReadObject(dataType, id string, cgNames []string) (io.ReadCloser, *ChangeGroupsUpdateToken, error) {
d.changes.Lock()
if !d.changes.initialized {
d.changes.Unlock()
return nil, nil, errors.Errorf("wal changes not ready")
}
walseq, ok := w.changes.getPut(p)
revision := w.changes.curRevision()
cgr := w.changes.getChangeGroups(cgNames)
actions := w.changes.actions[walseq]
w.changes.Unlock()
walseq, ok := d.changes.getPut(dataType, id)
revision := d.changes.curRevision()
cgr := d.changes.getChangeGroups(cgNames)
actions := d.changes.actions[walseq]
d.changes.Unlock()
cgt := &ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}
if ok {
for _, action := range actions {
if action.ActionType == ActionTypePut {
dataPath := w.dataToPathFunc(action.DataType, action.ID)
if dataPath == p {
w.log.Debugf("reading file from wal: %q", dataPath)
if action.DataType == dataType && action.ID == id {
d.log.Debugf("reading datatype %q, id %q from wal: %q", dataType, id)
return ioutil.NopCloser(bytes.NewReader(action.Data)), cgt, nil
}
}
}
return nil, nil, errors.Errorf("no file %s in wal %s", p, walseq)
return nil, nil, errors.Errorf("no datatype %q, id %q in wal %s", dataType, id, walseq)
}
f, err := w.ost.ReadObject(w.toStorageDataPath(p))
return f, cgt, err
f, err := d.Read(dataType, id)
return ioutil.NopCloser(f), cgt, err
}
func (w *WalManager) changesList(paths []string, prefix, startWith string, recursive bool) []string {
func (d *DataManager) changesList(paths []string, prefix, startWith string, recursive bool) []string {
fpaths := []string{}
for _, p := range paths {
if !recursive && len(p) > len(prefix) {
rel := strings.TrimPrefix(p, prefix)
skip := strings.Contains(rel, w.ost.Delimiter())
skip := strings.Contains(rel, d.ost.Delimiter())
if skip {
continue
}
@ -247,79 +160,8 @@ func (w *WalManager) changesList(paths []string, prefix, startWith string, recur
return fpaths
}
// List streams the objects under prefix, merging the object storage listing
// with the paths recorded in the in-memory wal changes.
//
// Paths coming from the changes are emitted (with only Path set) in order
// before the first storage path that is greater than them; a path present in
// both is emitted once, using the storage entry. Storage paths found in the
// changes deletes map are skipped. The returned channel is always closed when
// the listing ends, including the "wal changes not ready" error case.
// Closing doneCh stops the listing.
func (w *WalManager) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan objectstorage.ObjectInfo {
	objectCh := make(chan objectstorage.ObjectInfo, 1)

	prefix = w.toStorageDataPath(prefix)
	startWith = w.toStorageDataPath(startWith)

	w.changes.Lock()
	if !w.changes.initialized {
		w.changes.Unlock()
		// The channel has a buffer of one, so this send cannot block. Close
		// the channel afterwards so that a consumer ranging over it
		// terminates instead of waiting forever.
		objectCh <- objectstorage.ObjectInfo{Err: errors.Errorf("wal changes not ready")}
		close(objectCh)
		return objectCh
	}
	changesList := w.changesList(w.changes.pathsOrdered, prefix, startWith, recursive)
	deletedChangesMap := w.changes.getDeletesMap()
	w.changes.Unlock()

	ci := 0
	go func(objectCh chan<- objectstorage.ObjectInfo) {
		defer close(objectCh)
		for object := range w.ost.List(prefix, startWith, recursive, doneCh) {
			if object.Err != nil {
				// Report the error unless the caller already gave up.
				select {
				case objectCh <- object:
				case <-doneCh:
				}
				return
			}
			object.Path = w.fromStorageDataPath(object.Path)

			// Emit every pending change path that sorts before the current
			// storage path. The bare breaks leave this inner loop.
			for ci < len(changesList) {
				p := changesList[ci]
				if p < object.Path {
					//w.log.Infof("using path from changelist: %q", p)
					select {
					// Send object content.
					case objectCh <- objectstorage.ObjectInfo{Path: p}:
					// If receives done from the caller, return here.
					case <-doneCh:
						return
					}
					ci++
				} else if p == object.Path {
					// same path in both lists: consume the change entry, the
					// storage entry is emitted below
					ci++
					break
				} else {
					break
				}
			}

			if _, ok := deletedChangesMap[object.Path]; ok {
				continue
			}

			//w.log.Infof("using path from objectstorage: %q", object.Path)
			select {
			// Send object content.
			case objectCh <- object:
			// If receives done from the caller, return here.
			case <-doneCh:
				return
			}
		}

		// Emit the change paths that sort after the last storage path. Honor
		// doneCh here too, otherwise this goroutine blocks forever once the
		// caller stops reading.
		for ci < len(changesList) {
			//w.log.Infof("using path from changelist: %q", changesList[ci])
			select {
			case objectCh <- objectstorage.ObjectInfo{Path: changesList[ci]}:
			case <-doneCh:
				return
			}
			ci++
		}
	}(objectCh)

	return objectCh
}
func (w *WalManager) HasOSTWal(walseq string) (bool, error) {
_, err := w.ost.Stat(w.storageWalStatusFile(walseq) + ".committed")
func (d *DataManager) HasOSTWal(walseq string) (bool, error) {
_, err := d.ost.Stat(d.storageWalStatusFile(walseq) + ".committed")
if err == objectstorage.ErrNotExist {
return false, nil
}
@ -329,12 +171,12 @@ func (w *WalManager) HasOSTWal(walseq string) (bool, error) {
return true, nil
}
func (w *WalManager) ReadWal(walseq string) (io.ReadCloser, error) {
return w.ost.ReadObject(w.storageWalStatusFile(walseq) + ".committed")
func (d *DataManager) ReadWal(walseq string) (io.ReadCloser, error) {
return d.ost.ReadObject(d.storageWalStatusFile(walseq) + ".committed")
}
func (w *WalManager) ReadWalData(walFileID string) (io.ReadCloser, error) {
return w.ost.ReadObject(w.storageWalDataFile(walFileID))
func (d *DataManager) ReadWalData(walFileID string) (io.ReadCloser, error) {
return d.ost.ReadObject(d.storageWalDataFile(walFileID))
}
type WalFile struct {
@ -344,7 +186,7 @@ type WalFile struct {
Checkpointed bool
}
func (w *WalManager) ListOSTWals(start string) <-chan *WalFile {
func (d *DataManager) ListOSTWals(start string) <-chan *WalFile {
walCh := make(chan *WalFile, 1)
go func() {
@ -355,10 +197,10 @@ func (w *WalManager) ListOSTWals(start string) <-chan *WalFile {
curWal := &WalFile{}
var startPath string
if start != "" {
startPath = w.storageWalStatusFile(start)
startPath = d.storageWalStatusFile(start)
}
for object := range w.ost.List(path.Join(w.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) {
for object := range d.ost.List(path.Join(d.basePath, storageWalsStatusDir)+"/", startPath, true, doneCh) {
if object.Err != nil {
walCh <- &WalFile{
Err: object.Err,
@ -411,14 +253,14 @@ type ListEtcdWalsElement struct {
Err error
}
func (w *WalManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement {
func (d *DataManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *ListEtcdWalsElement {
walCh := make(chan *ListEtcdWalsElement, 1)
go func() {
defer close(walCh)
var continuation *etcd.ListPagedContinuation
for {
listResp, err := w.e.ListPaged(ctx, etcdWalsDir, revision, 10, continuation)
listResp, err := d.e.ListPaged(ctx, etcdWalsDir, revision, 10, continuation)
if err != nil {
walCh <- &ListEtcdWalsElement{
Err: err,
@ -448,9 +290,9 @@ func (w *WalManager) ListEtcdWals(ctx context.Context, revision int64) <-chan *L
// FirstAvailableWalData returns the first available wal (the one with the
// smallest sequence), or nil if none is available, together with the etcd
// revision at the time of the operation
func (w *WalManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64, error) {
func (d *DataManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64, error) {
// list waldata and just get the first if available
listResp, err := w.e.ListPaged(ctx, etcdWalsDir, 0, 1, nil)
listResp, err := d.e.ListPaged(ctx, etcdWalsDir, 0, 1, nil)
if err != nil {
return nil, 0, err
}
@ -469,8 +311,8 @@ func (w *WalManager) FirstAvailableWalData(ctx context.Context) (*WalData, int64
return walData, revision, nil
}
func (w *WalManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error) {
resp, err := w.e.Get(ctx, etcdLastCommittedStorageWalSeqKey, 0)
func (d *DataManager) LastCommittedStorageWal(ctx context.Context) (string, int64, error) {
resp, err := d.e.Get(ctx, etcdLastCommittedStorageWalSeqKey, 0)
if err != nil && err != etcd.ErrKeyNotFound {
return "", 0, err
}
@ -491,7 +333,7 @@ type WatchElement struct {
Err error
}
func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement {
func (d *DataManager) Watch(ctx context.Context, revision int64) <-chan *WatchElement {
walCh := make(chan *WatchElement, 1)
// TODO(sgotti) if the etcd cluster goes down, watch won't return an error but
@ -499,7 +341,7 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle
// is down and report an error so our clients can react (i.e. a readdb could
// mark itself as not in sync)
wctx := etcdclientv3.WithRequireLeader(ctx)
wch := w.e.Watch(wctx, etcdWalBaseDir+"/", revision)
wch := d.e.Watch(wctx, etcdWalBaseDir+"/", revision)
go func() {
defer close(walCh)
@ -577,21 +419,21 @@ func (w *WalManager) Watch(ctx context.Context, revision int64) <-chan *WatchEle
// handle possible objectstorage list operation eventual consistency gaps (list
// won't report a wal at seq X but a wal at X+n, if this kind of eventual
// consistency ever exists)
func (w *WalManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error) {
return w.WriteWalAdditionalOps(ctx, actions, cgt, nil, nil)
func (d *DataManager) WriteWal(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken) (*ChangeGroupsUpdateToken, error) {
return d.WriteWalAdditionalOps(ctx, actions, cgt, nil, nil)
}
func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, cmp []etcdclientv3.Cmp, then []etcdclientv3.Op) (*ChangeGroupsUpdateToken, error) {
func (d *DataManager) WriteWalAdditionalOps(ctx context.Context, actions []*Action, cgt *ChangeGroupsUpdateToken, cmp []etcdclientv3.Cmp, then []etcdclientv3.Op) (*ChangeGroupsUpdateToken, error) {
if len(actions) == 0 {
return nil, errors.Errorf("cannot write wal: actions is empty")
}
walSequence, err := sequence.IncSequence(ctx, w.e, etcdWalSeqKey)
walSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey)
if err != nil {
return nil, err
}
resp, err := w.e.Get(ctx, etcdWalsDataKey, 0)
resp, err := d.e.Get(ctx, etcdWalsDataKey, 0)
if err != nil {
return nil, err
}
@ -603,7 +445,7 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio
walsData.Revision = resp.Kvs[0].ModRevision
walDataFileID := uuid.NewV4().String()
walDataFilePath := w.storageWalDataFile(walDataFileID)
walDataFilePath := d.storageWalDataFile(walDataFileID)
walKey := etcdWalKey(walSequence.String())
var buf bytes.Buffer
@ -616,10 +458,10 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio
return nil, err
}
}
if err := w.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil {
if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader(buf.Bytes())); err != nil {
return nil, err
}
w.log.Debugf("wrote wal file: %s", walDataFilePath)
d.log.Debugf("wrote wal file: %s", walDataFilePath)
walsData.LastCommittedWalSequence = walSequence.String()
@ -673,7 +515,7 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio
// This will only succeed if no one else has concurrently updated the walsData
// TODO(sgotti) retry if it failed due to concurrency errors
txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...).Else(getWalsData, getWal)
txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...).Else(getWalsData, getWal)
tresp, err := txn.Commit()
if err != nil {
return nil, etcd.FromEtcdError(err)
@ -697,18 +539,18 @@ func (w *WalManager) WriteWalAdditionalOps(ctx context.Context, actions []*Actio
}
// try to commit storage right now
if err := w.sync(ctx); err != nil {
w.log.Errorf("wal sync error: %+v", err)
if err := d.sync(ctx); err != nil {
d.log.Errorf("wal sync error: %+v", err)
}
return ncgt, nil
}
func (w *WalManager) syncLoop(ctx context.Context) {
func (d *DataManager) syncLoop(ctx context.Context) {
for {
w.log.Debugf("syncer")
if err := w.sync(ctx); err != nil {
w.log.Errorf("syncer error: %+v", err)
d.log.Debugf("syncer")
if err := d.sync(ctx); err != nil {
d.log.Errorf("syncer error: %+v", err)
}
select {
@ -721,8 +563,8 @@ func (w *WalManager) syncLoop(ctx context.Context) {
}
}
func (w *WalManager) sync(ctx context.Context) error {
session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
func (d *DataManager) sync(ctx context.Context) error {
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
@ -735,7 +577,7 @@ func (w *WalManager) sync(ctx context.Context) error {
}
defer m.Unlock(ctx)
resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0)
resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0)
if err != nil {
return err
}
@ -748,11 +590,10 @@ func (w *WalManager) sync(ctx context.Context) error {
// TODO(sgotti) this could be optimized by parallelizing writes of wals that don't have common change groups
switch walData.WalStatus {
case WalStatusCommitted:
walFilePath := w.storageWalStatusFile(walData.WalSequence)
w.log.Debugf("syncing committed wal to storage")
walFilePath := d.storageWalStatusFile(walData.WalSequence)
d.log.Debugf("syncing committed wal to storage")
header := &WalHeader{
WalDataFileID: walData.WalDataFileID,
ChangeGroups: walData.ChangeGroups,
PreviousWalSequence: walData.PreviousWalSequence,
}
headerj, err := json.Marshal(header)
@ -761,11 +602,11 @@ func (w *WalManager) sync(ctx context.Context) error {
}
walFileCommittedPath := walFilePath + ".committed"
if err := w.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil {
if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj)); err != nil {
return err
}
w.log.Debugf("updating wal to state %q", WalStatusCommittedStorage)
d.log.Debugf("updating wal to state %q", WalStatusCommittedStorage)
walData.WalStatus = WalStatusCommittedStorage
walDataj, err := json.Marshal(walData)
if err != nil {
@ -779,7 +620,7 @@ func (w *WalManager) sync(ctx context.Context) error {
then = append(then, etcdclientv3.OpPut(string(etcdLastCommittedStorageWalSeqKey), string(walData.WalSequence)))
// This will only succeed if no one else has concurrently updated the wal keys in etcd
txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...)
txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...)
tresp, err := txn.Commit()
if err != nil {
return etcd.FromEtcdError(err)
@ -788,10 +629,10 @@ func (w *WalManager) sync(ctx context.Context) error {
return errors.Errorf("failed to write committedstorage wal: concurrent update")
}
case WalStatusCheckpointed:
walFilePath := w.storageWalStatusFile(walData.WalSequence)
w.log.Debugf("checkpointing committed wal to storage")
walFilePath := d.storageWalStatusFile(walData.WalSequence)
d.log.Debugf("checkpointing committed wal to storage")
walFileCheckpointedPath := walFilePath + ".checkpointed"
if err := w.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil {
if err := d.ost.WriteObject(walFileCheckpointedPath, bytes.NewReader([]byte{})); err != nil {
return err
}
}
@ -799,11 +640,11 @@ func (w *WalManager) sync(ctx context.Context) error {
return nil
}
func (w *WalManager) checkpointLoop(ctx context.Context) {
func (d *DataManager) checkpointLoop(ctx context.Context) {
for {
w.log.Debugf("checkpointer")
if err := w.checkpoint(ctx); err != nil {
w.log.Errorf("checkpoint error: %v", err)
d.log.Debugf("checkpointer")
if err := d.checkpoint(ctx); err != nil {
d.log.Errorf("checkpoint error: %v", err)
}
select {
@ -812,12 +653,12 @@ func (w *WalManager) checkpointLoop(ctx context.Context) {
default:
}
time.Sleep(2 * time.Second)
time.Sleep(d.checkpointInterval)
}
}
func (w *WalManager) checkpoint(ctx context.Context) error {
session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
func (d *DataManager) checkpoint(ctx context.Context) error {
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
@ -830,79 +671,44 @@ func (w *WalManager) checkpoint(ctx context.Context) error {
}
defer m.Unlock(ctx)
resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0)
resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0)
if err != nil {
return err
}
walsData := []*WalData{}
for _, kv := range resp.Kvs {
var walData WalData
var walData *WalData
if err := json.Unmarshal(kv.Value, &walData); err != nil {
return err
}
walData.Revision = kv.ModRevision
if walData.WalStatus == WalStatusCommitted {
w.log.Warnf("wal %s not yet committed storage", walData.WalSequence)
d.log.Warnf("wal %s not yet committed storage", walData.WalSequence)
break
}
if walData.WalStatus == WalStatusCheckpointed {
continue
}
walFilePath := w.storageWalDataFile(walData.WalDataFileID)
w.log.Debugf("checkpointing wal: %q", walData.WalSequence)
walsData = append(walsData, walData)
}
if len(walsData) < d.minCheckpointWalsNum {
return nil
}
walFile, err := w.ost.ReadObject(walFilePath)
if err != nil {
return err
}
dec := json.NewDecoder(walFile)
for {
var action *Action
if err := d.writeData(ctx, walsData); err != nil {
return errors.Wrapf(err, "checkpoint function error")
}
err := dec.Decode(&action)
if err == io.EOF {
// all done
break
}
if err != nil {
walFile.Close()
return err
}
if err := w.checkpointAction(ctx, action); err != nil {
walFile.Close()
return err
}
}
walFile.Close()
w.log.Debugf("updating wal to state %q", WalStatusCheckpointed)
for _, walData := range walsData {
d.log.Debugf("updating wal to state %q", WalStatusCheckpointed)
walData.WalStatus = WalStatusCheckpointed
walDataj, err := json.Marshal(walData)
if err != nil {
return err
}
if _, err := w.e.AtomicPut(ctx, string(kv.Key), walDataj, kv.ModRevision, nil); err != nil {
return err
}
}
return nil
}
func (w *WalManager) checkpointAction(ctx context.Context, action *Action) error {
dataPath := w.dataToPathFunc(action.DataType, action.ID)
if dataPath == "" {
return nil
}
path := w.toStorageDataPath(dataPath)
switch action.ActionType {
case ActionTypePut:
w.log.Debugf("writing file: %q", path)
if err := w.ost.WriteObject(path, bytes.NewReader(action.Data)); err != nil {
return err
}
case ActionTypeDelete:
w.log.Debugf("deleting file: %q", path)
if err := w.ost.DeleteObject(path); err != nil && err != objectstorage.ErrNotExist {
walKey := etcdWalKey(walData.WalSequence)
if _, err := d.e.AtomicPut(ctx, walKey, walDataj, walData.Revision, nil); err != nil {
return err
}
}
@ -910,11 +716,11 @@ func (w *WalManager) checkpointAction(ctx context.Context, action *Action) error
return nil
}
func (w *WalManager) walCleanerLoop(ctx context.Context) {
func (d *DataManager) walCleanerLoop(ctx context.Context) {
for {
w.log.Debugf("walcleaner")
if err := w.walCleaner(ctx); err != nil {
w.log.Errorf("walcleaner error: %v", err)
d.log.Debugf("walcleaner")
if err := d.walCleaner(ctx); err != nil {
d.log.Errorf("walcleaner error: %v", err)
}
select {
@ -930,8 +736,8 @@ func (w *WalManager) walCleanerLoop(ctx context.Context) {
// walCleaner will clean already checkpointed wals from etcd
// it must always keep at least one wal that is needed for resync operations
// from clients
func (w *WalManager) walCleaner(ctx context.Context) error {
session, err := concurrency.NewSession(w.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
func (d *DataManager) walCleaner(ctx context.Context) error {
session, err := concurrency.NewSession(d.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
if err != nil {
return err
}
@ -944,14 +750,14 @@ func (w *WalManager) walCleaner(ctx context.Context) error {
}
defer m.Unlock(ctx)
resp, err := w.e.List(ctx, etcdWalsDir+"/", "", 0)
resp, err := d.e.List(ctx, etcdWalsDir+"/", "", 0)
if err != nil {
return err
}
if len(resp.Kvs) <= w.etcdWalsKeepNum {
if len(resp.Kvs) <= d.etcdWalsKeepNum {
return nil
}
removeCount := len(resp.Kvs) - w.etcdWalsKeepNum
removeCount := len(resp.Kvs) - d.etcdWalsKeepNum
for _, kv := range resp.Kvs {
var walData WalData
@ -969,8 +775,8 @@ func (w *WalManager) walCleaner(ctx context.Context) error {
// sure that no objects with old data will be returned? Is it enough to read
// it back or the result could just be luckily correct but another client may
// arrive at a different S3 server that is not yet in sync?
w.log.Infof("removing wal %q from etcd", walData.WalSequence)
if _, err := w.e.AtomicDelete(ctx, string(kv.Key), kv.ModRevision); err != nil {
d.log.Infof("removing wal %q from etcd", walData.WalSequence)
if _, err := d.e.AtomicDelete(ctx, string(kv.Key), kv.ModRevision); err != nil {
return err
}
@ -983,10 +789,10 @@ func (w *WalManager) walCleaner(ctx context.Context) error {
return nil
}
func (w *WalManager) compactChangeGroupsLoop(ctx context.Context) {
func (d *DataManager) compactChangeGroupsLoop(ctx context.Context) {
for {
if err := w.compactChangeGroups(ctx); err != nil {
w.log.Errorf("err: %+v", err)
if err := d.compactChangeGroups(ctx); err != nil {
d.log.Errorf("err: %+v", err)
}
select {
@ -999,8 +805,8 @@ func (w *WalManager) compactChangeGroupsLoop(ctx context.Context) {
}
}
func (w *WalManager) compactChangeGroups(ctx context.Context) error {
resp, err := w.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey)
func (d *DataManager) compactChangeGroups(ctx context.Context) error {
resp, err := d.e.Client().Get(ctx, etcdChangeGroupMinRevisionKey)
if err != nil {
return err
}
@ -1010,7 +816,7 @@ func (w *WalManager) compactChangeGroups(ctx context.Context) error {
// first update minrevision
cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(etcdChangeGroupMinRevisionKey), "=", revision)
then := etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, "")
txn := w.e.Client().Txn(ctx).If(cmp).Then(then)
txn := d.e.Client().Txn(ctx).If(cmp).Then(then)
tresp, err := txn.Commit()
if err != nil {
return etcd.FromEtcdError(err)
@ -1022,7 +828,7 @@ func (w *WalManager) compactChangeGroups(ctx context.Context) error {
revision = tresp.Header.Revision
// then remove all the groups keys with modrevision < minrevision
resp, err = w.e.List(ctx, etcdChangeGroupsDir, "", 0)
resp, err = d.e.List(ctx, etcdChangeGroupsDir, "", 0)
if err != nil {
return err
}
@ -1030,13 +836,13 @@ func (w *WalManager) compactChangeGroups(ctx context.Context) error {
if kv.ModRevision < revision-etcdChangeGroupMinRevisionRange {
cmp := etcdclientv3.Compare(etcdclientv3.ModRevision(string(kv.Key)), "=", kv.ModRevision)
then := etcdclientv3.OpDelete(string(kv.Key))
txn := w.e.Client().Txn(ctx).If(cmp).Then(then)
txn := d.e.Client().Txn(ctx).If(cmp).Then(then)
tresp, err := txn.Commit()
if err != nil {
return etcd.FromEtcdError(err)
}
if !tresp.Succeeded {
w.log.Errorf("failed to update change group min revision key due to concurrent update")
d.log.Errorf("failed to update change group min revision key due to concurrent update")
}
}
}
@ -1050,10 +856,10 @@ func (w *WalManager) compactChangeGroups(ctx context.Context) error {
// walWrites will fails since the provided changegrouptoken will have an old
// revision
// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress???
func (w *WalManager) etcdPingerLoop(ctx context.Context) {
func (d *DataManager) etcdPingerLoop(ctx context.Context) {
for {
if err := w.etcdPinger(ctx); err != nil {
w.log.Errorf("err: %+v", err)
if err := d.etcdPinger(ctx); err != nil {
d.log.Errorf("err: %+v", err)
}
select {
@ -1066,17 +872,16 @@ func (w *WalManager) etcdPingerLoop(ctx context.Context) {
}
}
func (w *WalManager) etcdPinger(ctx context.Context) error {
if _, err := w.e.Put(ctx, etcdPingKey, []byte{}, nil); err != nil {
func (d *DataManager) etcdPinger(ctx context.Context) error {
if _, err := d.e.Put(ctx, etcdPingKey, []byte{}, nil); err != nil {
return err
}
return nil
}
func (w *WalManager) InitEtcd(ctx context.Context) error {
func (d *DataManager) InitEtcd(ctx context.Context) error {
writeWal := func(wal *WalFile) error {
w.log.Infof("wal seq: %s", wal.WalSequence)
walFile, err := w.ost.ReadObject(w.storageWalStatusFile(wal.WalSequence) + ".committed")
walFile, err := d.ost.ReadObject(d.storageWalStatusFile(wal.WalSequence) + ".committed")
if err != nil {
return err
}
@ -1092,7 +897,6 @@ func (w *WalManager) InitEtcd(ctx context.Context) error {
WalSequence: wal.WalSequence,
WalDataFileID: header.WalDataFileID,
WalStatus: WalStatusCommitted,
ChangeGroups: header.ChangeGroups,
}
if wal.Checkpointed {
walData.WalStatus = WalStatusCheckpointed
@ -1107,7 +911,7 @@ func (w *WalManager) InitEtcd(ctx context.Context) error {
// only add if it doesn't exist
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalKey(wal.WalSequence)), "=", 0))
then = append(then, etcdclientv3.OpPut(etcdWalKey(wal.WalSequence), string(walDataj)))
txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...)
txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...)
tresp, err := txn.Commit()
if err != nil {
return etcd.FromEtcdError(err)
@ -1124,12 +928,12 @@ func (w *WalManager) InitEtcd(ctx context.Context) error {
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdChangeGroupMinRevisionKey), "=", 0))
then = append(then, etcdclientv3.OpPut(etcdChangeGroupMinRevisionKey, ""))
txn := w.e.Client().Txn(ctx).If(cmp...).Then(then...)
txn := d.e.Client().Txn(ctx).If(cmp...).Then(then...)
if _, err := txn.Commit(); err != nil {
return etcd.FromEtcdError(err)
}
_, err := w.e.Get(ctx, etcdWalsDataKey, 0)
_, err := d.e.Get(ctx, etcdWalsDataKey, 0)
if err != nil && err != etcd.ErrKeyNotFound {
return err
}
@ -1137,7 +941,7 @@ func (w *WalManager) InitEtcd(ctx context.Context) error {
return nil
}
w.log.Infof("no data found in etcd, initializing")
d.log.Infof("no data found in etcd, initializing")
// walsdata not found in etcd
@ -1148,8 +952,8 @@ func (w *WalManager) InitEtcd(ctx context.Context) error {
lastCommittedStorageWalElem := lastCommittedStorageWalsRing
lastCommittedStorageWalSequence := ""
wroteWals := 0
for wal := range w.ListOSTWals("") {
w.log.Infof("wal: %s", wal)
for wal := range d.ListOSTWals("") {
d.log.Debugf("wal: %s", wal)
if wal.Err != nil {
return wal.Err
}
@ -1205,7 +1009,7 @@ func (w *WalManager) InitEtcd(ctx context.Context) error {
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0))
then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj)))
then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence))
txn = w.e.Client().Txn(ctx).If(cmp...).Then(then...)
txn = d.e.Client().Txn(ctx).If(cmp...).Then(then...)
tresp, err := txn.Commit()
if err != nil {
return etcd.FromEtcdError(err)
@ -1216,89 +1020,3 @@ func (w *WalManager) InitEtcd(ctx context.Context) error {
return nil
}
// CheckpointFunc is the signature of a callback that receives a wal Action and
// reports failure through the returned error.
type CheckpointFunc func(action *Action) error
// DataToPathFunc maps a data type and an id to a data path. checkpointAction
// skips actions for which it returns an empty string.
type DataToPathFunc func(dataType string, id string) string
// NoOpDataToPath is a DataToPathFunc that maps every data type and id to the
// empty path. NewWalManager falls back to it when no DataToPathFunc is
// configured.
func NoOpDataToPath(dataType, id string) string {
	const noPath = ""
	return noPath
}
// WalManagerConfig holds the settings consumed by NewWalManager.
type WalManagerConfig struct {
// BasePath is the object storage base path; NewWalManager appends a trailing
// "/" when it is non-empty and lacks one.
BasePath string
// E is the etcd store used by the manager.
E *etcd.Store
// OST is the object storage used by the manager.
OST *objectstorage.ObjStorage
// EtcdWalsKeepNum is the number of wals walCleaner keeps in etcd. 0 selects
// DefaultEtcdWalsKeepNum; negative values are rejected by NewWalManager.
EtcdWalsKeepNum int
// CheckpointFunc is stored in the manager as is (may be nil).
CheckpointFunc CheckpointFunc
// DataToPathFunc maps an action's data type and id to a data path;
// NoOpDataToPath is used when nil.
DataToPathFunc DataToPathFunc
}
// WalManager writes wals to etcd and object storage and keeps an in-memory
// view of the wal changes. Create it with NewWalManager and start its
// background loops with Run.
type WalManager struct {
// basePath is the object storage base path, normalized by NewWalManager to
// end with "/" when non-empty.
basePath string
log *zap.SugaredLogger
e *etcd.Store
ost *objectstorage.ObjStorage
// changes is the in-memory view of the wal changes; its methods are called
// with its own lock held.
changes *WalChanges
// etcdWalsKeepNum is the number of wals walCleaner leaves in etcd.
etcdWalsKeepNum int
// checkpointFunc is copied from the config; no call to it is visible in
// this part of the file — NOTE(review): confirm it is still used.
checkpointFunc CheckpointFunc
// dataToPathFunc maps an action's data type and id to a data path.
dataToPathFunc DataToPathFunc
}
// NewWalManager builds a WalManager from conf, logging through logger.
//
// A zero conf.EtcdWalsKeepNum is replaced (in conf itself) by
// DefaultEtcdWalsKeepNum; a value lower than 1 is rejected with an error. A
// nil conf.DataToPathFunc is replaced by NoOpDataToPath. A non-empty base
// path is normalized to end with "/". ctx is currently unused.
func NewWalManager(ctx context.Context, logger *zap.Logger, conf *WalManagerConfig) (*WalManager, error) {
	if conf.EtcdWalsKeepNum == 0 {
		conf.EtcdWalsKeepNum = DefaultEtcdWalsKeepNum
	}
	if conf.EtcdWalsKeepNum < 1 {
		return nil, errors.New("etcdWalsKeepNum must be greater than 0")
	}

	toPath := conf.DataToPathFunc
	if toPath == nil {
		toPath = NoOpDataToPath
	}

	// add a trailing slash to the base path
	base := conf.BasePath
	if base != "" && !strings.HasSuffix(base, "/") {
		base += "/"
	}

	return &WalManager{
		basePath:        base,
		log:             logger.Sugar(),
		e:               conf.E,
		ost:             conf.OST,
		etcdWalsKeepNum: conf.EtcdWalsKeepNum,
		changes:         NewWalChanges(),
		checkpointFunc:  conf.CheckpointFunc,
		dataToPathFunc:  toPath,
	}, nil
}
// Run initializes etcd (retrying every second until it succeeds), signals
// readiness by sending one value on readyCh, starts the background loops and
// then blocks until ctx is done.
//
// It returns nil when ctx is cancelled, including when the cancellation
// happens while etcd initialization is still being retried or while waiting
// for a receiver on readyCh; in those cases the background loops are not
// started.
func (w *WalManager) Run(ctx context.Context, readyCh chan struct{}) error {
	for {
		err := w.InitEtcd(ctx)
		if err == nil {
			break
		}
		w.log.Errorf("failed to initialize etcd: %+v", err)

		// Wait before retrying, but stop as soon as ctx is cancelled:
		// retrying forever with a dead context would never let Run return.
		select {
		case <-ctx.Done():
			w.log.Infof("walmanager exiting")
			return nil
		case <-time.After(1 * time.Second):
		}
	}

	// Don't block forever on the readiness signal if nobody is receiving
	// anymore.
	select {
	case readyCh <- struct{}{}:
	case <-ctx.Done():
		w.log.Infof("walmanager exiting")
		return nil
	}

	go w.watcherLoop(ctx)
	go w.syncLoop(ctx)
	go w.checkpointLoop(ctx)
	go w.walCleanerLoop(ctx)
	go w.compactChangeGroupsLoop(ctx)
	go w.etcdPingerLoop(ctx)

	<-ctx.Done()
	w.log.Infof("walmanager exiting")
	return nil
}

View File

@ -26,9 +26,15 @@ import (
var ErrNotExist = errors.New("does not exist")
// ReadSeekCloser groups the basic Read, Seek and Close methods. It is the
// type returned by Storage.ReadObject.
type ReadSeekCloser interface {
	io.ReadSeeker
	io.Closer
}
type Storage interface {
Stat(filepath string) (*ObjectInfo, error)
ReadObject(filepath string) (io.ReadCloser, error)
ReadObject(filepath string) (ReadSeekCloser, error)
WriteObject(filepath string, data io.Reader) error
DeleteObject(filepath string) error
List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo

View File

@ -251,7 +251,7 @@ func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) {
return &ObjectInfo{Path: p, LastModified: fi.ModTime()}, nil
}
func (s *PosixStorage) ReadObject(p string) (io.ReadCloser, error) {
func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) {
fspath, err := s.fsPath(p)
if err != nil {
return nil, err

View File

@ -73,7 +73,7 @@ func (s *S3Storage) Stat(p string) (*ObjectInfo, error) {
return &ObjectInfo{Path: p, LastModified: oi.LastModified}, nil
}
func (s *S3Storage) ReadObject(filepath string) (io.ReadCloser, error) {
func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) {
if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil {
merr := minio.ToErrorResponse(err)
if merr.StatusCode == http.StatusNotFound {

View File

@ -19,11 +19,11 @@ import (
"encoding/json"
"path"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/services/configstore/readdb"
"github.com/sorintlab/agola/internal/services/types"
"github.com/sorintlab/agola/internal/util"
"github.com/sorintlab/agola/internal/wal"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
@ -33,14 +33,14 @@ import (
type CommandHandler struct {
log *zap.SugaredLogger
readDB *readdb.ReadDB
wal *wal.WalManager
dm *datamanager.DataManager
}
func NewCommandHandler(logger *zap.Logger, readDB *readdb.ReadDB, wal *wal.WalManager) *CommandHandler {
func NewCommandHandler(logger *zap.Logger, readDB *readdb.ReadDB, dm *datamanager.DataManager) *CommandHandler {
return &CommandHandler{
log: logger.Sugar(),
readDB: readDB,
wal: wal,
dm: dm,
}
}
@ -52,7 +52,7 @@ func (s *CommandHandler) CreateProjectGroup(ctx context.Context, projectGroup *t
return nil, util.NewErrBadRequest(errors.Errorf("project group parent id required"))
}
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -106,16 +106,16 @@ func (s *CommandHandler) CreateProjectGroup(ctx context.Context, projectGroup *t
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal projectGroup")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeProjectGroup),
ID: projectGroup.ID,
Data: pcj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return projectGroup, err
}
@ -127,7 +127,7 @@ func (s *CommandHandler) CreateProject(ctx context.Context, project *types.Proje
return nil, util.NewErrBadRequest(errors.Errorf("project parent id required"))
}
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -182,23 +182,23 @@ func (s *CommandHandler) CreateProject(ctx context.Context, project *types.Proje
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal project")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeProject),
ID: project.ID,
Data: pcj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return project, err
}
func (s *CommandHandler) DeleteProject(ctx context.Context, projectRef string) error {
var project *types.Project
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -230,15 +230,15 @@ func (s *CommandHandler) DeleteProject(ctx context.Context, projectRef string) e
}
// TODO(sgotti) delete project secrets/variables
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypeDelete,
ActionType: datamanager.ActionTypeDelete,
DataType: string(types.ConfigTypeProject),
ID: project.ID,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return err
}
@ -253,7 +253,7 @@ func (s *CommandHandler) CreateUser(ctx context.Context, req *CreateUserRequest)
return nil, util.NewErrBadRequest(errors.Errorf("user name required"))
}
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
cgNames := []string{req.UserName}
var rs *types.RemoteSource
@ -335,29 +335,29 @@ func (s *CommandHandler) CreateUser(ctx context.Context, req *CreateUserRequest)
return nil, errors.Wrapf(err, "failed to marshal project group")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeProjectGroup),
ID: pg.ID,
Data: pgj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return user, err
}
func (s *CommandHandler) DeleteUser(ctx context.Context, userName string) error {
var user *types.User
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
cgNames := []string{user.UserName}
// must do all the check in a single transaction to avoid concurrent changes
@ -382,9 +382,9 @@ func (s *CommandHandler) DeleteUser(ctx context.Context, userName string) error
return err
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypeDelete,
ActionType: datamanager.ActionTypeDelete,
DataType: string(types.ConfigTypeUser),
ID: user.ID,
},
@ -392,7 +392,7 @@ func (s *CommandHandler) DeleteUser(ctx context.Context, userName string) error
// changegroup is the username (and in future the email) to ensure no
// concurrent user creation/modification using the same name
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return err
}
@ -417,7 +417,7 @@ func (s *CommandHandler) CreateUserLA(ctx context.Context, req *CreateUserLARequ
var user *types.User
var rs *types.RemoteSource
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -477,16 +477,16 @@ func (s *CommandHandler) CreateUserLA(ctx context.Context, req *CreateUserLARequ
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal user")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return la, err
}
@ -500,7 +500,7 @@ func (s *CommandHandler) DeleteUserLA(ctx context.Context, userName, laID string
var user *types.User
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -536,16 +536,16 @@ func (s *CommandHandler) DeleteUserLA(ctx context.Context, userName, laID string
if err != nil {
return errors.Wrapf(err, "failed to marshal user")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return err
}
@ -567,7 +567,7 @@ func (s *CommandHandler) UpdateUserLA(ctx context.Context, req *UpdateUserLARequ
var user *types.User
var rs *types.RemoteSource
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -616,16 +616,16 @@ func (s *CommandHandler) UpdateUserLA(ctx context.Context, req *UpdateUserLARequ
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal user")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return la, err
}
@ -636,7 +636,7 @@ func (s *CommandHandler) CreateUserToken(ctx context.Context, userName, tokenNam
var user *types.User
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -677,9 +677,9 @@ func (s *CommandHandler) CreateUserToken(ctx context.Context, userName, tokenNam
if err != nil {
return "", errors.Wrapf(err, "failed to marshal user")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
@ -687,7 +687,7 @@ func (s *CommandHandler) CreateUserToken(ctx context.Context, userName, tokenNam
}
// changegroup is the userid
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return token, err
}
@ -701,7 +701,7 @@ func (s *CommandHandler) DeleteUserToken(ctx context.Context, userName, tokenNam
var user *types.User
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -737,16 +737,16 @@ func (s *CommandHandler) DeleteUserToken(ctx context.Context, userName, tokenNam
if err != nil {
return errors.Wrapf(err, "failed to marshal user")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeUser),
ID: user.ID,
Data: userj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return err
}
@ -755,7 +755,7 @@ func (s *CommandHandler) CreateRemoteSource(ctx context.Context, remoteSource *t
return nil, util.NewErrBadRequest(errors.Errorf("remotesource name required"))
}
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
cgNames := []string{remoteSource.Name}
// must do all the check in a single transaction to avoid concurrent changes
@ -786,23 +786,23 @@ func (s *CommandHandler) CreateRemoteSource(ctx context.Context, remoteSource *t
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal remotesource")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeRemoteSource),
ID: remoteSource.ID,
Data: rsj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return remoteSource, err
}
func (s *CommandHandler) DeleteRemoteSource(ctx context.Context, remoteSourceName string) error {
var remoteSource *types.RemoteSource
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
cgNames := []string{remoteSource.ID}
// must do all the check in a single transaction to avoid concurrent changes
@ -827,16 +827,16 @@ func (s *CommandHandler) DeleteRemoteSource(ctx context.Context, remoteSourceNam
return err
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypeDelete,
ActionType: datamanager.ActionTypeDelete,
DataType: string(types.ConfigTypeRemoteSource),
ID: remoteSource.ID,
},
}
// changegroup is all the remote sources
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return err
}
@ -845,7 +845,7 @@ func (s *CommandHandler) CreateOrg(ctx context.Context, org *types.Organization)
return nil, util.NewErrBadRequest(errors.Errorf("org name required"))
}
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
cgNames := []string{org.Name}
// must do all the check in a single transaction to avoid concurrent changes
@ -887,22 +887,22 @@ func (s *CommandHandler) CreateOrg(ctx context.Context, org *types.Organization)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal project group")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeOrg),
ID: org.ID,
Data: orgj,
},
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeProjectGroup),
ID: pg.ID,
Data: pgj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return org, err
}
@ -910,7 +910,7 @@ func (s *CommandHandler) DeleteOrg(ctx context.Context, orgName string) error {
var org *types.Organization
var projects []*types.Project
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
cgNames := []string{orgName}
// must do all the check in a single transaction to avoid concurrent changes
@ -936,23 +936,23 @@ func (s *CommandHandler) DeleteOrg(ctx context.Context, orgName string) error {
return err
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypeDelete,
ActionType: datamanager.ActionTypeDelete,
DataType: string(types.ConfigTypeOrg),
ID: org.ID,
},
}
// delete all org projects
for _, project := range projects {
actions = append(actions, &wal.Action{
ActionType: wal.ActionTypeDelete,
actions = append(actions, &datamanager.Action{
ActionType: datamanager.ActionTypeDelete,
DataType: string(types.ConfigTypeProject),
ID: project.ID,
})
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return err
}
@ -979,7 +979,7 @@ func (s *CommandHandler) CreateSecret(ctx context.Context, secret *types.Secret)
return nil, util.NewErrBadRequest(errors.Errorf("invalid secret parent type %q", secret.Parent.Type))
}
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
cgNames := []string{secret.Name}
// must do all the check in a single transaction to avoid concurrent changes
@ -1017,23 +1017,23 @@ func (s *CommandHandler) CreateSecret(ctx context.Context, secret *types.Secret)
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal secret")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeSecret),
ID: secret.ID,
Data: secretj,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return secret, err
}
func (s *CommandHandler) DeleteSecret(ctx context.Context, parentType types.ConfigType, parentRef, secretName string) error {
var secret *types.Secret
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -1064,15 +1064,15 @@ func (s *CommandHandler) DeleteSecret(ctx context.Context, parentType types.Conf
return err
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypeDelete,
ActionType: datamanager.ActionTypeDelete,
DataType: string(types.ConfigTypeSecret),
ID: secret.ID,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return err
}
@ -1093,7 +1093,7 @@ func (s *CommandHandler) CreateVariable(ctx context.Context, variable *types.Var
return nil, util.NewErrBadRequest(errors.Errorf("invalid variable parent type %q", variable.Parent.Type))
}
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
cgNames := []string{variable.Name}
// must do all the check in a single transaction to avoid concurrent changes
@ -1131,23 +1131,23 @@ func (s *CommandHandler) CreateVariable(ctx context.Context, variable *types.Var
if err != nil {
return nil, errors.Wrapf(err, "failed to marshal variable")
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypePut,
ActionType: datamanager.ActionTypePut,
DataType: string(types.ConfigTypeVariable),
ID: variable.ID,
Data: variablej,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return variable, err
}
func (s *CommandHandler) DeleteVariable(ctx context.Context, parentType types.ConfigType, parentRef, variableName string) error {
var variable *types.Variable
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
// must do all the check in a single transaction to avoid concurrent changes
err := s.readDB.Do(func(tx *db.Tx) error {
@ -1177,14 +1177,14 @@ func (s *CommandHandler) DeleteVariable(ctx context.Context, parentType types.Co
return err
}
actions := []*wal.Action{
actions := []*datamanager.Action{
{
ActionType: wal.ActionTypeDelete,
ActionType: datamanager.ActionTypeDelete,
DataType: string(types.ConfigTypeVariable),
ID: variable.ID,
},
}
_, err = s.wal.WriteWal(ctx, actions, cgt)
_, err = s.dm.WriteWal(ctx, actions, cgt)
return err
}

View File

@ -15,103 +15,14 @@
package common
import (
"fmt"
"net/url"
"path"
"strings"
"github.com/sorintlab/agola/internal/services/types"
)
var (
	// Storage paths. Always use path (not filepath) to use the "/" separator
	StorageDataDir          = "data"
	StorageUsersDir         = path.Join(StorageDataDir, "users")
	StorageOrgsDir          = path.Join(StorageDataDir, "orgs")
	StorageProjectsDir      = path.Join(StorageDataDir, "projects")
	StorageProjectGroupsDir = path.Join(StorageDataDir, "projectgroups")
	StorageRemoteSourcesDir = path.Join(StorageDataDir, "remotesources")
	StorageSecretsDir       = path.Join(StorageDataDir, "secrets")
	StorageVariablesDir     = path.Join(StorageDataDir, "variables")
)

const (
	// NOTE(review): not referenced by any of the code visible here; confirm
	// it is still needed.
	etcdWalsMinRevisionRange = 100
)

// StorageUserFile returns the storage path of the user with the given id.
func StorageUserFile(userID string) string {
	return path.Join(StorageUsersDir, userID)
}

// StorageOrgFile returns the storage path of the organization with the given id.
func StorageOrgFile(orgID string) string {
	return path.Join(StorageOrgsDir, orgID)
}

// StorageProjectGroupFile returns the storage path of the project group with
// the given id.
func StorageProjectGroupFile(projectGroupID string) string {
	return path.Join(StorageProjectGroupsDir, projectGroupID)
}

// StorageProjectFile returns the storage path of the project with the given id.
func StorageProjectFile(projectID string) string {
	return path.Join(StorageProjectsDir, projectID)
}

// StorageRemoteSourceFile returns the storage path of the remote source with
// the given id.
func StorageRemoteSourceFile(remoteSourceID string) string {
	return path.Join(StorageRemoteSourcesDir, remoteSourceID)
}

// StorageSecretFile returns the storage path of the secret with the given id.
func StorageSecretFile(secretID string) string {
	return path.Join(StorageSecretsDir, secretID)
}

// StorageVariableFile returns the storage path of the variable with the given id.
func StorageVariableFile(variableID string) string {
	return path.Join(StorageVariablesDir, variableID)
}
// PathToTypeID splits a storage path into the config type implied by its
// directory and the id given by its last element. It panics when the
// directory is not one of the known storage directories.
func PathToTypeID(p string) (types.ConfigType, string) {
	dir, id := path.Dir(p), path.Base(p)

	switch dir {
	case StorageUsersDir:
		return types.ConfigTypeUser, id
	case StorageOrgsDir:
		return types.ConfigTypeOrg, id
	case StorageProjectGroupsDir:
		return types.ConfigTypeProjectGroup, id
	case StorageProjectsDir:
		return types.ConfigTypeProject, id
	case StorageRemoteSourcesDir:
		return types.ConfigTypeRemoteSource, id
	case StorageSecretsDir:
		return types.ConfigTypeSecret, id
	case StorageVariablesDir:
		return types.ConfigTypeVariable, id
	}

	panic(fmt.Errorf("cannot determine configtype for path: %q", p))
}
// DataToPathFunc returns the storage path for the entry identified by
// dataType and id, delegating to the Storage*File helper that matches the
// config type. It panics when dataType is not a known config type.
func DataToPathFunc(dataType string, id string) string {
	var toPath func(string) string

	switch types.ConfigType(dataType) {
	case types.ConfigTypeUser:
		toPath = StorageUserFile
	case types.ConfigTypeOrg:
		toPath = StorageOrgFile
	case types.ConfigTypeProjectGroup:
		toPath = StorageProjectGroupFile
	case types.ConfigTypeProject:
		toPath = StorageProjectFile
	case types.ConfigTypeRemoteSource:
		toPath = StorageRemoteSourceFile
	case types.ConfigTypeSecret:
		toPath = StorageSecretFile
	case types.ConfigTypeVariable:
		toPath = StorageVariableFile
	default:
		panic(fmt.Errorf("unknown data type %q", dataType))
	}

	return toPath(id)
}
type RefType int
const (

View File

@ -21,16 +21,16 @@ import (
"path/filepath"
scommon "github.com/sorintlab/agola/internal/common"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/etcd"
slog "github.com/sorintlab/agola/internal/log"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/config"
"github.com/sorintlab/agola/internal/services/configstore/api"
"github.com/sorintlab/agola/internal/services/configstore/command"
"github.com/sorintlab/agola/internal/services/configstore/common"
"github.com/sorintlab/agola/internal/services/configstore/readdb"
"github.com/sorintlab/agola/internal/services/types"
"github.com/sorintlab/agola/internal/util"
"github.com/sorintlab/agola/internal/wal"
ghandlers "github.com/gorilla/handlers"
"github.com/gorilla/mux"
@ -45,7 +45,7 @@ var log = logger.Sugar()
type ConfigStore struct {
c *config.ConfigStore
e *etcd.Store
wal *wal.WalManager
dm *datamanager.DataManager
readDB *readdb.ReadDB
ost *objectstorage.ObjStorage
ch *command.CommandHandler
@ -72,24 +72,32 @@ func NewConfigStore(ctx context.Context, c *config.ConfigStore) (*ConfigStore, e
ost: ost,
}
walConf := &wal.WalManagerConfig{
E: e,
OST: ost,
DataToPathFunc: common.DataToPathFunc,
dmConf := &datamanager.DataManagerConfig{
E: e,
OST: ost,
DataTypes: []string{
string(types.ConfigTypeUser),
string(types.ConfigTypeOrg),
string(types.ConfigTypeProjectGroup),
string(types.ConfigTypeProject),
string(types.ConfigTypeRemoteSource),
string(types.ConfigTypeSecret),
string(types.ConfigTypeVariable),
},
}
wal, err := wal.NewWalManager(ctx, logger, walConf)
dm, err := datamanager.NewDataManager(ctx, logger, dmConf)
if err != nil {
return nil, err
}
readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, wal)
readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm)
if err != nil {
return nil, err
}
cs.wal = wal
cs.dm = dm
cs.readDB = readDB
ch := command.NewCommandHandler(logger, readDB, wal)
ch := command.NewCommandHandler(logger, readDB, dm)
cs.ch = ch
return cs, nil
@ -97,12 +105,12 @@ func NewConfigStore(ctx context.Context, c *config.ConfigStore) (*ConfigStore, e
func (s *ConfigStore) Run(ctx context.Context) error {
errCh := make(chan error)
walReadyCh := make(chan struct{})
dmReadyCh := make(chan struct{})
go func() { errCh <- s.wal.Run(ctx, walReadyCh) }()
go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }()
// wait for wal to be ready
<-walReadyCh
// wait for dm to be ready
<-dmReadyCh
go func() { errCh <- s.readDB.Run(ctx) }()

View File

@ -19,20 +19,18 @@ import (
"database/sql"
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
"time"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/sequence"
"github.com/sorintlab/agola/internal/services/configstore/common"
"github.com/sorintlab/agola/internal/services/types"
"github.com/sorintlab/agola/internal/util"
"github.com/sorintlab/agola/internal/wal"
sq "github.com/Masterminds/squirrel"
"github.com/pkg/errors"
@ -59,13 +57,13 @@ type ReadDB struct {
e *etcd.Store
rdb *db.DB
ost *objectstorage.ObjStorage
wal *wal.WalManager
dm *datamanager.DataManager
Initialized bool
initMutex sync.Mutex
}
func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, wal *wal.WalManager) (*ReadDB, error) {
func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) (*ReadDB, error) {
if err := os.MkdirAll(dataDir, 0770); err != nil {
return nil, err
}
@ -85,7 +83,7 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.
rdb: rdb,
e: e,
ost: ost,
wal: wal,
dm: dm,
}
return readDB, nil
@ -125,65 +123,44 @@ func (r *ReadDB) ResetDB() error {
return nil
}
func (r *ReadDB) SyncFromFiles() (string, error) {
doneCh := make(chan struct{})
defer close(doneCh)
var lastCheckpointedWal string
// Get last checkpointed wal from lts
for wal := range r.wal.ListOSTWals("") {
if wal.Err != nil {
return "", wal.Err
func (r *ReadDB) SyncFromDump() (string, error) {
dumpIndex, err := r.dm.GetLastDataStatus()
if err != nil && err != objectstorage.ErrNotExist {
return "", errors.WithStack(err)
}
if err == objectstorage.ErrNotExist {
return "", nil
}
for dataType, files := range dumpIndex.Files {
dumpf, err := r.ost.ReadObject(files[0])
if err != nil {
return "", errors.WithStack(err)
}
if wal.Checkpointed {
lastCheckpointedWal = wal.WalSequence
dumpEntries := []*datamanager.DataEntry{}
dec := json.NewDecoder(dumpf)
for {
var de *datamanager.DataEntry
err := dec.Decode(&de)
if err == io.EOF {
// all done
break
}
if err != nil {
dumpf.Close()
return "", err
}
dumpEntries = append(dumpEntries, de)
}
}
dumpf.Close()
doneCh = make(chan struct{})
haveConfigFiles := false
for object := range r.wal.List(common.StorageDataDir, "", true, doneCh) {
if object.Err != nil {
close(doneCh)
return "", object.Err
}
haveConfigFiles = true
break
}
close(doneCh)
if lastCheckpointedWal == "" && haveConfigFiles {
return "", errors.Errorf("no last checkpointed wal in lts but the storage has config files. This should never happen!")
}
if !haveConfigFiles {
return lastCheckpointedWal, nil
}
insertfunc := func(objs []string) error {
err := r.rdb.Do(func(tx *db.Tx) error {
for _, obj := range objs {
f, _, err := r.wal.ReadObject(obj, nil)
if err != nil {
if err == objectstorage.ErrNotExist {
r.log.Warnf("object %s disappeared, ignoring", obj)
}
return err
}
data, err := ioutil.ReadAll(f)
if err != nil {
f.Close()
return err
}
f.Close()
configType, id := common.PathToTypeID(obj)
action := &wal.Action{
ActionType: wal.ActionTypePut,
DataType: string(configType),
ID: id,
Data: data,
err = r.rdb.Do(func(tx *db.Tx) error {
for _, de := range dumpEntries {
action := &datamanager.Action{
ActionType: datamanager.ActionTypePut,
ID: de.ID,
DataType: dataType,
Data: de.Data,
}
if err := r.applyAction(tx, action); err != nil {
return err
@ -191,59 +168,24 @@ func (r *ReadDB) SyncFromFiles() (string, error) {
}
return nil
})
return err
}
objs := []string{}
count := 0
doneCh = make(chan struct{})
defer close(doneCh)
// file may have changed in the meantime (due to checkpointing) but we don't
// need to have a consistent snapshot since we'll apply all the wals and handle
// them
for object := range r.wal.List(common.StorageDataDir, "", true, doneCh) {
if object.Err != nil {
return "", object.Err
}
objs = append(objs, object.Path)
if count > 100 {
if err := insertfunc(objs); err != nil {
return "", err
}
count = 0
objs = []string{}
} else {
count++
if err != nil {
return "", err
}
}
if err := insertfunc(objs); err != nil {
return "", err
}
// save the wal sequence of the last checkpointed wal before syncing from files
err := r.rdb.Do(func(tx *db.Tx) error {
return r.insertCommittedWalSequence(tx, lastCheckpointedWal)
})
if err != nil {
return "", err
}
return lastCheckpointedWal, nil
return dumpIndex.WalSequence, nil
}
func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) {
insertfunc := func(walFiles []*wal.WalFile) error {
insertfunc := func(walFiles []*datamanager.WalFile) error {
err := r.rdb.Do(func(tx *db.Tx) error {
for _, walFile := range walFiles {
walFilef, err := r.wal.ReadWal(walFile.WalSequence)
walFilef, err := r.dm.ReadWal(walFile.WalSequence)
if err != nil {
return err
}
dec := json.NewDecoder(walFilef)
var header *wal.WalHeader
var header *datamanager.WalHeader
if err = dec.Decode(&header); err != nil && err != io.EOF {
walFilef.Close()
return err
@ -262,13 +204,13 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) {
}
lastWalSeq := startWalSeq
walFiles := []*wal.WalFile{}
walFiles := []*datamanager.WalFile{}
count := 0
doneCh := make(chan struct{})
defer close(doneCh)
for walFile := range r.wal.ListOSTWals(startWalSeq) {
for walFile := range r.dm.ListOSTWals(startWalSeq) {
if walFile.Err != nil {
return "", walFile.Err
}
@ -281,7 +223,7 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) {
return "", err
}
count = 0
walFiles = []*wal.WalFile{}
walFiles = []*datamanager.WalFile{}
} else {
count++
}
@ -308,7 +250,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
return err
}
lastCommittedStorageWal, _, err := r.wal.LastCommittedStorageWal(ctx)
lastCommittedStorageWal, _, err := r.dm.LastCommittedStorageWal(ctx)
if err != nil {
return err
}
@ -318,7 +260,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
doFullSync = true
r.log.Warn("no startWalSeq in db, doing a full sync")
} else {
ok, err := r.wal.HasOSTWal(curWalSeq)
ok, err := r.dm.HasOSTWal(curWalSeq)
if err != nil {
return err
}
@ -349,15 +291,15 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
}
if doFullSync {
r.log.Infof("doing a full sync from lts files")
r.log.Infof("doing a full sync from dump")
if err := r.ResetDB(); err != nil {
return err
}
var err error
curWalSeq, err = r.SyncFromFiles()
curWalSeq, err = r.SyncFromDump()
if err != nil {
return err
return errors.WithStack(err)
}
}
@ -377,7 +319,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
// from wals on objectstorage is >=
// if not (this happens when syncFromWals takes some time and in the meantime
// many new wals are written, the next sync should be faster and able to continue
firstAvailableWalData, revision, err := r.wal.FirstAvailableWalData(ctx)
firstAvailableWalData, revision, err := r.dm.FirstAvailableWalData(ctx)
if err != nil {
return errors.Wrap(err, "failed to get first available wal data")
}
@ -401,14 +343,14 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
}
// use the same revision as previous operation
for walElement := range r.wal.ListEtcdWals(ctx, revision) {
for walElement := range r.dm.ListEtcdWals(ctx, revision) {
if walElement.Err != nil {
return err
}
if walElement.WalData.WalSequence <= curWalSeq {
continue
}
//if walElement.WalData.WalStatus == wal.WalStatusCommittedStorage {
//if walElement.WalData.WalStatus == datamanager.WalStatusCommittedStorage {
if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil {
return err
@ -416,7 +358,7 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
//}
//// update readdb only when the wal has been committed to objectstorage
//if walElement.WalData.WalStatus != wal.WalStatusCommittedStorage {
//if walElement.WalData.WalStatus != datamanager.WalStatusCommittedStorage {
// return nil
//}
@ -494,12 +436,12 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error {
wctx, cancel := context.WithCancel(ctx)
defer cancel()
r.log.Infof("revision: %d", revision)
wch := r.wal.Watch(wctx, revision+1)
wch := r.dm.Watch(wctx, revision+1)
for we := range wch {
r.log.Debugf("we: %s", util.Dump(we))
if we.Err != nil {
err := we.Err
if err == wal.ErrCompacted {
if err == datamanager.ErrCompacted {
r.log.Warnf("required events already compacted, reinitializing readdb")
r.Initialized = false
return nil
@ -558,7 +500,7 @@ func (r *ReadDB) HandleEvents(ctx context.Context) error {
return nil
}
func (r *ReadDB) handleEvent(tx *db.Tx, we *wal.WatchElement) error {
func (r *ReadDB) handleEvent(tx *db.Tx, we *datamanager.WatchElement) error {
//r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//key := string(ev.Kv.Key)
@ -568,7 +510,7 @@ func (r *ReadDB) handleEvent(tx *db.Tx, we *wal.WatchElement) error {
return nil
}
func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error {
func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error {
// update readdb only when the wal has been committed to objectstorage
//if we.WalData.WalStatus != wal.WalStatusCommittedStorage {
// return nil
@ -594,7 +536,7 @@ func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error {
}
func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error {
walFile, err := r.wal.ReadWalData(walDataFileID)
walFile, err := r.dm.ReadWalData(walDataFileID)
if err != nil {
return errors.Wrapf(err, "cannot read wal data file %q", walDataFileID)
}
@ -602,7 +544,7 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error {
dec := json.NewDecoder(walFile)
for {
var action *wal.Action
var action *datamanager.Action
err := dec.Decode(&action)
if err == io.EOF {
@ -621,9 +563,9 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error {
return nil
}
func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error {
func (r *ReadDB) applyAction(tx *db.Tx, action *datamanager.Action) error {
switch action.ActionType {
case wal.ActionTypePut:
case datamanager.ActionTypePut:
switch types.ConfigType(action.DataType) {
case types.ConfigTypeUser:
if err := r.insertUser(tx, action.Data); err != nil {
@ -655,7 +597,7 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error {
}
}
case wal.ActionTypeDelete:
case datamanager.ActionTypeDelete:
switch types.ConfigType(action.DataType) {
case types.ConfigTypeUser:
r.log.Debugf("deleting user with id: %s", action.ID)
@ -799,7 +741,7 @@ func (r *ReadDB) insertChangeGroupRevision(tx *db.Tx, changegroup string, revisi
return nil
}
func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*wal.ChangeGroupsUpdateToken, error) {
func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*datamanager.ChangeGroupsUpdateToken, error) {
s := changegrouprevisionSelect.Where(sq.Eq{"id": groups})
q, args, err := s.ToSql()
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
@ -823,7 +765,7 @@ func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*wal.C
}
}
return &wal.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil
return &datamanager.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil
}
func fetchChangeGroupsRevision(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) {

View File

@ -32,7 +32,7 @@ import (
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
"github.com/sorintlab/agola/internal/wal"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/gorilla/mux"
"github.com/pkg/errors"
@ -102,15 +102,15 @@ type LogsHandler struct {
log *zap.SugaredLogger
e *etcd.Store
ost *objectstorage.ObjStorage
wal *wal.WalManager
dm *datamanager.DataManager
}
func NewLogsHandler(logger *zap.Logger, e *etcd.Store, ost *objectstorage.ObjStorage, wal *wal.WalManager) *LogsHandler {
func NewLogsHandler(logger *zap.Logger, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *LogsHandler {
return &LogsHandler{
log: logger.Sugar(),
e: e,
ost: ost,
wal: wal,
dm: dm,
}
}
@ -178,7 +178,7 @@ func (h *LogsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, setup bool, step int, w http.ResponseWriter, follow, stream bool) (error, bool) {
r, err := store.GetRunEtcdOrOST(ctx, h.e, h.wal, runID)
r, err := store.GetRunEtcdOrOST(ctx, h.e, h.dm, runID)
if err != nil {
return err, true
}
@ -340,15 +340,15 @@ type RunResponse struct {
type RunHandler struct {
log *zap.SugaredLogger
e *etcd.Store
wal *wal.WalManager
dm *datamanager.DataManager
readDB *readdb.ReadDB
}
func NewRunHandler(logger *zap.Logger, e *etcd.Store, wal *wal.WalManager, readDB *readdb.ReadDB) *RunHandler {
func NewRunHandler(logger *zap.Logger, e *etcd.Store, dm *datamanager.DataManager, readDB *readdb.ReadDB) *RunHandler {
return &RunHandler{
log: logger.Sugar(),
e: e,
wal: wal,
dm: dm,
readDB: readDB,
}
}
@ -364,7 +364,7 @@ func (h *RunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if run == nil {
run, err = store.OSTGetRun(h.wal, runID)
run, err = store.OSTGetRun(h.dm, runID)
if err != nil && err != objectstorage.ErrNotExist {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
@ -375,7 +375,7 @@ func (h *RunHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
rc, err := store.OSTGetRunConfig(h.wal, run.ID)
rc, err := store.OSTGetRunConfig(h.dm, run.ID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return

View File

@ -20,6 +20,7 @@ import (
"path"
"time"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
@ -30,7 +31,6 @@ import (
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
"github.com/sorintlab/agola/internal/wal"
"github.com/pkg/errors"
"go.uber.org/zap"
@ -41,16 +41,16 @@ type CommandHandler struct {
e *etcd.Store
readDB *readdb.ReadDB
ost *objectstorage.ObjStorage
wal *wal.WalManager
dm *datamanager.DataManager
}
func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, wal *wal.WalManager) *CommandHandler {
func NewCommandHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *CommandHandler {
return &CommandHandler{
log: logger.Sugar(),
e: e,
readDB: readDB,
ost: ost,
wal: wal,
dm: dm,
}
}
@ -218,12 +218,12 @@ func (s *CommandHandler) recreateRun(ctx context.Context, req *RunCreateRequest)
// fetch the existing runconfig and run
s.log.Infof("creating run from existing run")
rc, err := store.OSTGetRunConfig(s.wal, req.RunID)
rc, err := store.OSTGetRunConfig(s.dm, req.RunID)
if err != nil {
return nil, util.NewErrBadRequest(errors.Wrapf(err, "runconfig %q doens't exist", req.RunID))
}
run, err := store.GetRunEtcdOrOST(ctx, s.e, s.wal, req.RunID)
run, err := store.GetRunEtcdOrOST(ctx, s.e, s.dm, req.RunID)
if err != nil {
return nil, err
}
@ -379,7 +379,7 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg
run.EnqueueTime = util.TimePtr(time.Now())
actions := []*wal.Action{}
actions := []*datamanager.Action{}
// persist group counter
rca, err := store.OSTUpdateRunCounterAction(ctx, c, run.Group)
@ -395,7 +395,7 @@ func (s *CommandHandler) saveRun(ctx context.Context, rb *types.RunBundle, runcg
}
actions = append(actions, rca)
if _, err = s.wal.WriteWal(ctx, actions, cgt); err != nil {
if _, err = s.dm.WriteWal(ctx, actions, cgt); err != nil {
return err
}
@ -531,7 +531,7 @@ func (s *CommandHandler) DeleteExecutor(ctx context.Context, executorID string)
return nil
}
func (s *CommandHandler) getRunCounter(group string) (uint64, *wal.ChangeGroupsUpdateToken, error) {
func (s *CommandHandler) getRunCounter(group string) (uint64, *datamanager.ChangeGroupsUpdateToken, error) {
// use the first group dir after the root
pl := util.PathList(group)
if len(pl) < 2 {
@ -539,7 +539,7 @@ func (s *CommandHandler) getRunCounter(group string) (uint64, *wal.ChangeGroupsU
}
var c uint64
var cgt *wal.ChangeGroupsUpdateToken
var cgt *datamanager.ChangeGroupsUpdateToken
err := s.readDB.Do(func(tx *db.Tx) error {
var err error
c, err = s.readDB.GetRunCounterOST(tx, pl[1])

View File

@ -15,7 +15,6 @@
package common
import (
"fmt"
"path"
)
@ -75,18 +74,6 @@ const (
etcdWalsMinRevisionRange = 100
)
// StorageRunFile returns the objectstorage path for the run identified by
// runID: runID joined under StorageRunsDir.
func StorageRunFile(runID string) string {
	runPath := path.Join(StorageRunsDir, runID)
	return runPath
}
// StorageRunConfigFile returns the objectstorage path for the run config
// identified by runID: runID joined under StorageRunsConfigDir.
func StorageRunConfigFile(runID string) string {
	runConfigPath := path.Join(StorageRunsConfigDir, runID)
	return runConfigPath
}
// StorageRunCounterFile returns the objectstorage path for the run counter
// of the given group: group joined under StorageCountersDir.
func StorageRunCounterFile(group string) string {
	counterPath := path.Join(StorageCountersDir, group)
	return counterPath
}
type DataType string
const (
@ -94,16 +81,3 @@ const (
DataTypeRunConfig DataType = "runconfig"
DataTypeRunCounter DataType = "runcounter"
)
// DataToPathFunc maps a data type name and an entry id to the objectstorage
// path built by the matching Storage*File helper.
//
// It panics when dataType is not one of DataTypeRun, DataTypeRunConfig or
// DataTypeRunCounter.
func DataToPathFunc(dataType string, id string) string {
	kind := DataType(dataType)

	if kind == DataTypeRun {
		return StorageRunFile(id)
	}
	if kind == DataTypeRunConfig {
		return StorageRunConfigFile(id)
	}
	if kind == DataTypeRunCounter {
		return StorageRunCounterFile(id)
	}

	// No known data type matched: this is treated as a programming error.
	panic(fmt.Errorf("unknown data type %q", dataType))
}

View File

@ -28,6 +28,7 @@ import (
"sync"
"time"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
@ -36,7 +37,6 @@ import (
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
"github.com/sorintlab/agola/internal/wal"
"go.uber.org/zap"
sq "github.com/Masterminds/squirrel"
@ -94,7 +94,7 @@ type ReadDB struct {
e *etcd.Store
rdb *db.DB
ost *objectstorage.ObjStorage
wal *wal.WalManager
dm *datamanager.DataManager
Initialized bool
initLock sync.Mutex
@ -108,7 +108,7 @@ type ReadDB struct {
dbWriteLock sync.Mutex
}
func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, wal *wal.WalManager) (*ReadDB, error) {
func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) (*ReadDB, error) {
if err := os.MkdirAll(dataDir, 0770); err != nil {
return nil, err
}
@ -127,7 +127,7 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.
e: e,
dataDir: dataDir,
ost: ost,
wal: wal,
dm: dm,
rdb: rdb,
}
@ -451,7 +451,7 @@ func (r *ReadDB) handleRunEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdcl
// TODO(sgotti) this is here just to avoid a window where the run is not in
// run table and in the run_os table but should be changed/removed when we'll
// implement run removal
run, err := store.OSTGetRun(r.wal, runID)
run, err := store.OSTGetRun(r.dm, runID)
if err != nil {
return err
}
@ -516,7 +516,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
return err
}
lastCommittedStorageWal, _, err := r.wal.LastCommittedStorageWal(ctx)
lastCommittedStorageWal, _, err := r.dm.LastCommittedStorageWal(ctx)
if err != nil {
return err
}
@ -526,7 +526,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
doFullSync = true
r.log.Warn("no startWalSeq in db, doing a full sync")
} else {
ok, err := r.wal.HasOSTWal(curWalSeq)
ok, err := r.dm.HasOSTWal(curWalSeq)
if err != nil {
return err
}
@ -557,7 +557,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
}
if doFullSync {
r.log.Infof("doing a full sync from objectstorage files")
r.log.Infof("doing a full sync from dump")
if err := r.ResetDB(); err != nil {
return err
}
@ -585,7 +585,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
// from wals on objectstorage is >=
// if not (this happens when syncFromWals takes some time and in the meantime
// many new wals are written, the next sync should be faster and able to continue
firstAvailableWalData, revision, err := r.wal.FirstAvailableWalData(ctx)
firstAvailableWalData, revision, err := r.dm.FirstAvailableWalData(ctx)
if err != nil {
return errors.Wrap(err, "failed to get first available wal data")
}
@ -609,7 +609,7 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
}
// use the same revision as previous operation
for walElement := range r.wal.ListEtcdWals(ctx, revision) {
for walElement := range r.dm.ListEtcdWals(ctx, revision) {
if walElement.Err != nil {
return err
}
@ -634,134 +634,68 @@ func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
}
func (r *ReadDB) SyncFromDump() (string, error) {
type indexHeader struct {
LastWalSequence string
dumpIndex, err := r.dm.GetLastDataStatus()
if err != nil && err != objectstorage.ErrNotExist {
return "", errors.WithStack(err)
}
type indexData struct {
DataType string
Data json.RawMessage
if err == objectstorage.ErrNotExist {
return "", nil
}
for dataType, files := range dumpIndex.Files {
dumpf, err := r.ost.ReadObject(files[0])
if err != nil {
return "", errors.WithStack(err)
}
dumpEntries := []*datamanager.DataEntry{}
dec := json.NewDecoder(dumpf)
for {
var de *datamanager.DataEntry
type indexDataRun struct {
ID string
Phase types.RunPhase
Group string
}
err := dec.Decode(&de)
if err == io.EOF {
// all done
break
}
if err != nil {
dumpf.Close()
return "", err
}
dumpEntries = append(dumpEntries, de)
}
dumpf.Close()
type indexDataRunCounter struct {
Group string
Counter uint64
}
var iheader *indexHeader
insertfunc := func(ids []*indexData) error {
err := r.rdb.Do(func(tx *db.Tx) error {
for _, id := range ids {
switch common.DataType(id.DataType) {
case common.DataTypeRun:
var ir *indexDataRun
if err := json.Unmarshal(id.Data, &ir); err != nil {
return err
}
run := &types.Run{
ID: ir.ID,
Group: ir.Group,
Phase: ir.Phase,
}
r.log.Infof("inserting run %q", run.ID)
if err := r.insertRunOST(tx, run, []byte{}); err != nil {
return err
}
case common.DataTypeRunCounter:
var irc *indexDataRunCounter
if err := json.Unmarshal(id.Data, &irc); err != nil {
return err
}
r.log.Infof("inserting run counter %q, c: %d", irc.Group, irc.Counter)
if err := r.insertRunCounterOST(tx, irc.Group, irc.Counter); err != nil {
return err
}
err = r.rdb.Do(func(tx *db.Tx) error {
for _, de := range dumpEntries {
action := &datamanager.Action{
ActionType: datamanager.ActionTypePut,
ID: de.ID,
DataType: dataType,
Data: de.Data,
}
if err := r.applyAction(tx, action); err != nil {
return err
}
}
return nil
})
return err
}
doneCh := make(chan struct{})
defer close(doneCh)
// get last dump
var dumpPath string
for object := range r.ost.List(path.Join(common.StorageRunsIndexesDir)+"/", "", true, doneCh) {
if object.Err != nil {
return "", object.Err
}
r.log.Infof("path: %s", object.Path)
dumpPath = object.Path
}
if dumpPath == "" {
return "", nil
}
f, err := r.ost.ReadObject(dumpPath)
if err != nil {
if err == objectstorage.ErrNotExist {
r.log.Warnf("object %s disappeared, ignoring", dumpPath)
}
return "", err
}
defer f.Close()
dec := json.NewDecoder(f)
if err := dec.Decode(&iheader); err != nil {
return "", err
}
count := 0
ids := make([]*indexData, 0, paginationSize)
for {
var id *indexData
err := dec.Decode(&id)
if err == io.EOF {
// all done
break
}
if err != nil {
f.Close()
return "", err
}
ids = append(ids, id)
if count > paginationSize {
if err := insertfunc(ids); err != nil {
return "", err
}
count = 0
ids = make([]*indexData, 0, paginationSize)
} else {
count++
}
}
if err := insertfunc(ids); err != nil {
return "", err
}
return iheader.LastWalSequence, nil
return dumpIndex.WalSequence, nil
}
func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) {
insertfunc := func(walFiles []*wal.WalFile) error {
insertfunc := func(walFiles []*datamanager.WalFile) error {
err := r.rdb.Do(func(tx *db.Tx) error {
for _, walFile := range walFiles {
walFilef, err := r.wal.ReadWal(walFile.WalSequence)
walFilef, err := r.dm.ReadWal(walFile.WalSequence)
if err != nil {
return err
}
dec := json.NewDecoder(walFilef)
var header *wal.WalHeader
var header *datamanager.WalHeader
if err = dec.Decode(&header); err != nil && err != io.EOF {
walFilef.Close()
return err
@ -780,13 +714,13 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) {
}
lastWalSeq := startWalSeq
walFiles := []*wal.WalFile{}
walFiles := []*datamanager.WalFile{}
count := 0
doneCh := make(chan struct{})
defer close(doneCh)
for walFile := range r.wal.ListOSTWals(startWalSeq) {
for walFile := range r.dm.ListOSTWals(startWalSeq) {
if walFile.Err != nil {
return "", walFile.Err
}
@ -799,7 +733,7 @@ func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) {
return "", err
}
count = 0
walFiles = []*wal.WalFile{}
walFiles = []*datamanager.WalFile{}
} else {
count++
}
@ -831,12 +765,12 @@ func (r *ReadDB) handleEventsOST(ctx context.Context) error {
wctx, cancel := context.WithCancel(ctx)
defer cancel()
r.log.Infof("revision: %d", revision)
wch := r.wal.Watch(wctx, revision+1)
wch := r.dm.Watch(wctx, revision+1)
for we := range wch {
r.log.Debugf("we: %s", util.Dump(we))
if we.Err != nil {
err := we.Err
if err == wal.ErrCompacted {
if err == datamanager.ErrCompacted {
r.log.Warnf("required events already compacted, reinitializing readdb")
r.Initialized = false
return nil
@ -897,7 +831,7 @@ func (r *ReadDB) handleEventsOST(ctx context.Context) error {
}
func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error {
walFile, err := r.wal.ReadWalData(walDataFileID)
walFile, err := r.dm.ReadWalData(walDataFileID)
if err != nil {
return errors.Wrapf(err, "cannot read wal data file %q", walDataFileID)
}
@ -905,7 +839,7 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error {
dec := json.NewDecoder(walFile)
for {
var action *wal.Action
var action *datamanager.Action
err := dec.Decode(&action)
if err == io.EOF {
@ -924,10 +858,10 @@ func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error {
return nil
}
func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error {
func (r *ReadDB) applyAction(tx *db.Tx, action *datamanager.Action) error {
r.log.Infof("action: dataType: %s, ID: %s", action.DataType, action.ID)
switch action.ActionType {
case wal.ActionTypePut:
case datamanager.ActionTypePut:
switch action.DataType {
case string(common.DataTypeRun):
var run *types.Run
@ -948,7 +882,7 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error {
}
}
case wal.ActionTypeDelete:
case datamanager.ActionTypeDelete:
switch action.DataType {
case string(common.DataTypeRun):
case string(common.DataTypeRunCounter):
@ -958,7 +892,7 @@ func (r *ReadDB) applyAction(tx *db.Tx, action *wal.Action) error {
return nil
}
func (r *ReadDB) handleEventOST(tx *db.Tx, we *wal.WatchElement) error {
func (r *ReadDB) handleEventOST(tx *db.Tx, we *datamanager.WatchElement) error {
//r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//key := string(ev.Kv.Key)
@ -968,7 +902,7 @@ func (r *ReadDB) handleEventOST(tx *db.Tx, we *wal.WatchElement) error {
return nil
}
func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error {
func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error {
for cgName, cgRev := range we.ChangeGroupsRevisions {
if err := r.insertChangeGroupRevisionOST(tx, cgName, cgRev); err != nil {
return err
@ -977,7 +911,7 @@ func (r *ReadDB) handleWalEvent(tx *db.Tx, we *wal.WatchElement) error {
if we.WalData != nil {
// update readdb only when the wal has been committed to objectstorage
if we.WalData.WalStatus != wal.WalStatusCommitted {
if we.WalData.WalStatus != datamanager.WalStatusCommitted {
return nil
}
@ -1252,7 +1186,7 @@ func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter [
}
// get run from objectstorage
run, err := store.OSTGetRun(r.wal, runID)
run, err := store.OSTGetRun(r.dm, runID)
if err != nil {
return nil, errors.WithStack(err)
}
@ -1498,7 +1432,7 @@ func (r *ReadDB) insertChangeGroupRevisionOST(tx *db.Tx, changegroup string, rev
return nil
}
func (r *ReadDB) GetChangeGroupsUpdateTokensOST(tx *db.Tx, groups []string) (*wal.ChangeGroupsUpdateToken, error) {
func (r *ReadDB) GetChangeGroupsUpdateTokensOST(tx *db.Tx, groups []string) (*datamanager.ChangeGroupsUpdateToken, error) {
s := changegrouprevisionOSTSelect.Where(sq.Eq{"id": groups})
q, args, err := s.ToSql()
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
@ -1522,7 +1456,7 @@ func (r *ReadDB) GetChangeGroupsUpdateTokensOST(tx *db.Tx, groups []string) (*wa
}
}
return &wal.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil
return &datamanager.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil
}
func fetchChangeGroupsRevisionOST(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) {

View File

@ -22,13 +22,11 @@ import (
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"time"
scommon "github.com/sorintlab/agola/internal/common"
"github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/etcd"
slog "github.com/sorintlab/agola/internal/log"
"github.com/sorintlab/agola/internal/objectstorage"
@ -41,7 +39,6 @@ import (
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
"github.com/sorintlab/agola/internal/wal"
ghandlers "github.com/gorilla/handlers"
"github.com/gorilla/mux"
@ -613,7 +610,7 @@ func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.Exec
if err != nil {
return err
}
rc, err := store.OSTGetRunConfig(s.wal, r.ID)
rc, err := store.OSTGetRunConfig(s.dm, r.ID)
if err != nil {
return errors.Wrapf(err, "cannot get run config %q", r.ID)
}
@ -1163,7 +1160,7 @@ func (s *Scheduler) runsScheduler(ctx context.Context) error {
func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error {
log.Debugf("runScheduler")
rc, err := store.OSTGetRunConfig(s.wal, r.ID)
rc, err := store.OSTGetRunConfig(s.dm, r.ID)
if err != nil {
return errors.Wrapf(err, "cannot get run config %q", r.ID)
}
@ -1271,9 +1268,9 @@ func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error {
return err
}
actions := append([]*wal.Action{ra})
actions := append([]*datamanager.Action{ra})
if _, err = s.wal.WriteWal(ctx, actions, nil); err != nil {
if _, err = s.dm.WriteWal(ctx, actions, nil); err != nil {
return err
}
@ -1285,197 +1282,6 @@ func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error {
return nil
}
// dumpOSTLoop repeatedly calls dumpOST, waiting 10 seconds between rounds,
// until ctx is cancelled. Errors returned by dumpOST are logged and do not
// stop the loop.
func (s *Scheduler) dumpOSTLoop(ctx context.Context) {
	for {
		log.Debugf("objectstorage dump loop")

		// TODO(sgotti) create new dump only after N files
		if err := s.dumpOST(ctx); err != nil {
			log.Errorf("err: %+v", err)
		}

		// Wait for the next round but return as soon as ctx is cancelled,
		// instead of sleeping through the cancellation and running one more
		// dump afterwards.
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Second):
		}
	}
}
// dumpOST writes a single dump object containing the readdb view of runs and
// run counters.
//
// The dump is a sequence of JSON values appended back to back with no
// separator: first an indexHeader carrying the committed wal sequence read
// from the readdb, then one indexData value per run, then one indexData value
// per run counter. The whole dump is accumulated in memory and written with
// one WriteObject call to <StorageRunsIndexesDir>/<unix-nano timestamp>/all.
//
// ctx is currently not used by this function.
// It returns the first error reported by the readdb, by json.Marshal or by
// the objectstorage write.
func (s *Scheduler) dumpOST(ctx context.Context) error {
// indexHeader is the first JSON value of the dump.
type indexHeader struct {
LastWalSequence string
}
// indexData wraps every following entry; DataType tells which payload type
// is stored in Data.
type indexData struct {
DataType string
Data interface{}
}
// indexDataRun is the payload written for DataTypeRun entries.
type indexDataRun struct {
ID string
Group string
Phase types.RunPhase
}
// indexDataRunCounter is the payload written for DataTypeRunCounter entries.
type indexDataRunCounter struct {
Group string
Counter uint64
}
// The dump directory name is the current time in nanoseconds, taken before
// any data is read.
indexDir := strconv.FormatInt(time.Now().UnixNano(), 10)
var lastWalSequence string
err := s.readDB.Do(func(tx *db.Tx) error {
var err error
lastWalSequence, err = s.readDB.GetCommittedWalSequenceOST(tx)
return err
})
if err != nil {
return err
}
data := []byte{}
iheader := &indexHeader{LastWalSequence: lastWalSequence}
ihj, err := json.Marshal(iheader)
if err != nil {
return err
}
data = append(data, ihj...)
// Page through the runs, at most 1000 per readdb call. lastRunID is the
// cursor passed to the next call; an empty page ends the loop.
// NOTE(review): each page runs in its own readDB.Do call, and the header was
// read in yet another one; presumably these are separate transactions, so the
// dump may not be a single consistent snapshot. Confirm against readDB.Do.
var lastRunID string
stop := false
for {
err := s.readDB.Do(func(tx *db.Tx) error {
var err error
lruns, err := s.readDB.GetRunsFilteredOST(tx, nil, false, nil, lastRunID, 1000, types.SortOrderDesc)
if err != nil {
return err
}
if len(lruns) == 0 {
stop = true
} else {
lastRunID = lruns[len(lruns)-1].ID
}
for _, run := range lruns {
id := &indexData{DataType: string(common.DataTypeRun), Data: indexDataRun{ID: run.ID, Group: run.GroupPath, Phase: types.RunPhase(run.Phase)}}
idj, err := json.Marshal(id)
if err != nil {
return err
}
data = append(data, idj...)
}
return nil
})
if err != nil {
return err
}
if stop {
break
}
}
// Same paging scheme for the run counters, with the last seen group as the
// cursor.
var lastGroup string
stop = false
for {
err := s.readDB.Do(func(tx *db.Tx) error {
var err error
counters, err := s.readDB.GetRunCountersOST(tx, lastGroup, 1000)
if err != nil {
return err
}
if len(counters) == 0 {
stop = true
} else {
lastGroup = counters[len(counters)-1].Group
}
for _, counter := range counters {
id := &indexData{DataType: string(common.DataTypeRunCounter), Data: indexDataRunCounter{Group: counter.Group, Counter: counter.Counter}}
idj, err := json.Marshal(id)
if err != nil {
return err
}
data = append(data, idj...)
}
return nil
})
if err != nil {
return err
}
if stop {
break
}
}
// Write the complete dump as one object named "all" inside the timestamped
// directory.
index := path.Join(common.StorageRunsIndexesDir, indexDir, "all")
if err = s.ost.WriteObject(index, bytes.NewReader(data)); err != nil {
return err
}
return nil
}
// dumpOSTCleanerLoop repeatedly calls dumpOSTCleaner, waiting 10 seconds
// between rounds, until ctx is cancelled. Errors returned by dumpOSTCleaner
// are logged and do not stop the loop.
func (s *Scheduler) dumpOSTCleanerLoop(ctx context.Context) {
	for {
		log.Infof("objectstorage dump cleaner loop")

		if err := s.dumpOSTCleaner(ctx); err != nil {
			log.Errorf("err: %+v", err)
		}

		// Wait for the next round but return as soon as ctx is cancelled,
		// instead of sleeping through the cancellation and running one more
		// cleanup afterwards.
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Second):
		}
	}
}
// dumpOSTCleaner deletes every object listed under StorageRunsIndexesDir
// except the one whose path compares greatest as a string.
//
// NOTE(review): this presumably keeps the newest dump, since dump directories
// appear to be named after a nanosecond timestamp; confirm that
// lexicographic order matches creation order for those names.
//
// ctx is currently not used by this function. It returns the first listing
// or deletion error, or an error when a listed path has fewer than two
// components.
func (s *Scheduler) dumpOSTCleaner(ctx context.Context) error {
	// collect all old indexes
	var objects []string

	doneCh := make(chan struct{})
	defer close(doneCh)

	// indexPath is the greatest path seen so far; it is the only one that is
	// never queued for deletion.
	var indexPath string
	for object := range s.ost.List(common.StorageRunsIndexesDir+"/", "", true, doneCh) {
		if object.Err != nil {
			return object.Err
		}

		h := util.PathList(object.Path)
		if len(h) < 2 {
			return errors.Errorf("wrong index dir path %q", object.Path)
		}

		curIndexPath := object.Path
		if curIndexPath > indexPath {
			// A greater path was found: the previous candidate becomes old.
			if indexPath != "" {
				objects = append(objects, indexPath)
			}
			indexPath = curIndexPath
		} else {
			objects = append(objects, curIndexPath)
		}
	}

	for _, object := range objects {
		if err := s.ost.DeleteObject(object); err != nil {
			log.Errorf("object: %s, err: %v", object, err)
			return err
		}
	}

	return nil
}
func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) {
for {
if err := s.cacheCleaner(ctx, cacheExpireInterval); err != nil {
@ -1561,7 +1367,7 @@ type Scheduler struct {
c *config.RunServiceScheduler
e *etcd.Store
ost *objectstorage.ObjStorage
wal *wal.WalManager
dm *datamanager.DataManager
readDB *readdb.ReadDB
ch *command.CommandHandler
}
@ -1586,24 +1392,28 @@ func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Schedule
ost: ost,
}
walConf := &wal.WalManagerConfig{
E: e,
OST: ost,
DataToPathFunc: common.DataToPathFunc,
dmConf := &datamanager.DataManagerConfig{
E: e,
OST: ost,
DataTypes: []string{
string(common.DataTypeRun),
string(common.DataTypeRunConfig),
string(common.DataTypeRunCounter),
},
}
wal, err := wal.NewWalManager(ctx, logger, walConf)
dm, err := datamanager.NewDataManager(ctx, logger, dmConf)
if err != nil {
return nil, err
}
s.wal = wal
s.dm = dm
readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, wal)
readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm)
if err != nil {
return nil, err
}
s.readDB = readDB
ch := command.NewCommandHandler(logger, e, readDB, ost, wal)
ch := command.NewCommandHandler(logger, e, readDB, ost, dm)
s.ch = ch
return s, nil
@ -1626,12 +1436,12 @@ func (s *Scheduler) InitEtcd(ctx context.Context) error {
func (s *Scheduler) Run(ctx context.Context) error {
errCh := make(chan error)
walReadyCh := make(chan struct{})
dmReadyCh := make(chan struct{})
go func() { errCh <- s.wal.Run(ctx, walReadyCh) }()
go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }()
// wait for wal to be ready
<-walReadyCh
// wait for dm to be ready
<-dmReadyCh
for {
err := s.InitEtcd(ctx)
@ -1668,9 +1478,9 @@ func (s *Scheduler) Run(ctx context.Context) error {
// api from clients
executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ch)
logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.wal)
logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm)
runHandler := api.NewRunHandler(logger, s.e, s.wal, s.readDB)
runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB)
runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ch)
runsHandler := api.NewRunsHandler(logger, s.readDB)
runActionsHandler := api.NewRunActionsHandler(logger, s.ch)
@ -1714,8 +1524,6 @@ func (s *Scheduler) Run(ctx context.Context) error {
go s.runTasksUpdaterLoop(ctx)
go s.fetcherLoop(ctx)
go s.finishedRunsArchiverLoop(ctx)
go s.dumpOSTLoop(ctx)
go s.dumpOSTCleanerLoop(ctx)
go s.compactChangeGroupsLoop(ctx)
go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
go s.executorTaskUpdateHandler(ctx, ch)

View File

@ -22,12 +22,12 @@ import (
"reflect"
"strings"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
"github.com/sorintlab/agola/internal/wal"
"github.com/pkg/errors"
etcdclientv3 "go.etcd.io/etcd/clientv3"
@ -85,16 +85,7 @@ func OSTSubGroupTypes(group string) []string {
return sg
}
// OSTRunCounterPaths returns the run counter storage path of every sub group
// that OSTSubGroups reports for group, in the same order.
//
// runID and sortOrder are accepted but not used. The returned slice is never
// nil, even when there are no sub groups.
func OSTRunCounterPaths(group, runID string, sortOrder types.SortOrder) []string {
	subGroups := OSTSubGroups(group)

	// The number of paths is known up front, so size the slice once instead
	// of growing it while appending.
	paths := make([]string, 0, len(subGroups))
	for _, subGroup := range subGroups {
		paths = append(paths, common.StorageRunCounterFile(subGroup))
	}

	return paths
}
func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wal.Action, error) {
func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*datamanager.Action, error) {
// use the first group dir after the root
pl := util.PathList(group)
if len(pl) < 2 {
@ -106,8 +97,8 @@ func OSTUpdateRunCounterAction(ctx context.Context, c uint64, group string) (*wa
return nil, err
}
action := &wal.Action{
ActionType: wal.ActionTypePut,
action := &datamanager.Action{
ActionType: datamanager.ActionTypePut,
DataType: string(common.DataTypeRunCounter),
ID: pl[1],
Data: cj,
@ -145,9 +136,8 @@ func OSTCacheKey(p string) string {
return strings.TrimSuffix(base, path.Ext(base))
}
func OSTGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig, error) {
runConfigPath := common.StorageRunConfigFile(runConfigID)
rcf, _, err := wal.ReadObject(runConfigPath, nil)
func OSTGetRunConfig(dm *datamanager.DataManager, runConfigID string) (*types.RunConfig, error) {
rcf, _, err := dm.ReadObject(string(common.DataTypeRunConfig), runConfigID, nil)
if err != nil {
return nil, err
}
@ -161,14 +151,14 @@ func OSTGetRunConfig(wal *wal.WalManager, runConfigID string) (*types.RunConfig,
return rc, nil
}
func OSTSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) {
func OSTSaveRunConfigAction(rc *types.RunConfig) (*datamanager.Action, error) {
rcj, err := json.Marshal(rc)
if err != nil {
return nil, err
}
action := &wal.Action{
ActionType: wal.ActionTypePut,
action := &datamanager.Action{
ActionType: datamanager.ActionTypePut,
DataType: string(common.DataTypeRunConfig),
ID: rc.ID,
Data: rcj,
@ -177,10 +167,8 @@ func OSTSaveRunConfigAction(rc *types.RunConfig) (*wal.Action, error) {
return action, nil
}
func OSTGetRun(wal *wal.WalManager, runID string) (*types.Run, error) {
runPath := common.StorageRunFile(runID)
rf, _, err := wal.ReadObject(runPath, nil)
func OSTGetRun(dm *datamanager.DataManager, runID string) (*types.Run, error) {
rf, _, err := dm.ReadObject(string(common.DataTypeRun), runID, nil)
if err != nil {
return nil, err
}
@ -194,14 +182,14 @@ func OSTGetRun(wal *wal.WalManager, runID string) (*types.Run, error) {
return r, nil
}
func OSTSaveRunAction(r *types.Run) (*wal.Action, error) {
func OSTSaveRunAction(r *types.Run) (*datamanager.Action, error) {
rj, err := json.Marshal(r)
if err != nil {
return nil, err
}
action := &wal.Action{
ActionType: wal.ActionTypePut,
action := &datamanager.Action{
ActionType: datamanager.ActionTypePut,
DataType: string(common.DataTypeRun),
ID: r.ID,
Data: rj,
@ -501,13 +489,13 @@ func GetRuns(ctx context.Context, e *etcd.Store) ([]*types.Run, error) {
return runs, nil
}
func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, wal *wal.WalManager, runID string) (*types.Run, error) {
func GetRunEtcdOrOST(ctx context.Context, e *etcd.Store, dm *datamanager.DataManager, runID string) (*types.Run, error) {
r, _, err := GetRun(ctx, e, runID)
if err != nil && err != etcd.ErrKeyNotFound {
return nil, err
}
if r == nil {
r, err = OSTGetRun(wal, runID)
r, err = OSTGetRun(dm, runID)
if err != nil && err != objectstorage.ErrNotExist {
return nil, err
}