d2b09d854f
Implement a new error handling library based on pkg/errors. It provides stack saving on wrapping and exports some functions to add stack saving also to external errors. It also implements custom zerolog error formatting without adding too much verbosity by just printing the error chain file:line without a full stack trace of every error. * Add a --detailed-errors option to print errors with their full chain * Wrap all error returns. Use errors.WithStack to wrap without adding a new message and errors.Wrap[f] to add a message. * Add golangci-lint wrapcheck to check that external package errors are wrapped. This won't check that internal package errors are wrapped. But we also want to ensure this case, so we'll have to find something else to check these too.
869 lines
24 KiB
Go
869 lines
24 KiB
Go
// Copyright 2019 Sorint.lab
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package readdb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"io"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"agola.io/agola/internal/datamanager"
|
|
"agola.io/agola/internal/db"
|
|
"agola.io/agola/internal/errors"
|
|
"agola.io/agola/internal/etcd"
|
|
"agola.io/agola/internal/objectstorage"
|
|
"agola.io/agola/internal/sequence"
|
|
"agola.io/agola/internal/util"
|
|
"agola.io/agola/services/configstore/types"
|
|
|
|
sq "github.com/Masterminds/squirrel"
|
|
"github.com/rs/zerolog"
|
|
)
|
|
|
|
var (
|
|
// Use postgresql $ placeholder. It'll be converted to ? from the provided db functions
|
|
sb = sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
|
|
|
|
revisionSelect = sb.Select("revision").From("revision")
|
|
revisionInsert = sb.Insert("revision").Columns("revision")
|
|
|
|
committedwalsequenceSelect = sb.Select("seq").From("committedwalsequence")
|
|
committedwalsequenceInsert = sb.Insert("committedwalsequence").Columns("seq")
|
|
|
|
changegrouprevisionSelect = sb.Select("id, revision").From("changegrouprevision")
|
|
changegrouprevisionInsert = sb.Insert("changegrouprevision").Columns("id", "revision")
|
|
)
|
|
|
|
type ReadDB struct {
|
|
log zerolog.Logger
|
|
dataDir string
|
|
e *etcd.Store
|
|
rdb *db.DB
|
|
ost *objectstorage.ObjStorage
|
|
dm *datamanager.DataManager
|
|
|
|
Initialized bool
|
|
initLock sync.Mutex
|
|
}
|
|
|
|
func NewReadDB(ctx context.Context, log zerolog.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) (*ReadDB, error) {
|
|
if err := os.MkdirAll(dataDir, 0770); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
readDB := &ReadDB{
|
|
log: log,
|
|
dataDir: dataDir,
|
|
e: e,
|
|
ost: ost,
|
|
dm: dm,
|
|
}
|
|
|
|
return readDB, nil
|
|
}
|
|
|
|
func (r *ReadDB) SetInitialized(initialized bool) {
|
|
r.initLock.Lock()
|
|
r.Initialized = initialized
|
|
r.initLock.Unlock()
|
|
}
|
|
|
|
func (r *ReadDB) IsInitialized() bool {
|
|
r.initLock.Lock()
|
|
defer r.initLock.Unlock()
|
|
return r.Initialized
|
|
}
|
|
|
|
// Initialize populates the readdb with the current etcd data and save the
|
|
// revision to then feed it with the etcd events
|
|
func (r *ReadDB) Initialize(ctx context.Context) error {
|
|
// sync the rdb
|
|
if err := r.SyncRDB(ctx); err != nil {
|
|
return errors.Wrapf(err, "error syncing db")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) ResetDB(ctx context.Context) error {
|
|
// TODO(sgotti) this needs to be protected by a mutex
|
|
if r.rdb != nil {
|
|
r.rdb.Close()
|
|
}
|
|
|
|
// drop rdb
|
|
if err := os.Remove(filepath.Join(r.dataDir, "db")); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
rdb, err := db.NewDB(db.Sqlite3, filepath.Join(r.dataDir, "db"))
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
// populate readdb
|
|
if err := rdb.Create(ctx, Stmts); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
r.rdb = rdb
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) SyncFromDump(ctx context.Context) (string, error) {
|
|
dumpIndex, err := r.dm.GetLastDataStatus()
|
|
if err != nil {
|
|
return "", errors.WithStack(err)
|
|
}
|
|
for dataType, files := range dumpIndex.Files {
|
|
for _, file := range files {
|
|
dumpf, err := r.ost.ReadObject(r.dm.DataFilePath(dataType, file.ID))
|
|
if err != nil {
|
|
return "", errors.WithStack(err)
|
|
}
|
|
dumpEntries := []*datamanager.DataEntry{}
|
|
dec := json.NewDecoder(dumpf)
|
|
for {
|
|
var de *datamanager.DataEntry
|
|
|
|
err := dec.Decode(&de)
|
|
if errors.Is(err, io.EOF) {
|
|
// all done
|
|
break
|
|
}
|
|
if err != nil {
|
|
dumpf.Close()
|
|
return "", errors.WithStack(err)
|
|
}
|
|
dumpEntries = append(dumpEntries, de)
|
|
}
|
|
dumpf.Close()
|
|
|
|
err = r.rdb.Do(ctx, func(tx *db.Tx) error {
|
|
for _, de := range dumpEntries {
|
|
action := &datamanager.Action{
|
|
ActionType: datamanager.ActionTypePut,
|
|
ID: de.ID,
|
|
DataType: dataType,
|
|
Data: de.Data,
|
|
}
|
|
if err := r.applyAction(tx, action); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return "", errors.WithStack(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
err = r.rdb.Do(ctx, func(tx *db.Tx) error {
|
|
if err := r.insertCommittedWalSequence(tx, dumpIndex.WalSequence); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return "", errors.WithStack(err)
|
|
}
|
|
|
|
return dumpIndex.WalSequence, nil
|
|
}
|
|
|
|
func (r *ReadDB) SyncFromWals(ctx context.Context, startWalSeq, endWalSeq string) (string, error) {
|
|
insertfunc := func(walFiles []*datamanager.WalFile) error {
|
|
err := r.rdb.Do(ctx, func(tx *db.Tx) error {
|
|
for _, walFile := range walFiles {
|
|
header, err := r.dm.ReadWal(walFile.WalSequence)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
if err := r.insertCommittedWalSequence(tx, walFile.WalSequence); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
if err := r.applyWal(tx, header.WalDataFileID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
lastWalSeq := startWalSeq
|
|
walFiles := []*datamanager.WalFile{}
|
|
count := 0
|
|
|
|
doneCh := make(chan struct{})
|
|
defer close(doneCh)
|
|
|
|
for walFile := range r.dm.ListOSTWals(startWalSeq) {
|
|
if walFile.Err != nil {
|
|
return "", errors.WithStack(walFile.Err)
|
|
}
|
|
|
|
walFiles = append(walFiles, walFile)
|
|
lastWalSeq = walFile.WalSequence
|
|
|
|
if count > 100 {
|
|
if err := insertfunc(walFiles); err != nil {
|
|
return "", errors.WithStack(err)
|
|
}
|
|
count = 0
|
|
walFiles = []*datamanager.WalFile{}
|
|
} else {
|
|
count++
|
|
}
|
|
}
|
|
if err := insertfunc(walFiles); err != nil {
|
|
return "", errors.WithStack(err)
|
|
}
|
|
|
|
return lastWalSeq, nil
|
|
}
|
|
|
|
func (r *ReadDB) SyncRDB(ctx context.Context) error {
|
|
// get the last committed storage wal sequence saved in the rdb
|
|
curWalSeq := ""
|
|
err := r.rdb.Do(ctx, func(tx *db.Tx) error {
|
|
var err error
|
|
curWalSeq, err = r.GetCommittedWalSequence(tx)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
lastCommittedStorageWal, _, err := r.dm.LastCommittedStorageWal(ctx)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
doFullSync := false
|
|
if curWalSeq == "" {
|
|
doFullSync = true
|
|
r.log.Warn().Msgf("no startWalSeq in db, doing a full sync")
|
|
} else {
|
|
ok, err := r.dm.HasOSTWal(curWalSeq)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
if !ok {
|
|
r.log.Warn().Msgf("no wal with seq %q in objectstorage, doing a full sync", curWalSeq)
|
|
doFullSync = true
|
|
}
|
|
|
|
// if the epoch of the wals has changed this means etcd has been reset. If so
|
|
// we should do a full resync since we are saving in the rdb also data that
|
|
// was not yet committed to objectstorage so we should have the rdb ahead of
|
|
// the current objectstorage data
|
|
// TODO(sgotti) improve this to avoid doing a full resync
|
|
curWalSequence, err := sequence.Parse(curWalSeq)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
curWalEpoch := curWalSequence.Epoch
|
|
|
|
lastCommittedStorageWalSequence, err := sequence.Parse(lastCommittedStorageWal)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
if curWalEpoch != lastCommittedStorageWalSequence.Epoch {
|
|
r.log.Warn().Msgf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, doing a full sync", curWalEpoch, lastCommittedStorageWalSequence.Epoch)
|
|
doFullSync = true
|
|
}
|
|
}
|
|
|
|
if doFullSync {
|
|
r.log.Info().Msgf("doing a full sync from dump")
|
|
if err := r.ResetDB(ctx); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
var err error
|
|
curWalSeq, err = r.SyncFromDump(ctx)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
r.log.Debug().Msgf("startWalSeq: %s", curWalSeq)
|
|
|
|
// Sync from wals
|
|
// sync from objectstorage until the current known lastCommittedStorageWal in
|
|
// etcd since wals are first committed to objectstorage and then in etcd we
|
|
// would like to avoid to store in rdb something that is not yet marked as
|
|
// committedstorage in etcd
|
|
curWalSeq, err = r.SyncFromWals(ctx, curWalSeq, lastCommittedStorageWal)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to sync from wals")
|
|
}
|
|
|
|
// Get the first available wal from etcd and check that our current walseq
|
|
// from wals on objectstorage is >=
|
|
// if not (this happens when syncFromWals takes some time and in the meantime
|
|
// many new wals are written, the next sync should be faster and able to continue
|
|
firstAvailableWalData, revision, err := r.dm.FirstAvailableWalData(ctx)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to get first available wal data")
|
|
}
|
|
r.log.Debug().Msgf("firstAvailableWalData: %s", util.Dump(firstAvailableWalData))
|
|
r.log.Debug().Msgf("revision: %d", revision)
|
|
if firstAvailableWalData == nil {
|
|
return errors.Errorf("no wal data in etcd")
|
|
}
|
|
if curWalSeq < firstAvailableWalData.WalSequence {
|
|
return errors.Errorf("current applied wal seq %q is smaller than the first available wal in etcd %q", curWalSeq, firstAvailableWalData.WalSequence)
|
|
}
|
|
|
|
r.log.Info().Msgf("syncing from wals")
|
|
err = r.rdb.Do(ctx, func(tx *db.Tx) error {
|
|
if err := r.insertRevision(tx, revision); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
// use the same revision as previous operation
|
|
for walElement := range r.dm.ListEtcdWals(ctx, revision) {
|
|
if walElement.Err != nil {
|
|
return errors.WithStack(walElement.Err)
|
|
}
|
|
if walElement.WalData.WalSequence <= curWalSeq {
|
|
continue
|
|
}
|
|
|
|
// update readdb only when the wal has been committed to etcd
|
|
if walElement.WalData.WalStatus != datamanager.WalStatusCommitted {
|
|
return nil
|
|
}
|
|
|
|
if err := r.insertCommittedWalSequence(tx, walElement.WalData.WalSequence); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
r.log.Debug().Msgf("applying wal to db")
|
|
if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
// sync changegroups, use the same revision of previous operations
|
|
changeGroupsRevisions, err := r.dm.ListEtcdChangeGroups(ctx, revision)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
for changeGroupID, changeGroupRevision := range changeGroupsRevisions {
|
|
if err := r.insertChangeGroupRevision(tx, changeGroupID, changeGroupRevision); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
func (r *ReadDB) Run(ctx context.Context) error {
|
|
if r.rdb != nil {
|
|
r.rdb.Close()
|
|
}
|
|
rdb, err := db.NewDB(db.Sqlite3, filepath.Join(r.dataDir, "db"))
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
r.rdb = rdb
|
|
|
|
// populate readdb
|
|
if err := r.rdb.Create(ctx, Stmts); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
revision, err := r.GetRevision(ctx)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
if revision == 0 || !r.Initialized {
|
|
for {
|
|
err := r.Initialize(ctx)
|
|
if err == nil {
|
|
break
|
|
}
|
|
r.log.Err(err).Msgf("initialize err")
|
|
|
|
sleepCh := time.NewTimer(1 * time.Second).C
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-sleepCh:
|
|
}
|
|
}
|
|
}
|
|
r.SetInitialized(true)
|
|
|
|
for {
|
|
for {
|
|
initialized := r.IsInitialized()
|
|
if initialized {
|
|
break
|
|
}
|
|
err := r.Initialize(ctx)
|
|
if err == nil {
|
|
r.SetInitialized(true)
|
|
break
|
|
}
|
|
r.log.Err(err).Msgf("initialize err")
|
|
|
|
sleepCh := time.NewTimer(1 * time.Second).C
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-sleepCh:
|
|
}
|
|
}
|
|
|
|
doneCh := make(chan struct{}, 2)
|
|
hctx, cancel := context.WithCancel(ctx)
|
|
wg := &sync.WaitGroup{}
|
|
|
|
wg.Add(1)
|
|
|
|
go func() {
|
|
r.log.Info().Msgf("starting handleEvents")
|
|
if err := r.handleEvents(hctx); err != nil {
|
|
r.log.Err(err).Msgf("handleEvents err")
|
|
}
|
|
wg.Done()
|
|
doneCh <- struct{}{}
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
r.log.Info().Msgf("readdb exiting")
|
|
cancel()
|
|
return nil
|
|
case <-doneCh:
|
|
// cancel context and wait for the all the goroutines to exit
|
|
cancel()
|
|
wg.Wait()
|
|
}
|
|
|
|
sleepCh := time.NewTimer(1 * time.Second).C
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
case <-sleepCh:
|
|
}
|
|
}
|
|
}
|
|
|
|
// TODO(sgotti) improve to apply when the wal have been "committedstorage" and
|
|
// not only "committed", in this way we don't have to full resync when etcd is
|
|
// lost/reset
|
|
func (r *ReadDB) handleEvents(ctx context.Context) error {
|
|
var revision int64
|
|
err := r.rdb.Do(ctx, func(tx *db.Tx) error {
|
|
err := tx.QueryRow("select revision from revision order by revision desc limit 1").Scan(&revision)
|
|
if err != nil {
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
revision = 0
|
|
} else {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
wctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
r.log.Debug().Msgf("revision: %d", revision)
|
|
wch := r.dm.Watch(wctx, revision+1)
|
|
for we := range wch {
|
|
r.log.Debug().Msgf("we: %s", util.Dump(we))
|
|
if we.Err != nil {
|
|
err := we.Err
|
|
if errors.Is(err, datamanager.ErrCompacted) {
|
|
r.log.Warn().Msgf("required events already compacted, reinitializing readdb")
|
|
r.Initialized = false
|
|
return nil
|
|
}
|
|
return errors.Wrapf(err, "watch error")
|
|
}
|
|
|
|
// a single transaction for every response (every response contains all the
|
|
// events happened in an etcd revision).
|
|
err = r.rdb.Do(ctx, func(tx *db.Tx) error {
|
|
|
|
// if theres a wal seq epoch change something happened to etcd, usually (if
|
|
// the user hasn't messed up with etcd keys) this means etcd has been reset
|
|
// in such case we should resync from the objectstorage state to ensure we
|
|
// apply all the wal marked as committedstorage (since they could have been
|
|
// lost from etcd)
|
|
curWalSeq, err := r.GetCommittedWalSequence(tx)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
r.log.Debug().Msgf("curWalSeq: %q", curWalSeq)
|
|
if curWalSeq != "" && we.WalData != nil {
|
|
curWalSequence, err := sequence.Parse(curWalSeq)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
curWalEpoch := curWalSequence.Epoch
|
|
|
|
weWalSequence, err := sequence.Parse(we.WalData.WalSequence)
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
r.log.Debug().Msgf("we.WalData.WalSequence: %q", we.WalData.WalSequence)
|
|
weWalEpoch := weWalSequence.Epoch
|
|
if curWalEpoch != weWalEpoch {
|
|
r.Initialized = false
|
|
return errors.Errorf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, resyncing from objectstorage", curWalEpoch, weWalEpoch)
|
|
}
|
|
}
|
|
|
|
if err := r.handleEvent(tx, we); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
if err := r.insertRevision(tx, we.Revision); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
r.log.Info().Msgf("wch closed")
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) handleEvent(tx *db.Tx, we *datamanager.WatchElement) error {
|
|
//r.log.Debug().Msgf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
|
//key := string(ev.Kv.Key)
|
|
|
|
if err := r.handleWalEvent(tx, we); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error {
|
|
for cgName, cgRev := range we.ChangeGroupsRevisions {
|
|
if err := r.insertChangeGroupRevision(tx, cgName, cgRev); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
if we.WalData != nil {
|
|
// update readdb only when the wal has been committed to etcd
|
|
if we.WalData.WalStatus != datamanager.WalStatusCommitted {
|
|
return nil
|
|
}
|
|
|
|
if err := r.insertCommittedWalSequence(tx, we.WalData.WalSequence); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
|
|
r.log.Debug().Msgf("applying wal to db")
|
|
return r.applyWal(tx, we.WalData.WalDataFileID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error {
|
|
walFile, err := r.dm.ReadWalData(walDataFileID)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "cannot read wal data file %q", walDataFileID)
|
|
}
|
|
defer walFile.Close()
|
|
|
|
dec := json.NewDecoder(walFile)
|
|
for {
|
|
var action *datamanager.Action
|
|
|
|
err := dec.Decode(&action)
|
|
if errors.Is(err, io.EOF) {
|
|
// all done
|
|
break
|
|
}
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to decode wal file")
|
|
}
|
|
|
|
if err := r.applyAction(tx, action); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) applyAction(tx *db.Tx, action *datamanager.Action) error {
|
|
switch action.ActionType {
|
|
case datamanager.ActionTypePut:
|
|
switch types.ConfigType(action.DataType) {
|
|
case types.ConfigTypeUser:
|
|
if err := r.insertUser(tx, action.Data); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeOrg:
|
|
if err := r.insertOrg(tx, action.Data); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeOrgMember:
|
|
if err := r.insertOrgMember(tx, action.Data); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeProjectGroup:
|
|
if err := r.insertProjectGroup(tx, action.Data); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeProject:
|
|
if err := r.insertProject(tx, action.Data); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeRemoteSource:
|
|
if err := r.insertRemoteSource(tx, action.Data); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeSecret:
|
|
if err := r.insertSecret(tx, action.Data); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeVariable:
|
|
if err := r.insertVariable(tx, action.Data); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
|
|
case datamanager.ActionTypeDelete:
|
|
switch types.ConfigType(action.DataType) {
|
|
case types.ConfigTypeUser:
|
|
r.log.Debug().Msgf("deleting user with id: %s", action.ID)
|
|
if err := r.deleteUser(tx, action.ID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeOrg:
|
|
r.log.Debug().Msgf("deleting org with id: %s", action.ID)
|
|
if err := r.deleteOrg(tx, action.ID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeOrgMember:
|
|
r.log.Debug().Msgf("deleting orgmember with id: %s", action.ID)
|
|
if err := r.deleteOrgMember(tx, action.ID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeProjectGroup:
|
|
r.log.Debug().Msgf("deleting project group with id: %s", action.ID)
|
|
if err := r.deleteProjectGroup(tx, action.ID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeProject:
|
|
r.log.Debug().Msgf("deleting project with id: %s", action.ID)
|
|
if err := r.deleteProject(tx, action.ID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeRemoteSource:
|
|
r.log.Debug().Msgf("deleting remote source with id: %s", action.ID)
|
|
if err := r.deleteRemoteSource(tx, action.ID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeSecret:
|
|
r.log.Debug().Msgf("deleting secret with id: %s", action.ID)
|
|
if err := r.deleteSecret(tx, action.ID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
case types.ConfigTypeVariable:
|
|
r.log.Debug().Msgf("deleting variable with id: %s", action.ID)
|
|
if err := r.deleteVariable(tx, action.ID); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) Do(ctx context.Context, f func(tx *db.Tx) error) error {
|
|
return errors.WithStack(r.rdb.Do(ctx, f))
|
|
}
|
|
|
|
func (r *ReadDB) insertRevision(tx *db.Tx, revision int64) error {
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from revision"); err != nil {
|
|
return errors.Wrapf(err, "failed to delete revision")
|
|
}
|
|
// TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types...
|
|
q, args, err := revisionInsert.Values(revision).ToSql()
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to build query")
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) GetRevision(ctx context.Context) (int64, error) {
|
|
var revision int64
|
|
|
|
err := r.rdb.Do(ctx, func(tx *db.Tx) error {
|
|
var err error
|
|
revision, err = r.getRevision(tx)
|
|
return errors.WithStack(err)
|
|
})
|
|
return revision, errors.WithStack(err)
|
|
}
|
|
|
|
func (r *ReadDB) getRevision(tx *db.Tx) (int64, error) {
|
|
var revision int64
|
|
|
|
q, args, err := revisionSelect.ToSql()
|
|
r.log.Debug().Msgf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return 0, errors.Wrapf(err, "failed to build query")
|
|
}
|
|
|
|
err = tx.QueryRow(q, args...).Scan(&revision)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return 0, nil
|
|
}
|
|
return revision, errors.WithStack(err)
|
|
}
|
|
|
|
func (r *ReadDB) insertCommittedWalSequence(tx *db.Tx, seq string) error {
|
|
r.log.Debug().Msgf("insert seq: %s", seq)
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from committedwalsequence"); err != nil {
|
|
return errors.Wrapf(err, "failed to delete committedwalsequence")
|
|
}
|
|
q, args, err := committedwalsequenceInsert.Values(seq).ToSql()
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to build query")
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) GetCommittedWalSequence(tx *db.Tx) (string, error) {
|
|
var seq string
|
|
|
|
q, args, err := committedwalsequenceSelect.OrderBy("seq").Limit(1).ToSql()
|
|
r.log.Debug().Msgf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return "", errors.Wrapf(err, "failed to build query")
|
|
}
|
|
|
|
err = tx.QueryRow(q, args...).Scan(&seq)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return "", nil
|
|
}
|
|
return seq, errors.WithStack(err)
|
|
}
|
|
|
|
func (r *ReadDB) insertChangeGroupRevision(tx *db.Tx, changegroup string, revision int64) error {
|
|
r.log.Debug().Msgf("insertChangeGroupRevision: %s %d", changegroup, revision)
|
|
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from changegrouprevision where id = $1", changegroup); err != nil {
|
|
return errors.Wrapf(err, "failed to delete run")
|
|
}
|
|
// insert only if revision > 0
|
|
if revision > 0 {
|
|
q, args, err := changegrouprevisionInsert.Values(changegroup, revision).ToSql()
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to build query")
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return errors.WithStack(err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*datamanager.ChangeGroupsUpdateToken, error) {
|
|
s := changegrouprevisionSelect.Where(sq.Eq{"id": groups})
|
|
q, args, err := s.ToSql()
|
|
r.log.Debug().Msgf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "failed to build query")
|
|
}
|
|
cgr, err := fetchChangeGroupsRevision(tx, q, args...)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
revision, err := r.getRevision(tx)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
|
|
// for non existing changegroups use a changegroup with revision = 0
|
|
for _, g := range groups {
|
|
if _, ok := cgr[g]; !ok {
|
|
cgr[g] = 0
|
|
}
|
|
}
|
|
|
|
return &datamanager.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil
|
|
}
|
|
|
|
func fetchChangeGroupsRevision(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) {
|
|
rows, err := tx.Query(q, args...)
|
|
if err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
defer rows.Close()
|
|
return scanChangeGroupsRevision(rows)
|
|
}
|
|
|
|
func scanChangeGroupsRevision(rows *sql.Rows) (map[string]int64, error) {
|
|
changegroups := map[string]int64{}
|
|
for rows.Next() {
|
|
var (
|
|
id string
|
|
revision int64
|
|
)
|
|
if err := rows.Scan(&id, &revision); err != nil {
|
|
return nil, errors.Wrapf(err, "failed to scan rows")
|
|
}
|
|
changegroups[id] = revision
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, errors.WithStack(err)
|
|
}
|
|
return changegroups, nil
|
|
}
|