1631 lines
44 KiB
Go
1631 lines
44 KiB
Go
// Copyright 2019 Sorint.lab
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package readdb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"path"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sorintlab/agola/internal/datamanager"
|
|
"github.com/sorintlab/agola/internal/db"
|
|
"github.com/sorintlab/agola/internal/etcd"
|
|
"github.com/sorintlab/agola/internal/objectstorage"
|
|
ostypes "github.com/sorintlab/agola/internal/objectstorage/types"
|
|
"github.com/sorintlab/agola/internal/sequence"
|
|
"github.com/sorintlab/agola/internal/services/runservice/common"
|
|
"github.com/sorintlab/agola/internal/services/runservice/store"
|
|
"github.com/sorintlab/agola/internal/services/runservice/types"
|
|
"github.com/sorintlab/agola/internal/util"
|
|
|
|
sq "github.com/Masterminds/squirrel"
|
|
etcdclientv3 "go.etcd.io/etcd/clientv3"
|
|
etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
|
"go.uber.org/zap"
|
|
errors "golang.org/x/xerrors"
|
|
)
|
|
|
|
const (
|
|
paginationSize = 100
|
|
)
|
|
|
|
var (
|
|
// Use postgresql $ placeholder. It'll be converted to ? from the provided db functions
|
|
sb = sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
|
|
|
|
// readdb tables based on etcd data
|
|
revisionSelect = sb.Select("revision").From("revision")
|
|
revisionInsert = sb.Insert("revision").Columns("revision")
|
|
|
|
runSelect = sb.Select("id", "grouppath", "phase").From("run")
|
|
runInsert = sb.Insert("run").Columns("id", "grouppath", "phase")
|
|
|
|
rundataInsert = sb.Insert("rundata").Columns("id", "data")
|
|
|
|
runeventSelect = sb.Select("data").From("runevent")
|
|
runeventInsert = sb.Insert("runevent").Columns("sequence", "data")
|
|
|
|
changegrouprevisionSelect = sb.Select("id, revision").From("changegrouprevision")
|
|
changegrouprevisionInsert = sb.Insert("changegrouprevision").Columns("id", "revision")
|
|
|
|
// readdb tables based on objectstorage data
|
|
revisionOSTSelect = sb.Select("revision").From("revision_ost")
|
|
revisionOSTInsert = sb.Insert("revision_ost").Columns("revision")
|
|
|
|
runOSTSelect = sb.Select("id", "grouppath", "phase").From("run_ost")
|
|
runOSTInsert = sb.Insert("run_ost").Columns("id", "grouppath", "phase")
|
|
|
|
rundataOSTInsert = sb.Insert("rundata_ost").Columns("id", "data")
|
|
|
|
committedwalsequenceOSTSelect = sb.Select("seq").From("committedwalsequence_ost")
|
|
committedwalsequenceOSTInsert = sb.Insert("committedwalsequence_ost").Columns("seq")
|
|
|
|
changegrouprevisionOSTSelect = sb.Select("id, revision").From("changegrouprevision_ost")
|
|
changegrouprevisionOSTInsert = sb.Insert("changegrouprevision_ost").Columns("id", "revision")
|
|
|
|
runcounterOSTSelect = sb.Select("groupid", "counter").From("runcounter_ost")
|
|
runcounterOSTInsert = sb.Insert("runcounter_ost").Columns("groupid", "counter")
|
|
)
|
|
|
|
type ReadDB struct {
|
|
log *zap.SugaredLogger
|
|
dataDir string
|
|
e *etcd.Store
|
|
rdb *db.DB
|
|
ost *objectstorage.ObjStorage
|
|
dm *datamanager.DataManager
|
|
|
|
Initialized bool
|
|
initLock sync.Mutex
|
|
|
|
// dbWriteLock is used to have only one concurrent write transaction or sqlite
|
|
// will return a deadlock error (since we are using the unlock/notify api) if
|
|
// two write transactions acquire a lock on each other (we cannot specificy
|
|
// that a transaction will be a write tx so it'll start as a read tx, can
|
|
// acquire a lock on another read tx, when both become write tx the deadlock
|
|
// detector will return an error)
|
|
dbWriteLock sync.Mutex
|
|
}
|
|
|
|
func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.Store, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) (*ReadDB, error) {
|
|
if err := os.MkdirAll(dataDir, 0770); err != nil {
|
|
return nil, err
|
|
}
|
|
rdb, err := db.NewDB(db.Sqlite3, filepath.Join(dataDir, "db"))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// populate readdb
|
|
if err := rdb.Create(Stmts); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
readDB := &ReadDB{
|
|
log: logger.Sugar(),
|
|
e: e,
|
|
dataDir: dataDir,
|
|
ost: ost,
|
|
dm: dm,
|
|
rdb: rdb,
|
|
}
|
|
|
|
return readDB, nil
|
|
}
|
|
|
|
func (r *ReadDB) SetInitialized(initialized bool) {
|
|
r.initLock.Lock()
|
|
r.Initialized = initialized
|
|
r.initLock.Unlock()
|
|
}
|
|
|
|
func (r *ReadDB) IsInitialized() bool {
|
|
r.initLock.Lock()
|
|
defer r.initLock.Unlock()
|
|
return r.Initialized
|
|
}
|
|
|
|
// Initialize populates the readdb with the current etcd data and save the
|
|
// revision to then feed it with the etcd events
|
|
func (r *ReadDB) Initialize(ctx context.Context) error {
|
|
if err := r.ResetDB(); err != nil {
|
|
return errors.Errorf("failed to reset db: %w", err)
|
|
}
|
|
if err := r.SyncObjectStorage(ctx); err != nil {
|
|
return errors.Errorf("error syncing objectstorage db: %w", err)
|
|
}
|
|
if err := r.SyncRDB(ctx); err != nil {
|
|
return errors.Errorf("error syncing run db: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) ResetDB() error {
|
|
// TODO(sgotti) this needs to be protected by a mutex
|
|
r.rdb.Close()
|
|
|
|
// drop rdb
|
|
if err := os.Remove(filepath.Join(r.dataDir, "db")); err != nil {
|
|
return err
|
|
}
|
|
|
|
rdb, err := db.NewDB(db.Sqlite3, filepath.Join(r.dataDir, "db"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// populate readdb
|
|
if err := rdb.Create(Stmts); err != nil {
|
|
return err
|
|
}
|
|
|
|
r.rdb = rdb
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) SyncRDB(ctx context.Context) error {
|
|
err := r.rdb.Do(func(tx *db.Tx) error {
|
|
// Do pagination to limit the number of keys per request
|
|
var revision int64
|
|
key := common.EtcdRunsDir
|
|
|
|
var continuation *etcd.ListPagedContinuation
|
|
for {
|
|
listResp, err := r.e.ListPaged(ctx, key, revision, paginationSize, continuation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp := listResp.Resp
|
|
continuation = listResp.Continuation
|
|
|
|
if revision == 0 {
|
|
revision = resp.Header.Revision
|
|
}
|
|
|
|
for _, kv := range resp.Kvs {
|
|
var run *types.Run
|
|
if err := json.Unmarshal(kv.Value, &run); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := insertRun(tx, run, kv.Value); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !listResp.HasMore {
|
|
break
|
|
}
|
|
}
|
|
|
|
// sync changegroups, use the same revision of previous operations
|
|
key = common.EtcdChangeGroupsDir
|
|
continuation = nil
|
|
for {
|
|
listResp, err := r.e.ListPaged(ctx, key, revision, paginationSize, continuation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp := listResp.Resp
|
|
continuation = listResp.Continuation
|
|
|
|
for _, kv := range resp.Kvs {
|
|
changegroupID := path.Base(string(kv.Key))
|
|
|
|
if err := insertChangeGroupRevision(tx, changegroupID, kv.ModRevision); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if !listResp.HasMore {
|
|
break
|
|
}
|
|
}
|
|
|
|
if err := insertRevision(tx, revision); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *ReadDB) Run(ctx context.Context) error {
|
|
revision, err := r.GetRevision()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if revision == 0 {
|
|
for {
|
|
err := r.Initialize(ctx)
|
|
if err == nil {
|
|
break
|
|
}
|
|
r.log.Errorf("initialize err: %+v", err)
|
|
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
r.SetInitialized(true)
|
|
|
|
errCh := make(chan error)
|
|
for {
|
|
for {
|
|
initialized := r.IsInitialized()
|
|
if initialized {
|
|
break
|
|
}
|
|
err := r.Initialize(ctx)
|
|
if err == nil {
|
|
r.SetInitialized(true)
|
|
break
|
|
}
|
|
r.log.Errorf("initialize err: %+v", err)
|
|
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
wg := &sync.WaitGroup{}
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
r.log.Infof("starting handleEvents")
|
|
if err := r.handleEvents(ctx); err != nil {
|
|
r.log.Errorf("handleEvents err: %+v", err)
|
|
errCh <- err
|
|
}
|
|
wg.Done()
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
r.log.Infof("starting handleEventsOST")
|
|
if err := r.handleEventsOST(ctx); err != nil {
|
|
r.log.Errorf("handleEventsOST err: %+v", err)
|
|
errCh <- err
|
|
}
|
|
wg.Done()
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
r.log.Infof("readdb exiting")
|
|
cancel()
|
|
r.rdb.Close()
|
|
return nil
|
|
case <-errCh:
|
|
// cancel context and wait for the all the goroutines to exit
|
|
cancel()
|
|
wg.Wait()
|
|
}
|
|
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
|
|
func (r *ReadDB) handleEvents(ctx context.Context) error {
|
|
var revision int64
|
|
var lastRuns []*RunData
|
|
err := r.rdb.Do(func(tx *db.Tx) error {
|
|
var err error
|
|
revision, err = r.getRevision(tx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
lastRuns, err = r.GetActiveRuns(tx, nil, true, nil, "", 1, types.SortOrderDesc)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
runSequence, _, err := sequence.CurSequence(ctx, r.e, common.EtcdRunSequenceKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var lastRun *types.Run
|
|
if len(lastRuns) > 0 {
|
|
lastRun = lastRuns[0].Run
|
|
}
|
|
if lastRun != nil {
|
|
if runSequence == nil {
|
|
r.SetInitialized(false)
|
|
return errors.Errorf("no runsequence in etcd, reinitializing.")
|
|
}
|
|
|
|
lastRunSequence, err := sequence.Parse(lastRun.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// check that the run sequence epoch isn't different than the current one (this means etcd
|
|
// has been reset, or worst, restored from a backup or manually deleted)
|
|
if runSequence == nil || runSequence.Epoch != lastRunSequence.Epoch {
|
|
r.SetInitialized(false)
|
|
return errors.Errorf("last run epoch %d is different than current epoch in etcd %d, reinitializing.", lastRunSequence.Epoch, runSequence.Epoch)
|
|
}
|
|
}
|
|
|
|
wctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
wctx = etcdclientv3.WithRequireLeader(wctx)
|
|
wch := r.e.Watch(wctx, common.EtcdSchedulerBaseDir+"/", revision+1)
|
|
for wresp := range wch {
|
|
if wresp.Canceled {
|
|
err = wresp.Err()
|
|
if err == etcdclientv3rpc.ErrCompacted {
|
|
r.log.Errorf("required events already compacted, reinitializing readdb")
|
|
r.SetInitialized(false)
|
|
}
|
|
return errors.Errorf("watch error: %w", err)
|
|
}
|
|
|
|
// a single transaction for every response (every response contains all the
|
|
// events happened in an etcd revision).
|
|
r.dbWriteLock.Lock()
|
|
err = r.rdb.Do(func(tx *db.Tx) error {
|
|
for _, ev := range wresp.Events {
|
|
if err := r.handleEvent(tx, ev, &wresp); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := insertRevision(tx, ev.Kv.ModRevision); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
r.dbWriteLock.Unlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) handleEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdclientv3.WatchResponse) error {
|
|
r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
|
key := string(ev.Kv.Key)
|
|
switch {
|
|
case strings.HasPrefix(key, common.EtcdRunsDir+"/"):
|
|
return r.handleRunEvent(tx, ev, wresp)
|
|
|
|
case strings.HasPrefix(key, common.EtcdChangeGroupsDir+"/"):
|
|
return r.handleChangeGroupEvent(tx, ev, wresp)
|
|
|
|
case key == common.EtcdRunEventKey:
|
|
return r.handleRunsEventEvent(tx, ev, wresp)
|
|
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (r *ReadDB) handleRunEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdclientv3.WatchResponse) error {
|
|
switch ev.Type {
|
|
case mvccpb.PUT:
|
|
var run *types.Run
|
|
if err := json.Unmarshal(ev.Kv.Value, &run); err != nil {
|
|
return errors.Errorf("failed to unmarshal run: %w", err)
|
|
}
|
|
|
|
return insertRun(tx, run, ev.Kv.Value)
|
|
|
|
case mvccpb.DELETE:
|
|
runID := path.Base(string(ev.Kv.Key))
|
|
|
|
if _, err := tx.Exec("delete from run where id = $1", runID); err != nil {
|
|
return errors.Errorf("failed to delete run: %w", err)
|
|
}
|
|
|
|
// Run has been deleted from etcd, this means that it was stored in the objectstorage
|
|
// TODO(sgotti) this is here just to avoid a window where the run is not in
|
|
// run table and in the run_os table but should be changed/removed when we'll
|
|
// implement run removal
|
|
run, err := store.OSTGetRun(r.dm, runID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return r.insertRunOST(tx, run, []byte{})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) handleRunsEventEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdclientv3.WatchResponse) error {
|
|
switch ev.Type {
|
|
case mvccpb.PUT:
|
|
var runEvent *types.RunEvent
|
|
if err := json.Unmarshal(ev.Kv.Value, &runEvent); err != nil {
|
|
return errors.Errorf("failed to unmarshal run: %w", err)
|
|
}
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from runevent where sequence = $1", runEvent.Sequence); err != nil {
|
|
return errors.Errorf("failed to delete run: %w", err)
|
|
}
|
|
q, args, err := runeventInsert.Values(runEvent.Sequence, ev.Kv.Value).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) handleChangeGroupEvent(tx *db.Tx, ev *etcdclientv3.Event, wresp *etcdclientv3.WatchResponse) error {
|
|
changegroupID := path.Base(string(ev.Kv.Key))
|
|
|
|
switch ev.Type {
|
|
case mvccpb.PUT:
|
|
return insertChangeGroupRevision(tx, changegroupID, ev.Kv.ModRevision)
|
|
|
|
case mvccpb.DELETE:
|
|
if _, err := tx.Exec("delete from changegrouprevision where id = $1", changegroupID); err != nil {
|
|
return errors.Errorf("failed to delete change group revision: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) SyncObjectStorage(ctx context.Context) error {
|
|
// get the last committed storage wal sequence saved in the rdb
|
|
curWalSeq := ""
|
|
err := r.rdb.Do(func(tx *db.Tx) error {
|
|
var err error
|
|
curWalSeq, err = r.GetCommittedWalSequenceOST(tx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
lastCommittedStorageWal, _, err := r.dm.LastCommittedStorageWal(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
doFullSync := false
|
|
if curWalSeq == "" {
|
|
doFullSync = true
|
|
r.log.Warn("no startWalSeq in db, doing a full sync")
|
|
} else {
|
|
ok, err := r.dm.HasOSTWal(curWalSeq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !ok {
|
|
r.log.Warnf("no wal with seq %q in objectstorage, doing a full sync", curWalSeq)
|
|
doFullSync = true
|
|
}
|
|
|
|
// if the epoch of the wals has changed this means etcd has been reset. If so
|
|
// we should do a full resync since we are saving in the rdb also data that
|
|
// was not yet committed to objectstorage so we should have the rdb ahead of
|
|
// the current objectstorage data
|
|
// TODO(sgotti) improve this to avoid doing a full resync
|
|
curWalSequence, err := sequence.Parse(curWalSeq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
curWalEpoch := curWalSequence.Epoch
|
|
|
|
lastCommittedStorageWalSequence, err := sequence.Parse(lastCommittedStorageWal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if curWalEpoch != lastCommittedStorageWalSequence.Epoch {
|
|
r.log.Warnf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, doing a full sync", curWalEpoch, lastCommittedStorageWalSequence.Epoch)
|
|
doFullSync = true
|
|
}
|
|
}
|
|
|
|
if doFullSync {
|
|
r.log.Infof("doing a full sync from dump")
|
|
if err := r.ResetDB(); err != nil {
|
|
return err
|
|
}
|
|
|
|
var err error
|
|
curWalSeq, err = r.SyncFromDump()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
r.log.Debugf("startWalSeq: %s", curWalSeq)
|
|
|
|
// Sync from wals
|
|
// sync from objectstorage until the current known lastCommittedStorageWal in etcd
|
|
// since wals are first committed to objectstorage and then in etcd we would like to
|
|
// avoid to store in rdb something that is not yet marked as committedstorage
|
|
// in etcd
|
|
curWalSeq, err = r.SyncFromWals(curWalSeq, lastCommittedStorageWal)
|
|
if err != nil {
|
|
return errors.Errorf("failed to sync from wals: %w", err)
|
|
}
|
|
|
|
// Get the first available wal from etcd and check that our current walseq
|
|
// from wals on objectstorage is >=
|
|
// if not (this happens when syncFromWals takes some time and in the meantime
|
|
// many new wals are written, the next sync should be faster and able to continue
|
|
firstAvailableWalData, revision, err := r.dm.FirstAvailableWalData(ctx)
|
|
if err != nil {
|
|
return errors.Errorf("failed to get first available wal data: %w", err)
|
|
}
|
|
r.log.Debugf("firstAvailableWalData: %s", util.Dump(firstAvailableWalData))
|
|
r.log.Debugf("revision: %d", revision)
|
|
if firstAvailableWalData == nil {
|
|
if curWalSeq != "" {
|
|
// this happens if etcd has been reset
|
|
return errors.Errorf("our curwalseq is %q but there's no wal data on etcd", curWalSeq)
|
|
}
|
|
}
|
|
if firstAvailableWalData != nil {
|
|
if curWalSeq < firstAvailableWalData.WalSequence {
|
|
return errors.Errorf("current applied wal seq %q is smaller than the first available wal on etcd %q", curWalSeq, firstAvailableWalData.WalSequence)
|
|
}
|
|
}
|
|
|
|
err = r.rdb.Do(func(tx *db.Tx) error {
|
|
if err := insertRevisionOST(tx, revision); err != nil {
|
|
return err
|
|
}
|
|
|
|
// use the same revision as previous operation
|
|
for walElement := range r.dm.ListEtcdWals(ctx, revision) {
|
|
if walElement.Err != nil {
|
|
return err
|
|
}
|
|
if walElement.WalData.WalSequence <= curWalSeq {
|
|
continue
|
|
}
|
|
|
|
if err := r.insertCommittedWalSequenceOST(tx, walElement.WalData.WalSequence); err != nil {
|
|
return err
|
|
}
|
|
|
|
// update readdb only when the wal has been committed to etcd
|
|
if walElement.WalData.WalStatus != datamanager.WalStatusCommitted {
|
|
return nil
|
|
}
|
|
|
|
r.log.Debugf("applying wal to db")
|
|
if err := r.applyWal(tx, walElement.WalData.WalDataFileID); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// sync changegroups, use the same revision of previous operations
|
|
changeGroupsRevisions, err := r.dm.ListEtcdChangeGroups(ctx, revision)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for changeGroupID, changeGroupRevision := range changeGroupsRevisions {
|
|
if err := r.insertChangeGroupRevisionOST(tx, changeGroupID, changeGroupRevision); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
func (r *ReadDB) SyncFromDump() (string, error) {
|
|
dumpIndex, err := r.dm.GetLastDataStatus()
|
|
if err != nil && err != ostypes.ErrNotExist {
|
|
return "", err
|
|
}
|
|
if err == ostypes.ErrNotExist {
|
|
return "", nil
|
|
}
|
|
for dataType, files := range dumpIndex.Files {
|
|
for _, file := range files {
|
|
dumpf, err := r.ost.ReadObject(datamanager.DataFilePath(dataType, file.ID))
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
dumpEntries := []*datamanager.DataEntry{}
|
|
dec := json.NewDecoder(dumpf)
|
|
for {
|
|
var de *datamanager.DataEntry
|
|
|
|
err := dec.Decode(&de)
|
|
if err == io.EOF {
|
|
// all done
|
|
break
|
|
}
|
|
if err != nil {
|
|
dumpf.Close()
|
|
return "", err
|
|
}
|
|
dumpEntries = append(dumpEntries, de)
|
|
}
|
|
dumpf.Close()
|
|
|
|
err = r.rdb.Do(func(tx *db.Tx) error {
|
|
for _, de := range dumpEntries {
|
|
action := &datamanager.Action{
|
|
ActionType: datamanager.ActionTypePut,
|
|
ID: de.ID,
|
|
DataType: dataType,
|
|
Data: de.Data,
|
|
}
|
|
if err := r.applyAction(tx, action); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
}
|
|
|
|
return dumpIndex.WalSequence, nil
|
|
}
|
|
|
|
func (r *ReadDB) SyncFromWals(startWalSeq, endWalSeq string) (string, error) {
|
|
insertfunc := func(walFiles []*datamanager.WalFile) error {
|
|
err := r.rdb.Do(func(tx *db.Tx) error {
|
|
for _, walFile := range walFiles {
|
|
walFilef, err := r.dm.ReadWal(walFile.WalSequence)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
dec := json.NewDecoder(walFilef)
|
|
var header *datamanager.WalHeader
|
|
if err = dec.Decode(&header); err != nil && err != io.EOF {
|
|
walFilef.Close()
|
|
return err
|
|
}
|
|
walFilef.Close()
|
|
if err := r.insertCommittedWalSequenceOST(tx, walFile.WalSequence); err != nil {
|
|
return err
|
|
}
|
|
if err := r.applyWal(tx, header.WalDataFileID); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
return err
|
|
}
|
|
|
|
lastWalSeq := startWalSeq
|
|
walFiles := []*datamanager.WalFile{}
|
|
count := 0
|
|
|
|
doneCh := make(chan struct{})
|
|
defer close(doneCh)
|
|
|
|
for walFile := range r.dm.ListOSTWals(startWalSeq) {
|
|
if walFile.Err != nil {
|
|
return "", walFile.Err
|
|
}
|
|
|
|
walFiles = append(walFiles, walFile)
|
|
lastWalSeq = walFile.WalSequence
|
|
|
|
if count > 100 {
|
|
if err := insertfunc(walFiles); err != nil {
|
|
return "", err
|
|
}
|
|
count = 0
|
|
walFiles = []*datamanager.WalFile{}
|
|
} else {
|
|
count++
|
|
}
|
|
}
|
|
if err := insertfunc(walFiles); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return lastWalSeq, nil
|
|
}
|
|
|
|
func (r *ReadDB) handleEventsOST(ctx context.Context) error {
|
|
var revision int64
|
|
err := r.rdb.Do(func(tx *db.Tx) error {
|
|
err := tx.QueryRow("select revision from revision order by revision desc limit 1").Scan(&revision)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
revision = 0
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
wctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
r.log.Debugf("revision: %d", revision)
|
|
wch := r.dm.Watch(wctx, revision+1)
|
|
for we := range wch {
|
|
r.log.Debugf("we: %s", util.Dump(we))
|
|
if we.Err != nil {
|
|
err := we.Err
|
|
if err == datamanager.ErrCompacted {
|
|
r.log.Warnf("required events already compacted, reinitializing readdb")
|
|
r.Initialized = false
|
|
return nil
|
|
}
|
|
return errors.Errorf("watch error: %w", err)
|
|
}
|
|
|
|
// a single transaction for every response (every response contains all the
|
|
// events happened in an etcd revision).
|
|
r.dbWriteLock.Lock()
|
|
err = r.rdb.Do(func(tx *db.Tx) error {
|
|
|
|
// if theres a wal seq epoch change something happened to etcd, usually (if
|
|
// the user hasn't messed up with etcd keys) this means etcd has been reset
|
|
// in such case we should resync from the objectstorage state to ensure we
|
|
// apply all the wal marked as committedstorage (since they could have been
|
|
// lost from etcd)
|
|
curWalSeq, err := r.GetCommittedWalSequenceOST(tx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.log.Debugf("curWalSeq: %q", curWalSeq)
|
|
if curWalSeq != "" && we.WalData != nil {
|
|
curWalSequence, err := sequence.Parse(curWalSeq)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
curWalEpoch := curWalSequence.Epoch
|
|
|
|
weWalSequence, err := sequence.Parse(we.WalData.WalSequence)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
r.log.Debugf("we.WalData.WalSequence: %q", we.WalData.WalSequence)
|
|
weWalEpoch := weWalSequence.Epoch
|
|
if curWalEpoch != weWalEpoch {
|
|
r.Initialized = false
|
|
return errors.Errorf("current rdb wal sequence epoch %d different than new wal sequence epoch %d, resyncing from objectstorage", curWalEpoch, weWalEpoch)
|
|
}
|
|
}
|
|
|
|
if err := r.handleEventOST(tx, we); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := insertRevisionOST(tx, we.Revision); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
r.dbWriteLock.Unlock()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) applyWal(tx *db.Tx, walDataFileID string) error {
|
|
walFile, err := r.dm.ReadWalData(walDataFileID)
|
|
if err != nil {
|
|
return errors.Errorf("cannot read wal data file %q: %w", walDataFileID, err)
|
|
}
|
|
defer walFile.Close()
|
|
|
|
dec := json.NewDecoder(walFile)
|
|
for {
|
|
var action *datamanager.Action
|
|
|
|
err := dec.Decode(&action)
|
|
if err == io.EOF {
|
|
// all done
|
|
break
|
|
}
|
|
if err != nil {
|
|
return errors.Errorf("failed to decode wal file: %w", err)
|
|
}
|
|
|
|
if err := r.applyAction(tx, action); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) applyAction(tx *db.Tx, action *datamanager.Action) error {
|
|
r.log.Debugf("action: dataType: %s, ID: %s", action.DataType, action.ID)
|
|
switch action.ActionType {
|
|
case datamanager.ActionTypePut:
|
|
switch action.DataType {
|
|
case string(common.DataTypeRun):
|
|
var run *types.Run
|
|
if err := json.Unmarshal(action.Data, &run); err != nil {
|
|
return err
|
|
}
|
|
if err := r.insertRunOST(tx, run, action.Data); err != nil {
|
|
return err
|
|
}
|
|
case string(common.DataTypeRunCounter):
|
|
var runCounter uint64
|
|
if err := json.Unmarshal(action.Data, &runCounter); err != nil {
|
|
return err
|
|
}
|
|
r.log.Debugf("inserting run counter %q, c: %d", action.ID, runCounter)
|
|
if err := r.insertRunCounterOST(tx, action.ID, runCounter); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
case datamanager.ActionTypeDelete:
|
|
switch action.DataType {
|
|
case string(common.DataTypeRun):
|
|
case string(common.DataTypeRunCounter):
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) handleEventOST(tx *db.Tx, we *datamanager.WatchElement) error {
|
|
//r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
|
|
//key := string(ev.Kv.Key)
|
|
|
|
if err := r.handleWalEvent(tx, we); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) handleWalEvent(tx *db.Tx, we *datamanager.WatchElement) error {
|
|
for cgName, cgRev := range we.ChangeGroupsRevisions {
|
|
if err := r.insertChangeGroupRevisionOST(tx, cgName, cgRev); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if we.WalData != nil {
|
|
// update readdb only when the wal has been committed to etcd
|
|
if we.WalData.WalStatus != datamanager.WalStatusCommitted {
|
|
return nil
|
|
}
|
|
|
|
if err := r.insertCommittedWalSequenceOST(tx, we.WalData.WalSequence); err != nil {
|
|
return err
|
|
}
|
|
|
|
r.log.Debugf("applying wal to db")
|
|
return r.applyWal(tx, we.WalData.WalDataFileID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) Do(f func(tx *db.Tx) error) error {
|
|
if !r.IsInitialized() {
|
|
return errors.Errorf("db not initialized")
|
|
}
|
|
return r.rdb.Do(f)
|
|
}
|
|
|
|
func insertRevision(tx *db.Tx, revision int64) error {
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from revision"); err != nil {
|
|
return errors.Errorf("failed to delete revision: %w", err)
|
|
}
|
|
// TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types...
|
|
//q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql()
|
|
q, args, err := revisionInsert.Values(revision).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func insertRevisionOST(tx *db.Tx, revision int64) error {
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from revision_ost"); err != nil {
|
|
return errors.Errorf("failed to delete revision: %w", err)
|
|
}
|
|
// TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types...
|
|
//q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql()
|
|
q, args, err := revisionOSTInsert.Values(revision).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func insertRun(tx *db.Tx, run *types.Run, data []byte) error {
|
|
// add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02)
|
|
groupPath := run.Group
|
|
if !strings.HasSuffix(groupPath, "/") {
|
|
groupPath += "/"
|
|
}
|
|
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from run where id = $1", run.ID); err != nil {
|
|
return errors.Errorf("failed to delete run: %w", err)
|
|
}
|
|
q, args, err := runInsert.Values(run.ID, groupPath, run.Phase).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from rundata where id = $1", run.ID); err != nil {
|
|
return errors.Errorf("failed to delete rundata: %w", err)
|
|
}
|
|
q, args, err = rundataInsert.Values(run.ID, data).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) insertRunOST(tx *db.Tx, run *types.Run, data []byte) error {
|
|
// add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02)
|
|
groupPath := run.Group
|
|
if !strings.HasSuffix(groupPath, "/") {
|
|
groupPath += "/"
|
|
}
|
|
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from run_ost where id = $1", run.ID); err != nil {
|
|
return errors.Errorf("failed to delete run objectstorage: %w", err)
|
|
}
|
|
q, args, err := runOSTInsert.Values(run.ID, groupPath, run.Phase).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from rundata_ost where id = $1", run.ID); err != nil {
|
|
return errors.Errorf("failed to delete rundata: %w", err)
|
|
}
|
|
q, args, err = rundataOSTInsert.Values(run.ID, data).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func insertChangeGroupRevision(tx *db.Tx, changegroupID string, revision int64) error {
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from changegrouprevision where id = $1", changegroupID); err != nil {
|
|
return errors.Errorf("failed to delete run: %w", err)
|
|
}
|
|
q, args, err := changegrouprevisionInsert.Values(changegroupID, revision).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) GetRevision() (int64, error) {
|
|
var revision int64
|
|
|
|
err := r.rdb.Do(func(tx *db.Tx) error {
|
|
var err error
|
|
revision, err = r.getRevision(tx)
|
|
return err
|
|
})
|
|
return revision, err
|
|
}
|
|
|
|
func (r *ReadDB) getRevision(tx *db.Tx) (int64, error) {
|
|
var revision int64
|
|
|
|
q, args, err := revisionSelect.ToSql()
|
|
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return 0, errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
err = tx.QueryRow(q, args...).Scan(&revision)
|
|
if err == sql.ErrNoRows {
|
|
return 0, nil
|
|
}
|
|
return revision, err
|
|
}
|
|
|
|
func (r *ReadDB) GetChangeGroupsUpdateTokens(tx *db.Tx, groups []string) (*types.ChangeGroupsUpdateToken, error) {
|
|
s := changegrouprevisionSelect.Where(sq.Eq{"id": groups})
|
|
q, args, err := s.ToSql()
|
|
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return nil, errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
changeGroupsRevisions, err := fetchChangeGroupsRevision(tx, q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
revision, err := r.getRevision(tx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// for non existing changegroups use a changegroup with revision = 0
|
|
for _, g := range groups {
|
|
if _, ok := changeGroupsRevisions[g]; !ok {
|
|
changeGroupsRevisions[g] = 0
|
|
}
|
|
}
|
|
|
|
return &types.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: changeGroupsRevisions}, nil
|
|
}
|
|
|
|
func (r *ReadDB) GetActiveRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) {
|
|
return r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder)
|
|
}
|
|
|
|
func (r *ReadDB) GetRuns(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*types.Run, error) {
|
|
useObjectStorage := false
|
|
for _, phase := range phaseFilter {
|
|
if phase == types.RunPhaseFinished || phase == types.RunPhaseCancelled {
|
|
useObjectStorage = true
|
|
}
|
|
}
|
|
if len(phaseFilter) == 0 {
|
|
useObjectStorage = true
|
|
}
|
|
|
|
runDataRDB, err := r.getRunsFilteredActive(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
lastRunsMap := map[string]*RunData{}
|
|
runsMap := map[string]*RunData{}
|
|
for _, r := range runDataRDB {
|
|
runsMap[r.ID] = r
|
|
lastRunsMap[r.GroupPath] = r
|
|
}
|
|
|
|
if useObjectStorage {
|
|
// skip if the phase requested is not finished
|
|
runDataOST, err := r.GetRunsFilteredOST(tx, groups, lastRun, phaseFilter, startRunID, limit, sortOrder)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, rd := range runDataOST {
|
|
if lastRun {
|
|
if lr, ok := lastRunsMap[rd.GroupPath]; ok {
|
|
switch sortOrder {
|
|
case types.SortOrderAsc:
|
|
if rd.ID < lr.ID {
|
|
lastRunsMap[rd.GroupPath] = rd
|
|
}
|
|
case types.SortOrderDesc:
|
|
if rd.ID > lr.ID {
|
|
lastRunsMap[rd.GroupPath] = rd
|
|
}
|
|
}
|
|
} else {
|
|
lastRunsMap[rd.GroupPath] = rd
|
|
runsMap[rd.ID] = rd
|
|
}
|
|
} else {
|
|
runsMap[rd.ID] = rd
|
|
}
|
|
}
|
|
}
|
|
|
|
var keys []string
|
|
for k := range runsMap {
|
|
keys = append(keys, k)
|
|
}
|
|
switch sortOrder {
|
|
case types.SortOrderAsc:
|
|
sort.Sort(sort.StringSlice(keys))
|
|
case types.SortOrderDesc:
|
|
sort.Sort(sort.Reverse(sort.StringSlice(keys)))
|
|
}
|
|
|
|
aruns := make([]*types.Run, 0, len(runsMap))
|
|
|
|
count := 0
|
|
for _, runID := range keys {
|
|
if count >= limit {
|
|
break
|
|
}
|
|
count++
|
|
|
|
rd := runsMap[runID]
|
|
if rd.Run != nil {
|
|
aruns = append(aruns, rd.Run)
|
|
continue
|
|
}
|
|
|
|
// get run from objectstorage
|
|
run, err := store.OSTGetRun(r.dm, runID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
aruns = append(aruns, run)
|
|
}
|
|
|
|
return aruns, nil
|
|
}
|
|
|
|
func (r *ReadDB) getRunsFilteredQuery(phaseFilter []types.RunPhase, groups []string, lastRun bool, startRunID string, limit int, sortOrder types.SortOrder, objectstorage bool) sq.SelectBuilder {
|
|
runt := "run"
|
|
rundatat := "rundata"
|
|
fields := []string{"run.id", "run.grouppath", "run.phase", "rundata.data"}
|
|
if len(groups) > 0 && lastRun {
|
|
fields = []string{"max(run.id)", "run.grouppath", "run.phase", "rundata.data"}
|
|
}
|
|
if objectstorage {
|
|
runt = "run_ost"
|
|
rundatat = "rundata_ost"
|
|
}
|
|
|
|
r.log.Debugf("runt: %s", runt)
|
|
s := sb.Select(fields...).From(runt + " as run")
|
|
switch sortOrder {
|
|
case types.SortOrderAsc:
|
|
s = s.OrderBy("run.id asc")
|
|
case types.SortOrderDesc:
|
|
s = s.OrderBy("run.id desc")
|
|
}
|
|
if len(phaseFilter) > 0 {
|
|
s = s.Where(sq.Eq{"phase": phaseFilter})
|
|
}
|
|
if startRunID != "" {
|
|
if lastRun {
|
|
switch sortOrder {
|
|
case types.SortOrderAsc:
|
|
s = s.Having(sq.Gt{"run.id": startRunID})
|
|
case types.SortOrderDesc:
|
|
s = s.Having(sq.Lt{"run.id": startRunID})
|
|
}
|
|
} else {
|
|
switch sortOrder {
|
|
case types.SortOrderAsc:
|
|
s = s.Where(sq.Gt{"run.id": startRunID})
|
|
case types.SortOrderDesc:
|
|
s = s.Where(sq.Lt{"run.id": startRunID})
|
|
}
|
|
}
|
|
}
|
|
if limit > 0 {
|
|
s = s.Limit(uint64(limit))
|
|
}
|
|
|
|
s = s.Join(fmt.Sprintf("%s as rundata on rundata.id = run.id", rundatat))
|
|
if len(groups) > 0 {
|
|
cond := sq.Or{}
|
|
for _, groupPath := range groups {
|
|
// add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02)
|
|
if !strings.HasSuffix(groupPath, "/") {
|
|
groupPath += "/"
|
|
}
|
|
|
|
cond = append(cond, sq.Like{"run.grouppath": groupPath + "%"})
|
|
}
|
|
s = s.Where(sq.Or{cond})
|
|
if lastRun {
|
|
s = s.GroupBy("run.grouppath")
|
|
}
|
|
}
|
|
|
|
return s
|
|
}
|
|
|
|
func (r *ReadDB) getRunsFilteredActive(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) {
|
|
s := r.getRunsFilteredQuery(phaseFilter, groups, lastRun, startRunID, limit, sortOrder, false)
|
|
|
|
q, args, err := s.ToSql()
|
|
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return nil, errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
return fetchRuns(tx, q, args...)
|
|
}
|
|
|
|
func (r *ReadDB) GetRunsFilteredOST(tx *db.Tx, groups []string, lastRun bool, phaseFilter []types.RunPhase, startRunID string, limit int, sortOrder types.SortOrder) ([]*RunData, error) {
|
|
s := r.getRunsFilteredQuery(phaseFilter, groups, lastRun, startRunID, limit, sortOrder, true)
|
|
|
|
q, args, err := s.ToSql()
|
|
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return nil, errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
return fetchRuns(tx, q, args...)
|
|
}
|
|
|
|
func (r *ReadDB) GetRun(tx *db.Tx, runID string) (*types.Run, error) {
|
|
run, err := r.getRun(tx, runID, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if run != nil {
|
|
return run, nil
|
|
}
|
|
|
|
// try to fetch from ost
|
|
return r.getRun(tx, runID, true)
|
|
}
|
|
|
|
func (r *ReadDB) getRun(tx *db.Tx, runID string, ost bool) (*types.Run, error) {
|
|
s := r.getRunQuery(runID, ost)
|
|
|
|
q, args, err := s.ToSql()
|
|
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return nil, errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
runsData, err := fetchRuns(tx, q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(runsData) > 1 {
|
|
return nil, errors.Errorf("too many rows returned")
|
|
}
|
|
if len(runsData) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
run := runsData[0].Run
|
|
if run == nil {
|
|
var err error
|
|
if !ost {
|
|
return nil, errors.Errorf("nil active run data. This should never happen")
|
|
}
|
|
// get run from objectstorage
|
|
run, err = store.OSTGetRun(r.dm, runID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return run, nil
|
|
}
|
|
|
|
func (r *ReadDB) getRunQuery(runID string, objectstorage bool) sq.SelectBuilder {
|
|
runt := "run"
|
|
rundatat := "rundata"
|
|
fields := []string{"run.id", "run.grouppath", "run.phase", "rundata.data"}
|
|
if objectstorage {
|
|
runt = "run_ost"
|
|
rundatat = "rundata_ost"
|
|
}
|
|
|
|
s := sb.Select(fields...).From(runt + " as run").Where(sq.Eq{"run.id": runID})
|
|
s = s.Join(fmt.Sprintf("%s as rundata on rundata.id = run.id", rundatat))
|
|
|
|
return s
|
|
}
|
|
|
|
type RunData struct {
|
|
ID string
|
|
GroupPath string
|
|
Phase string
|
|
Run *types.Run
|
|
}
|
|
|
|
func fetchRuns(tx *db.Tx, q string, args ...interface{}) ([]*RunData, error) {
|
|
rows, err := tx.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanRuns(rows)
|
|
}
|
|
|
|
func scanRun(rows *sql.Rows) (*RunData, error) {
|
|
r := &RunData{}
|
|
var data []byte
|
|
if err := rows.Scan(&r.ID, &r.GroupPath, &r.Phase, &data); err != nil {
|
|
return nil, errors.Errorf("failed to scan rows: %w", err)
|
|
}
|
|
if len(data) > 0 {
|
|
if err := json.Unmarshal(data, &r.Run); err != nil {
|
|
return nil, errors.Errorf("failed to unmarshal run: %w", err)
|
|
}
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
func scanRuns(rows *sql.Rows) ([]*RunData, error) {
|
|
runs := []*RunData{}
|
|
for rows.Next() {
|
|
r, err := scanRun(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
runs = append(runs, r)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return runs, nil
|
|
}
|
|
|
|
func fetchChangeGroupsRevision(tx *db.Tx, q string, args ...interface{}) (types.ChangeGroupsRevisions, error) {
|
|
rows, err := tx.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanChangeGroupsRevision(rows)
|
|
}
|
|
|
|
func scanChangeGroupsRevision(rows *sql.Rows) (types.ChangeGroupsRevisions, error) {
|
|
changegroups := types.ChangeGroupsRevisions{}
|
|
for rows.Next() {
|
|
var (
|
|
id string
|
|
revision int64
|
|
)
|
|
if err := rows.Scan(&id, &revision); err != nil {
|
|
return nil, errors.Errorf("failed to scan rows: %w", err)
|
|
}
|
|
changegroups[id] = revision
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return changegroups, nil
|
|
}
|
|
|
|
func (r *ReadDB) insertCommittedWalSequenceOST(tx *db.Tx, seq string) error {
|
|
r.log.Debugf("insert seq: %s", seq)
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from committedwalsequence_ost"); err != nil {
|
|
return errors.Errorf("failed to delete committedwalsequence: %w", err)
|
|
}
|
|
q, args, err := committedwalsequenceOSTInsert.Values(seq).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) GetCommittedWalSequenceOST(tx *db.Tx) (string, error) {
|
|
var seq string
|
|
|
|
q, args, err := committedwalsequenceOSTSelect.OrderBy("seq").Limit(1).ToSql()
|
|
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return "", errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
err = tx.QueryRow(q, args...).Scan(&seq)
|
|
if err == sql.ErrNoRows {
|
|
return "", nil
|
|
}
|
|
return seq, err
|
|
}
|
|
|
|
func (r *ReadDB) insertChangeGroupRevisionOST(tx *db.Tx, changegroup string, revision int64) error {
|
|
r.log.Debugf("insertChangeGroupRevision: %s %d", changegroup, revision)
|
|
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from changegrouprevision_ost where id = $1", changegroup); err != nil {
|
|
return errors.Errorf("failed to delete run: %w", err)
|
|
}
|
|
// insert only if revision > 0
|
|
if revision > 0 {
|
|
q, args, err := changegrouprevisionOSTInsert.Values(changegroup, revision).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) GetChangeGroupsUpdateTokensOST(tx *db.Tx, groups []string) (*datamanager.ChangeGroupsUpdateToken, error) {
|
|
s := changegrouprevisionOSTSelect.Where(sq.Eq{"id": groups})
|
|
q, args, err := s.ToSql()
|
|
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return nil, errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
cgr, err := fetchChangeGroupsRevisionOST(tx, q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
revision, err := r.getRevision(tx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// for non existing changegroups use a changegroup with revision = 0
|
|
for _, g := range groups {
|
|
if _, ok := cgr[g]; !ok {
|
|
cgr[g] = 0
|
|
}
|
|
}
|
|
|
|
return &datamanager.ChangeGroupsUpdateToken{CurRevision: revision, ChangeGroupsRevisions: cgr}, nil
|
|
}
|
|
|
|
func fetchChangeGroupsRevisionOST(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) {
|
|
rows, err := tx.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanChangeGroupsRevision(rows)
|
|
}
|
|
|
|
func scanChangeGroupsRevisionOST(rows *sql.Rows) (map[string]int64, error) {
|
|
changegroups := map[string]int64{}
|
|
for rows.Next() {
|
|
var (
|
|
id string
|
|
revision int64
|
|
)
|
|
if err := rows.Scan(&id, &revision); err != nil {
|
|
return nil, errors.Errorf("failed to scan rows: %w", err)
|
|
}
|
|
changegroups[id] = revision
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return changegroups, nil
|
|
}
|
|
|
|
func (r *ReadDB) insertRunCounterOST(tx *db.Tx, group string, counter uint64) error {
|
|
// poor man insert or update that works because transaction isolation level is serializable
|
|
if _, err := tx.Exec("delete from runcounter_ost where groupid = $1", group); err != nil {
|
|
return errors.Errorf("failed to delete revision: %w", err)
|
|
}
|
|
// TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types...
|
|
//q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql()
|
|
q, args, err := runcounterOSTInsert.Values(group, counter).ToSql()
|
|
if err != nil {
|
|
return errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
if _, err = tx.Exec(q, args...); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *ReadDB) GetRunCounterOST(tx *db.Tx, group string) (uint64, error) {
|
|
var g string
|
|
var counter uint64
|
|
|
|
q, args, err := runcounterOSTSelect.Where(sq.Eq{"groupid": group}).ToSql()
|
|
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return 0, errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
err = tx.QueryRow(q, args...).Scan(&g, &counter)
|
|
if err == sql.ErrNoRows {
|
|
return 0, nil
|
|
}
|
|
return counter, err
|
|
}
|
|
|
|
func (r *ReadDB) GetRunCountersOST(tx *db.Tx, start string, limit int) ([]*types.RunCounter, error) {
|
|
s := runcounterOSTSelect.Where(sq.Gt{"groupid": start})
|
|
if limit > 0 {
|
|
s = s.Limit(uint64(limit))
|
|
}
|
|
s = s.OrderBy("groupid asc")
|
|
|
|
q, args, err := s.ToSql()
|
|
r.log.Debugf("q: %s, args: %s", q, util.Dump(args))
|
|
if err != nil {
|
|
return nil, errors.Errorf("failed to build query: %w", err)
|
|
}
|
|
|
|
return fetchRunCounters(tx, q, args...)
|
|
}
|
|
|
|
func fetchRunCounters(tx *db.Tx, q string, args ...interface{}) ([]*types.RunCounter, error) {
|
|
rows, err := tx.Query(q, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
return scanRunCounters(rows)
|
|
}
|
|
|
|
func scanRunCounter(rows *sql.Rows) (*types.RunCounter, error) {
|
|
r := &types.RunCounter{}
|
|
if err := rows.Scan(&r.Group, &r.Counter); err != nil {
|
|
return nil, errors.Errorf("failed to scan rows: %w", err)
|
|
}
|
|
|
|
return r, nil
|
|
}
|
|
|
|
func scanRunCounters(rows *sql.Rows) ([]*types.RunCounter, error) {
|
|
runCounters := []*types.RunCounter{}
|
|
for rows.Next() {
|
|
r, err := scanRunCounter(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
runCounters = append(runCounters, r)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return runCounters, nil
|
|
}
|