2019-02-21 14:54:50 +00:00
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package readdb
import (
"context"
"database/sql"
"encoding/json"
"fmt"
2019-03-29 11:15:48 +00:00
"io"
2019-02-21 14:54:50 +00:00
"os"
"path"
"path/filepath"
"sort"
"strings"
"sync"
"time"
2019-07-01 09:40:20 +00:00
"agola.io/agola/internal/datamanager"
"agola.io/agola/internal/db"
"agola.io/agola/internal/etcd"
"agola.io/agola/internal/objectstorage"
ostypes "agola.io/agola/internal/objectstorage/types"
"agola.io/agola/internal/sequence"
"agola.io/agola/internal/services/runservice/common"
"agola.io/agola/internal/services/runservice/store"
"agola.io/agola/internal/services/runservice/types"
"agola.io/agola/internal/util"
2019-02-21 14:54:50 +00:00
sq "github.com/Masterminds/squirrel"
etcdclientv3 "go.etcd.io/etcd/clientv3"
etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/mvcc/mvccpb"
2019-05-23 21:21:45 +00:00
"go.uber.org/zap"
2019-05-23 09:23:14 +00:00
errors "golang.org/x/xerrors"
2019-02-21 14:54:50 +00:00
)
const (
2019-03-29 11:15:48 +00:00
paginationSize = 100
2019-02-21 14:54:50 +00:00
)
var (
// Use postgresql $ placeholder. It'll be converted to ? from the provided db functions
sb = sq . StatementBuilder . PlaceholderFormat ( sq . Dollar )
2019-04-01 10:54:43 +00:00
// readdb tables based on etcd data
2019-02-21 14:54:50 +00:00
revisionSelect = sb . Select ( "revision" ) . From ( "revision" )
revisionInsert = sb . Insert ( "revision" ) . Columns ( "revision" )
2019-07-02 12:53:01 +00:00
//runSelect = sb.Select("id", "grouppath", "phase").From("run")
2019-03-29 11:15:48 +00:00
runInsert = sb . Insert ( "run" ) . Columns ( "id" , "grouppath" , "phase" )
2019-02-21 14:54:50 +00:00
2019-03-29 11:15:48 +00:00
rundataInsert = sb . Insert ( "rundata" ) . Columns ( "id" , "data" )
2019-02-21 14:54:50 +00:00
2019-07-02 12:53:01 +00:00
//runeventSelect = sb.Select("data").From("runevent")
2019-02-21 14:54:50 +00:00
runeventInsert = sb . Insert ( "runevent" ) . Columns ( "sequence" , "data" )
changegrouprevisionSelect = sb . Select ( "id, revision" ) . From ( "changegrouprevision" )
changegrouprevisionInsert = sb . Insert ( "changegrouprevision" ) . Columns ( "id" , "revision" )
2019-04-27 13:16:48 +00:00
// readdb tables based on objectstorage data
2019-07-02 12:53:01 +00:00
//revisionOSTSelect = sb.Select("revision").From("revision_ost")
2019-04-27 13:16:48 +00:00
revisionOSTInsert = sb . Insert ( "revision_ost" ) . Columns ( "revision" )
2019-03-29 11:15:48 +00:00
2019-07-02 12:53:01 +00:00
//runOSTSelect = sb.Select("id", "grouppath", "phase").From("run_ost")
2019-04-27 13:16:48 +00:00
runOSTInsert = sb . Insert ( "run_ost" ) . Columns ( "id" , "grouppath" , "phase" )
2019-03-29 11:15:48 +00:00
2019-04-27 13:16:48 +00:00
rundataOSTInsert = sb . Insert ( "rundata_ost" ) . Columns ( "id" , "data" )
2019-04-01 10:54:43 +00:00
2019-04-27 13:16:48 +00:00
committedwalsequenceOSTSelect = sb . Select ( "seq" ) . From ( "committedwalsequence_ost" )
committedwalsequenceOSTInsert = sb . Insert ( "committedwalsequence_ost" ) . Columns ( "seq" )
2019-04-01 10:54:43 +00:00
2019-04-27 13:16:48 +00:00
changegrouprevisionOSTSelect = sb . Select ( "id, revision" ) . From ( "changegrouprevision_ost" )
changegrouprevisionOSTInsert = sb . Insert ( "changegrouprevision_ost" ) . Columns ( "id" , "revision" )
2019-04-01 10:54:43 +00:00
2019-04-27 13:16:48 +00:00
runcounterOSTSelect = sb . Select ( "groupid" , "counter" ) . From ( "runcounter_ost" )
runcounterOSTInsert = sb . Insert ( "runcounter_ost" ) . Columns ( "groupid" , "counter" )
2019-02-21 14:54:50 +00:00
)
type ReadDB struct {
log * zap . SugaredLogger
dataDir string
e * etcd . Store
rdb * db . DB
2019-04-27 13:16:48 +00:00
ost * objectstorage . ObjStorage
2019-04-26 14:00:03 +00:00
dm * datamanager . DataManager
2019-02-21 14:54:50 +00:00
Initialized bool
2019-04-01 10:54:43 +00:00
initLock sync . Mutex
// dbWriteLock is used to have only one concurrent write transaction or sqlite
// will return a deadlock error (since we are using the unlock/notify api) if
// two write transactions acquire a lock on each other (we cannot specificy
// that a transaction will be a write tx so it'll start as a read tx, can
// acquire a lock on another read tx, when both become write tx the deadlock
// detector will return an error)
dbWriteLock sync . Mutex
2019-02-21 14:54:50 +00:00
}
2019-04-26 14:00:03 +00:00
func NewReadDB ( ctx context . Context , logger * zap . Logger , dataDir string , e * etcd . Store , ost * objectstorage . ObjStorage , dm * datamanager . DataManager ) ( * ReadDB , error ) {
2019-02-21 14:54:50 +00:00
if err := os . MkdirAll ( dataDir , 0770 ) ; err != nil {
return nil , err
}
rdb , err := db . NewDB ( db . Sqlite3 , filepath . Join ( dataDir , "db" ) )
if err != nil {
return nil , err
}
// populate readdb
if err := rdb . Create ( Stmts ) ; err != nil {
return nil , err
}
readDB := & ReadDB {
log : logger . Sugar ( ) ,
e : e ,
dataDir : dataDir ,
2019-04-27 13:16:48 +00:00
ost : ost ,
2019-04-26 14:00:03 +00:00
dm : dm ,
2019-02-21 14:54:50 +00:00
rdb : rdb ,
}
return readDB , nil
}
2019-03-29 11:20:54 +00:00
func ( r * ReadDB ) SetInitialized ( initialized bool ) {
2019-04-01 10:54:43 +00:00
r . initLock . Lock ( )
2019-03-29 11:20:54 +00:00
r . Initialized = initialized
2019-04-01 10:54:43 +00:00
r . initLock . Unlock ( )
2019-03-29 11:20:54 +00:00
}
func ( r * ReadDB ) IsInitialized ( ) bool {
2019-04-01 10:54:43 +00:00
r . initLock . Lock ( )
defer r . initLock . Unlock ( )
2019-03-29 11:20:54 +00:00
return r . Initialized
}
2019-02-21 14:54:50 +00:00
// Initialize populates the readdb with the current etcd data and save the
// revision to then feed it with the etcd events
func ( r * ReadDB ) Initialize ( ctx context . Context ) error {
2019-03-29 11:15:48 +00:00
if err := r . ResetDB ( ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to reset db: %w" , err )
2019-03-29 11:15:48 +00:00
}
2019-04-27 13:16:48 +00:00
if err := r . SyncObjectStorage ( ctx ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "error syncing objectstorage db: %w" , err )
2019-03-29 11:15:48 +00:00
}
2019-04-01 10:54:43 +00:00
if err := r . SyncRDB ( ctx ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "error syncing run db: %w" , err )
2019-03-29 11:15:48 +00:00
}
return nil
}
func ( r * ReadDB ) ResetDB ( ) error {
// TODO(sgotti) this needs to be protected by a mutex
2019-02-21 14:54:50 +00:00
r . rdb . Close ( )
// drop rdb
if err := os . Remove ( filepath . Join ( r . dataDir , "db" ) ) ; err != nil {
return err
}
rdb , err := db . NewDB ( db . Sqlite3 , filepath . Join ( r . dataDir , "db" ) )
if err != nil {
return err
}
// populate readdb
if err := rdb . Create ( Stmts ) ; err != nil {
return err
}
r . rdb = rdb
return nil
}
2019-04-01 10:54:43 +00:00
func ( r * ReadDB ) SyncRDB ( ctx context . Context ) error {
2019-02-21 14:54:50 +00:00
err := r . rdb . Do ( func ( tx * db . Tx ) error {
// Do pagination to limit the number of keys per request
2019-04-01 10:54:43 +00:00
var revision int64
2019-02-21 14:54:50 +00:00
key := common . EtcdRunsDir
var continuation * etcd . ListPagedContinuation
for {
2019-03-29 11:15:48 +00:00
listResp , err := r . e . ListPaged ( ctx , key , revision , paginationSize , continuation )
2019-02-21 14:54:50 +00:00
if err != nil {
return err
}
resp := listResp . Resp
continuation = listResp . Continuation
if revision == 0 {
revision = resp . Header . Revision
}
for _ , kv := range resp . Kvs {
var run * types . Run
if err := json . Unmarshal ( kv . Value , & run ) ; err != nil {
return err
}
if err := insertRun ( tx , run , kv . Value ) ; err != nil {
return err
}
}
if ! listResp . HasMore {
break
}
}
2019-04-29 08:16:19 +00:00
// sync changegroups, use the same revision of previous operations
2019-02-21 14:54:50 +00:00
key = common . EtcdChangeGroupsDir
continuation = nil
for {
2019-03-29 11:15:48 +00:00
listResp , err := r . e . ListPaged ( ctx , key , revision , paginationSize , continuation )
2019-02-21 14:54:50 +00:00
if err != nil {
return err
}
resp := listResp . Resp
continuation = listResp . Continuation
for _ , kv := range resp . Kvs {
changegroupID := path . Base ( string ( kv . Key ) )
if err := insertChangeGroupRevision ( tx , changegroupID , kv . ModRevision ) ; err != nil {
return err
}
}
if ! listResp . HasMore {
break
}
}
2019-03-29 11:15:48 +00:00
if err := insertRevision ( tx , revision ) ; err != nil {
return err
}
2019-02-21 14:54:50 +00:00
return nil
} )
return err
}
2019-03-29 11:15:48 +00:00
func ( r * ReadDB ) Run ( ctx context . Context ) error {
revision , err := r . GetRevision ( )
if err != nil {
return err
}
if revision == 0 {
for {
err := r . Initialize ( ctx )
if err == nil {
break
}
r . log . Errorf ( "initialize err: %+v" , err )
time . Sleep ( 1 * time . Second )
}
}
2019-03-29 11:20:54 +00:00
r . SetInitialized ( true )
2019-03-29 11:15:48 +00:00
2019-04-01 10:54:43 +00:00
errCh := make ( chan error )
2019-02-21 14:54:50 +00:00
for {
2019-03-29 11:15:48 +00:00
for {
2019-03-29 11:20:54 +00:00
initialized := r . IsInitialized ( )
2019-03-29 11:15:48 +00:00
if initialized {
break
}
err := r . Initialize ( ctx )
if err == nil {
2019-03-29 11:20:54 +00:00
r . SetInitialized ( true )
2019-03-29 11:15:48 +00:00
break
}
r . log . Errorf ( "initialize err: %+v" , err )
time . Sleep ( 1 * time . Second )
}
2019-04-01 10:54:43 +00:00
ctx , cancel := context . WithCancel ( ctx )
wg := & sync . WaitGroup { }
wg . Add ( 1 )
go func ( ) {
2019-04-27 13:16:48 +00:00
r . log . Infof ( "starting handleEvents" )
if err := r . handleEvents ( ctx ) ; err != nil {
r . log . Errorf ( "handleEvents err: %+v" , err )
2019-04-01 10:54:43 +00:00
errCh <- err
}
wg . Done ( )
} ( )
wg . Add ( 1 )
go func ( ) {
2019-04-27 13:16:48 +00:00
r . log . Infof ( "starting handleEventsOST" )
if err := r . handleEventsOST ( ctx ) ; err != nil {
r . log . Errorf ( "handleEventsOST err: %+v" , err )
2019-04-01 10:54:43 +00:00
errCh <- err
}
wg . Done ( )
} ( )
2019-02-21 14:54:50 +00:00
select {
case <- ctx . Done ( ) :
r . log . Infof ( "readdb exiting" )
2019-04-01 10:54:43 +00:00
cancel ( )
2019-03-29 11:15:48 +00:00
r . rdb . Close ( )
return nil
2019-04-01 10:54:43 +00:00
case <- errCh :
// cancel context and wait for the all the goroutines to exit
cancel ( )
wg . Wait ( )
2019-02-21 14:54:50 +00:00
}
time . Sleep ( 1 * time . Second )
}
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) handleEvents ( ctx context . Context ) error {
2019-02-21 14:54:50 +00:00
var revision int64
2019-03-29 11:00:18 +00:00
var lastRuns [ ] * RunData
2019-02-21 14:54:50 +00:00
err := r . rdb . Do ( func ( tx * db . Tx ) error {
var err error
revision , err = r . getRevision ( tx )
if err != nil {
return err
}
2019-03-29 11:00:18 +00:00
lastRuns , err = r . GetActiveRuns ( tx , nil , true , nil , "" , 1 , types . SortOrderDesc )
2019-02-21 14:54:50 +00:00
return err
} )
if err != nil {
return err
}
runSequence , _ , err := sequence . CurSequence ( ctx , r . e , common . EtcdRunSequenceKey )
if err != nil {
return err
}
var lastRun * types . Run
if len ( lastRuns ) > 0 {
2019-03-29 11:00:18 +00:00
lastRun = lastRuns [ 0 ] . Run
2019-02-21 14:54:50 +00:00
}
if lastRun != nil {
if runSequence == nil {
2019-03-29 11:20:54 +00:00
r . SetInitialized ( false )
2019-02-21 14:54:50 +00:00
return errors . Errorf ( "no runsequence in etcd, reinitializing." )
}
lastRunSequence , err := sequence . Parse ( lastRun . ID )
if err != nil {
return err
}
// check that the run sequence epoch isn't different than the current one (this means etcd
// has been reset, or worst, restored from a backup or manually deleted)
2019-07-02 12:53:01 +00:00
if runSequence . Epoch != lastRunSequence . Epoch {
2019-03-29 11:20:54 +00:00
r . SetInitialized ( false )
2019-02-21 14:54:50 +00:00
return errors . Errorf ( "last run epoch %d is different than current epoch in etcd %d, reinitializing." , lastRunSequence . Epoch , runSequence . Epoch )
}
}
wctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
wctx = etcdclientv3 . WithRequireLeader ( wctx )
2019-04-27 06:59:47 +00:00
wch := r . e . Watch ( wctx , common . EtcdSchedulerBaseDir + "/" , revision + 1 )
2019-02-21 14:54:50 +00:00
for wresp := range wch {
if wresp . Canceled {
err = wresp . Err ( )
if err == etcdclientv3rpc . ErrCompacted {
r . log . Errorf ( "required events already compacted, reinitializing readdb" )
2019-03-29 11:20:54 +00:00
r . SetInitialized ( false )
2019-02-21 14:54:50 +00:00
}
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "watch error: %w" , err )
2019-02-21 14:54:50 +00:00
}
// a single transaction for every response (every response contains all the
// events happened in an etcd revision).
2019-04-01 10:54:43 +00:00
r . dbWriteLock . Lock ( )
2019-02-21 14:54:50 +00:00
err = r . rdb . Do ( func ( tx * db . Tx ) error {
for _ , ev := range wresp . Events {
if err := r . handleEvent ( tx , ev , & wresp ) ; err != nil {
return err
}
if err := insertRevision ( tx , ev . Kv . ModRevision ) ; err != nil {
return err
}
}
return nil
} )
2019-04-01 10:54:43 +00:00
r . dbWriteLock . Unlock ( )
2019-02-21 14:54:50 +00:00
if err != nil {
return err
}
}
return nil
}
func ( r * ReadDB ) handleEvent ( tx * db . Tx , ev * etcdclientv3 . Event , wresp * etcdclientv3 . WatchResponse ) error {
r . log . Debugf ( "event: %s %q : %q\n" , ev . Type , ev . Kv . Key , ev . Kv . Value )
key := string ( ev . Kv . Key )
switch {
case strings . HasPrefix ( key , common . EtcdRunsDir + "/" ) :
return r . handleRunEvent ( tx , ev , wresp )
case strings . HasPrefix ( key , common . EtcdChangeGroupsDir + "/" ) :
return r . handleChangeGroupEvent ( tx , ev , wresp )
case key == common . EtcdRunEventKey :
return r . handleRunsEventEvent ( tx , ev , wresp )
default :
return nil
}
}
func ( r * ReadDB ) handleRunEvent ( tx * db . Tx , ev * etcdclientv3 . Event , wresp * etcdclientv3 . WatchResponse ) error {
switch ev . Type {
case mvccpb . PUT :
var run * types . Run
if err := json . Unmarshal ( ev . Kv . Value , & run ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to unmarshal run: %w" , err )
2019-02-21 14:54:50 +00:00
}
return insertRun ( tx , run , ev . Kv . Value )
case mvccpb . DELETE :
runID := path . Base ( string ( ev . Kv . Key ) )
if _ , err := tx . Exec ( "delete from run where id = $1" , runID ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete run: %w" , err )
2019-02-21 14:54:50 +00:00
}
2019-04-27 13:16:48 +00:00
// Run has been deleted from etcd, this means that it was stored in the objectstorage
2019-04-01 10:54:43 +00:00
// TODO(sgotti) this is here just to avoid a window where the run is not in
2019-04-27 13:16:48 +00:00
// run table and in the run_os table but should be changed/removed when we'll
2019-04-01 10:54:43 +00:00
// implement run removal
2019-04-26 14:00:03 +00:00
run , err := store . OSTGetRun ( r . dm , runID )
2019-02-21 14:54:50 +00:00
if err != nil {
return err
}
2019-04-27 13:16:48 +00:00
return r . insertRunOST ( tx , run , [ ] byte { } )
2019-02-21 14:54:50 +00:00
}
return nil
}
func ( r * ReadDB ) handleRunsEventEvent ( tx * db . Tx , ev * etcdclientv3 . Event , wresp * etcdclientv3 . WatchResponse ) error {
switch ev . Type {
case mvccpb . PUT :
2019-05-15 07:40:32 +00:00
var runEvent * types . RunEvent
2019-02-21 14:54:50 +00:00
if err := json . Unmarshal ( ev . Kv . Value , & runEvent ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to unmarshal run: %w" , err )
2019-02-21 14:54:50 +00:00
}
// poor man insert or update that works because transaction isolation level is serializable
if _ , err := tx . Exec ( "delete from runevent where sequence = $1" , runEvent . Sequence ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete run: %w" , err )
2019-02-21 14:54:50 +00:00
}
q , args , err := runeventInsert . Values ( runEvent . Sequence , ev . Kv . Value ) . ToSql ( )
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
return err
}
}
return nil
}
func ( r * ReadDB ) handleChangeGroupEvent ( tx * db . Tx , ev * etcdclientv3 . Event , wresp * etcdclientv3 . WatchResponse ) error {
changegroupID := path . Base ( string ( ev . Kv . Key ) )
switch ev . Type {
case mvccpb . PUT :
return insertChangeGroupRevision ( tx , changegroupID , ev . Kv . ModRevision )
case mvccpb . DELETE :
if _ , err := tx . Exec ( "delete from changegrouprevision where id = $1" , changegroupID ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete change group revision: %w" , err )
2019-02-21 14:54:50 +00:00
}
}
return nil
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) SyncObjectStorage ( ctx context . Context ) error {
2019-04-01 10:54:43 +00:00
// get the last committed storage wal sequence saved in the rdb
curWalSeq := ""
err := r . rdb . Do ( func ( tx * db . Tx ) error {
var err error
2019-04-27 13:16:48 +00:00
curWalSeq , err = r . GetCommittedWalSequenceOST ( tx )
2019-04-01 10:54:43 +00:00
if err != nil {
return err
}
return nil
} )
if err != nil {
return err
}
2019-04-26 14:00:03 +00:00
lastCommittedStorageWal , _ , err := r . dm . LastCommittedStorageWal ( ctx )
2019-04-01 10:54:43 +00:00
if err != nil {
return err
}
doFullSync := false
if curWalSeq == "" {
doFullSync = true
r . log . Warn ( "no startWalSeq in db, doing a full sync" )
} else {
2019-04-26 14:00:03 +00:00
ok , err := r . dm . HasOSTWal ( curWalSeq )
2019-04-01 10:54:43 +00:00
if err != nil {
return err
}
if ! ok {
2019-04-27 13:16:48 +00:00
r . log . Warnf ( "no wal with seq %q in objectstorage, doing a full sync" , curWalSeq )
2019-04-01 10:54:43 +00:00
doFullSync = true
}
// if the epoch of the wals has changed this means etcd has been reset. If so
// we should do a full resync since we are saving in the rdb also data that
2019-04-27 13:16:48 +00:00
// was not yet committed to objectstorage so we should have the rdb ahead of
// the current objectstorage data
2019-04-01 10:54:43 +00:00
// TODO(sgotti) improve this to avoid doing a full resync
curWalSequence , err := sequence . Parse ( curWalSeq )
if err != nil {
return err
}
curWalEpoch := curWalSequence . Epoch
lastCommittedStorageWalSequence , err := sequence . Parse ( lastCommittedStorageWal )
if err != nil {
return err
}
if curWalEpoch != lastCommittedStorageWalSequence . Epoch {
r . log . Warnf ( "current rdb wal sequence epoch %d different than new wal sequence epoch %d, doing a full sync" , curWalEpoch , lastCommittedStorageWalSequence . Epoch )
doFullSync = true
}
}
if doFullSync {
2019-04-26 14:00:03 +00:00
r . log . Infof ( "doing a full sync from dump" )
2019-04-01 10:54:43 +00:00
if err := r . ResetDB ( ) ; err != nil {
return err
}
var err error
curWalSeq , err = r . SyncFromDump ( )
if err != nil {
return err
}
}
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "startWalSeq: %s" , curWalSeq )
2019-04-01 10:54:43 +00:00
// Sync from wals
2019-04-27 13:16:48 +00:00
// sync from objectstorage until the current known lastCommittedStorageWal in etcd
// since wals are first committed to objectstorage and then in etcd we would like to
2019-04-01 10:54:43 +00:00
// avoid to store in rdb something that is not yet marked as committedstorage
// in etcd
curWalSeq , err = r . SyncFromWals ( curWalSeq , lastCommittedStorageWal )
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to sync from wals: %w" , err )
2019-04-01 10:54:43 +00:00
}
// Get the first available wal from etcd and check that our current walseq
2019-04-27 13:16:48 +00:00
// from wals on objectstorage is >=
2019-04-01 10:54:43 +00:00
// if not (this happens when syncFromWals takes some time and in the meantime
// many new wals are written, the next sync should be faster and able to continue
2019-04-26 14:00:03 +00:00
firstAvailableWalData , revision , err := r . dm . FirstAvailableWalData ( ctx )
2019-04-01 10:54:43 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to get first available wal data: %w" , err )
2019-04-01 10:54:43 +00:00
}
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "firstAvailableWalData: %s" , util . Dump ( firstAvailableWalData ) )
r . log . Debugf ( "revision: %d" , revision )
2019-04-01 10:54:43 +00:00
if firstAvailableWalData == nil {
if curWalSeq != "" {
// this happens if etcd has been reset
return errors . Errorf ( "our curwalseq is %q but there's no wal data on etcd" , curWalSeq )
}
}
if firstAvailableWalData != nil {
if curWalSeq < firstAvailableWalData . WalSequence {
return errors . Errorf ( "current applied wal seq %q is smaller than the first available wal on etcd %q" , curWalSeq , firstAvailableWalData . WalSequence )
}
}
err = r . rdb . Do ( func ( tx * db . Tx ) error {
2019-04-27 13:16:48 +00:00
if err := insertRevisionOST ( tx , revision ) ; err != nil {
2019-04-01 10:54:43 +00:00
return err
}
// use the same revision as previous operation
2019-04-26 14:00:03 +00:00
for walElement := range r . dm . ListEtcdWals ( ctx , revision ) {
2019-04-01 10:54:43 +00:00
if walElement . Err != nil {
return err
}
if walElement . WalData . WalSequence <= curWalSeq {
continue
}
2019-04-27 13:16:48 +00:00
if err := r . insertCommittedWalSequenceOST ( tx , walElement . WalData . WalSequence ) ; err != nil {
2019-04-01 10:54:43 +00:00
return err
}
2019-06-03 16:02:09 +00:00
// update readdb only when the wal has been committed to etcd
if walElement . WalData . WalStatus != datamanager . WalStatusCommitted {
return nil
}
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "applying wal to db" )
if err := r . applyWal ( tx , walElement . WalData . WalDataFileID ) ; err != nil {
return err
}
}
2019-04-29 08:16:19 +00:00
// sync changegroups, use the same revision of previous operations
changeGroupsRevisions , err := r . dm . ListEtcdChangeGroups ( ctx , revision )
if err != nil {
return err
}
for changeGroupID , changeGroupRevision := range changeGroupsRevisions {
if err := r . insertChangeGroupRevisionOST ( tx , changeGroupID , changeGroupRevision ) ; err != nil {
return err
}
}
2019-04-01 10:54:43 +00:00
return nil
} )
return err
}
func ( r * ReadDB ) SyncFromDump ( ) ( string , error ) {
2019-04-26 14:00:03 +00:00
dumpIndex , err := r . dm . GetLastDataStatus ( )
2019-05-21 13:17:53 +00:00
if err != nil && err != ostypes . ErrNotExist {
2019-05-23 09:23:14 +00:00
return "" , err
2019-04-01 10:54:43 +00:00
}
2019-05-21 13:17:53 +00:00
if err == ostypes . ErrNotExist {
2019-04-26 14:00:03 +00:00
return "" , nil
2019-04-01 10:54:43 +00:00
}
2019-04-26 14:00:03 +00:00
for dataType , files := range dumpIndex . Files {
2019-06-03 14:17:27 +00:00
for _ , file := range files {
dumpf , err := r . ost . ReadObject ( datamanager . DataFilePath ( dataType , file . ID ) )
2019-04-26 14:00:03 +00:00
if err != nil {
return "" , err
}
2019-06-03 14:17:27 +00:00
dumpEntries := [ ] * datamanager . DataEntry { }
dec := json . NewDecoder ( dumpf )
for {
var de * datamanager . DataEntry
err := dec . Decode ( & de )
if err == io . EOF {
// all done
break
2019-04-26 14:00:03 +00:00
}
2019-06-03 14:17:27 +00:00
if err != nil {
dumpf . Close ( )
return "" , err
2019-04-01 10:54:43 +00:00
}
2019-06-03 14:17:27 +00:00
dumpEntries = append ( dumpEntries , de )
}
dumpf . Close ( )
err = r . rdb . Do ( func ( tx * db . Tx ) error {
for _ , de := range dumpEntries {
action := & datamanager . Action {
ActionType : datamanager . ActionTypePut ,
ID : de . ID ,
DataType : dataType ,
Data : de . Data ,
}
if err := r . applyAction ( tx , action ) ; err != nil {
return err
}
}
return nil
} )
if err != nil {
return "" , err
2019-04-01 10:54:43 +00:00
}
}
}
2019-04-26 14:00:03 +00:00
return dumpIndex . WalSequence , nil
2019-04-01 10:54:43 +00:00
}
func ( r * ReadDB ) SyncFromWals ( startWalSeq , endWalSeq string ) ( string , error ) {
2019-04-26 14:00:03 +00:00
insertfunc := func ( walFiles [ ] * datamanager . WalFile ) error {
2019-04-01 10:54:43 +00:00
err := r . rdb . Do ( func ( tx * db . Tx ) error {
for _ , walFile := range walFiles {
2019-04-26 14:00:03 +00:00
walFilef , err := r . dm . ReadWal ( walFile . WalSequence )
2019-04-01 10:54:43 +00:00
if err != nil {
return err
}
dec := json . NewDecoder ( walFilef )
2019-04-26 14:00:03 +00:00
var header * datamanager . WalHeader
2019-04-01 10:54:43 +00:00
if err = dec . Decode ( & header ) ; err != nil && err != io . EOF {
walFilef . Close ( )
return err
}
walFilef . Close ( )
2019-04-27 13:16:48 +00:00
if err := r . insertCommittedWalSequenceOST ( tx , walFile . WalSequence ) ; err != nil {
2019-04-01 10:54:43 +00:00
return err
}
if err := r . applyWal ( tx , header . WalDataFileID ) ; err != nil {
return err
}
}
return nil
} )
return err
}
lastWalSeq := startWalSeq
2019-04-26 14:00:03 +00:00
walFiles := [ ] * datamanager . WalFile { }
2019-04-01 10:54:43 +00:00
count := 0
doneCh := make ( chan struct { } )
defer close ( doneCh )
2019-04-26 14:00:03 +00:00
for walFile := range r . dm . ListOSTWals ( startWalSeq ) {
2019-04-01 10:54:43 +00:00
if walFile . Err != nil {
return "" , walFile . Err
}
walFiles = append ( walFiles , walFile )
lastWalSeq = walFile . WalSequence
if count > 100 {
if err := insertfunc ( walFiles ) ; err != nil {
return "" , err
}
count = 0
2019-04-26 14:00:03 +00:00
walFiles = [ ] * datamanager . WalFile { }
2019-04-01 10:54:43 +00:00
} else {
count ++
}
}
if err := insertfunc ( walFiles ) ; err != nil {
return "" , err
}
return lastWalSeq , nil
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) handleEventsOST ( ctx context . Context ) error {
2019-04-01 10:54:43 +00:00
var revision int64
err := r . rdb . Do ( func ( tx * db . Tx ) error {
err := tx . QueryRow ( "select revision from revision order by revision desc limit 1" ) . Scan ( & revision )
if err != nil {
if err == sql . ErrNoRows {
revision = 0
} else {
return err
}
}
return nil
} )
if err != nil {
return err
}
wctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "revision: %d" , revision )
2019-04-26 14:00:03 +00:00
wch := r . dm . Watch ( wctx , revision + 1 )
2019-04-01 10:54:43 +00:00
for we := range wch {
r . log . Debugf ( "we: %s" , util . Dump ( we ) )
if we . Err != nil {
err := we . Err
2019-04-26 14:00:03 +00:00
if err == datamanager . ErrCompacted {
2019-04-01 10:54:43 +00:00
r . log . Warnf ( "required events already compacted, reinitializing readdb" )
r . Initialized = false
return nil
}
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "watch error: %w" , err )
2019-04-01 10:54:43 +00:00
}
// a single transaction for every response (every response contains all the
// events happened in an etcd revision).
r . dbWriteLock . Lock ( )
err = r . rdb . Do ( func ( tx * db . Tx ) error {
// if theres a wal seq epoch change something happened to etcd, usually (if
// the user hasn't messed up with etcd keys) this means etcd has been reset
2019-04-27 13:16:48 +00:00
// in such case we should resync from the objectstorage state to ensure we
// apply all the wal marked as committedstorage (since they could have been
// lost from etcd)
curWalSeq , err := r . GetCommittedWalSequenceOST ( tx )
2019-04-01 10:54:43 +00:00
if err != nil {
return err
}
r . log . Debugf ( "curWalSeq: %q" , curWalSeq )
if curWalSeq != "" && we . WalData != nil {
curWalSequence , err := sequence . Parse ( curWalSeq )
if err != nil {
return err
}
curWalEpoch := curWalSequence . Epoch
weWalSequence , err := sequence . Parse ( we . WalData . WalSequence )
if err != nil {
return err
}
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "we.WalData.WalSequence: %q" , we . WalData . WalSequence )
2019-04-01 10:54:43 +00:00
weWalEpoch := weWalSequence . Epoch
if curWalEpoch != weWalEpoch {
r . Initialized = false
2019-04-27 13:16:48 +00:00
return errors . Errorf ( "current rdb wal sequence epoch %d different than new wal sequence epoch %d, resyncing from objectstorage" , curWalEpoch , weWalEpoch )
2019-04-01 10:54:43 +00:00
}
}
2019-04-27 13:16:48 +00:00
if err := r . handleEventOST ( tx , we ) ; err != nil {
2019-04-01 10:54:43 +00:00
return err
}
2019-04-27 13:16:48 +00:00
if err := insertRevisionOST ( tx , we . Revision ) ; err != nil {
2019-04-01 10:54:43 +00:00
return err
}
return nil
} )
r . dbWriteLock . Unlock ( )
if err != nil {
return err
}
}
return nil
}
func ( r * ReadDB ) applyWal ( tx * db . Tx , walDataFileID string ) error {
2019-04-26 14:00:03 +00:00
walFile , err := r . dm . ReadWalData ( walDataFileID )
2019-04-01 10:54:43 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "cannot read wal data file %q: %w" , walDataFileID , err )
2019-04-01 10:54:43 +00:00
}
defer walFile . Close ( )
dec := json . NewDecoder ( walFile )
for {
2019-04-26 14:00:03 +00:00
var action * datamanager . Action
2019-04-01 10:54:43 +00:00
err := dec . Decode ( & action )
if err == io . EOF {
// all done
break
}
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to decode wal file: %w" , err )
2019-04-01 10:54:43 +00:00
}
if err := r . applyAction ( tx , action ) ; err != nil {
return err
}
}
return nil
}
2019-04-26 14:00:03 +00:00
func ( r * ReadDB ) applyAction ( tx * db . Tx , action * datamanager . Action ) error {
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "action: dataType: %s, ID: %s" , action . DataType , action . ID )
2019-04-01 10:54:43 +00:00
switch action . ActionType {
2019-04-26 14:00:03 +00:00
case datamanager . ActionTypePut :
2019-04-01 10:54:43 +00:00
switch action . DataType {
case string ( common . DataTypeRun ) :
var run * types . Run
if err := json . Unmarshal ( action . Data , & run ) ; err != nil {
return err
}
2019-04-27 13:16:48 +00:00
if err := r . insertRunOST ( tx , run , action . Data ) ; err != nil {
2019-04-01 10:54:43 +00:00
return err
}
case string ( common . DataTypeRunCounter ) :
var runCounter uint64
if err := json . Unmarshal ( action . Data , & runCounter ) ; err != nil {
return err
}
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "inserting run counter %q, c: %d" , action . ID , runCounter )
2019-04-27 13:16:48 +00:00
if err := r . insertRunCounterOST ( tx , action . ID , runCounter ) ; err != nil {
2019-04-01 10:54:43 +00:00
return err
}
}
2019-04-26 14:00:03 +00:00
case datamanager . ActionTypeDelete :
2019-04-01 10:54:43 +00:00
switch action . DataType {
case string ( common . DataTypeRun ) :
case string ( common . DataTypeRunCounter ) :
}
}
return nil
}
2019-04-26 14:00:03 +00:00
func ( r * ReadDB ) handleEventOST ( tx * db . Tx , we * datamanager . WatchElement ) error {
2019-04-01 10:54:43 +00:00
//r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//key := string(ev.Kv.Key)
if err := r . handleWalEvent ( tx , we ) ; err != nil {
return err
}
return nil
}
2019-04-26 14:00:03 +00:00
func ( r * ReadDB ) handleWalEvent ( tx * db . Tx , we * datamanager . WatchElement ) error {
2019-04-01 10:54:43 +00:00
for cgName , cgRev := range we . ChangeGroupsRevisions {
2019-04-27 13:16:48 +00:00
if err := r . insertChangeGroupRevisionOST ( tx , cgName , cgRev ) ; err != nil {
2019-04-01 10:54:43 +00:00
return err
}
}
if we . WalData != nil {
2019-06-03 16:02:09 +00:00
// update readdb only when the wal has been committed to etcd
2019-04-26 14:00:03 +00:00
if we . WalData . WalStatus != datamanager . WalStatusCommitted {
2019-04-01 10:54:43 +00:00
return nil
}
2019-04-27 13:16:48 +00:00
if err := r . insertCommittedWalSequenceOST ( tx , we . WalData . WalSequence ) ; err != nil {
2019-04-01 10:54:43 +00:00
return err
}
r . log . Debugf ( "applying wal to db" )
return r . applyWal ( tx , we . WalData . WalDataFileID )
}
return nil
}
2019-02-21 14:54:50 +00:00
func ( r * ReadDB ) Do ( f func ( tx * db . Tx ) error ) error {
2019-03-29 11:20:54 +00:00
if ! r . IsInitialized ( ) {
return errors . Errorf ( "db not initialized" )
}
2019-02-21 14:54:50 +00:00
return r . rdb . Do ( f )
}
func insertRevision ( tx * db . Tx , revision int64 ) error {
// poor man insert or update that works because transaction isolation level is serializable
if _ , err := tx . Exec ( "delete from revision" ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete revision: %w" , err )
2019-02-21 14:54:50 +00:00
}
// TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types...
//q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql()
q , args , err := revisionInsert . Values ( revision ) . ToSql ( )
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
2019-05-23 09:23:14 +00:00
return err
2019-02-21 14:54:50 +00:00
}
return nil
}
2019-04-27 13:16:48 +00:00
func insertRevisionOST ( tx * db . Tx , revision int64 ) error {
2019-04-01 10:54:43 +00:00
// poor man insert or update that works because transaction isolation level is serializable
2019-04-27 13:16:48 +00:00
if _ , err := tx . Exec ( "delete from revision_ost" ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete revision: %w" , err )
2019-04-01 10:54:43 +00:00
}
// TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types...
//q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql()
2019-04-27 13:16:48 +00:00
q , args , err := revisionOSTInsert . Values ( revision ) . ToSql ( )
2019-04-01 10:54:43 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-04-01 10:54:43 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
2019-05-23 09:23:14 +00:00
return err
2019-04-01 10:54:43 +00:00
}
return nil
}
2019-02-21 14:54:50 +00:00
func insertRun ( tx * db . Tx , run * types . Run , data [ ] byte ) error {
2019-03-29 11:15:48 +00:00
// add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02)
groupPath := run . Group
if ! strings . HasSuffix ( groupPath , "/" ) {
groupPath += "/"
}
2019-02-21 14:54:50 +00:00
// poor man insert or update that works because transaction isolation level is serializable
if _ , err := tx . Exec ( "delete from run where id = $1" , run . ID ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete run: %w" , err )
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:15:48 +00:00
q , args , err := runInsert . Values ( run . ID , groupPath , run . Phase ) . ToSql ( )
2019-02-21 14:54:50 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
return err
}
2019-03-29 11:15:48 +00:00
// poor man insert or update that works because transaction isolation level is serializable
if _ , err := tx . Exec ( "delete from rundata where id = $1" , run . ID ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete rundata: %w" , err )
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:15:48 +00:00
q , args , err = rundataInsert . Values ( run . ID , data ) . ToSql ( )
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-03-29 11:15:48 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
return err
2019-02-21 14:54:50 +00:00
}
return nil
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) insertRunOST ( tx * db . Tx , run * types . Run , data [ ] byte ) error {
2019-03-29 11:15:48 +00:00
// add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02)
groupPath := run . Group
if ! strings . HasSuffix ( groupPath , "/" ) {
groupPath += "/"
}
2019-02-21 14:54:50 +00:00
// poor man insert or update that works because transaction isolation level is serializable
2019-04-27 13:16:48 +00:00
if _ , err := tx . Exec ( "delete from run_ost where id = $1" , run . ID ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete run objectstorage: %w" , err )
2019-02-21 14:54:50 +00:00
}
2019-04-27 13:16:48 +00:00
q , args , err := runOSTInsert . Values ( run . ID , groupPath , run . Phase ) . ToSql ( )
2019-02-21 14:54:50 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
return err
}
2019-03-29 11:15:48 +00:00
// poor man insert or update that works because transaction isolation level is serializable
2019-04-27 13:16:48 +00:00
if _ , err := tx . Exec ( "delete from rundata_ost where id = $1" , run . ID ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete rundata: %w" , err )
2019-02-21 14:54:50 +00:00
}
2019-04-27 13:16:48 +00:00
q , args , err = rundataOSTInsert . Values ( run . ID , data ) . ToSql ( )
2019-03-29 11:15:48 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-03-29 11:15:48 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
return err
2019-02-21 14:54:50 +00:00
}
return nil
}
func insertChangeGroupRevision ( tx * db . Tx , changegroupID string , revision int64 ) error {
// poor man insert or update that works because transaction isolation level is serializable
if _ , err := tx . Exec ( "delete from changegrouprevision where id = $1" , changegroupID ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete run: %w" , err )
2019-02-21 14:54:50 +00:00
}
q , args , err := changegrouprevisionInsert . Values ( changegroupID , revision ) . ToSql ( )
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
return err
}
return nil
}
func ( r * ReadDB ) GetRevision ( ) ( int64 , error ) {
var revision int64
err := r . rdb . Do ( func ( tx * db . Tx ) error {
var err error
revision , err = r . getRevision ( tx )
return err
} )
return revision , err
}
func ( r * ReadDB ) getRevision ( tx * db . Tx ) ( int64 , error ) {
var revision int64
q , args , err := revisionSelect . ToSql ( )
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return 0 , errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
2019-04-01 10:54:43 +00:00
err = tx . QueryRow ( q , args ... ) . Scan ( & revision )
if err == sql . ErrNoRows {
2019-02-21 14:54:50 +00:00
return 0 , nil
}
return revision , err
}
func ( r * ReadDB ) GetChangeGroupsUpdateTokens ( tx * db . Tx , groups [ ] string ) ( * types . ChangeGroupsUpdateToken , error ) {
s := changegrouprevisionSelect . Where ( sq . Eq { "id" : groups } )
q , args , err := s . ToSql ( )
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
changeGroupsRevisions , err := fetchChangeGroupsRevision ( tx , q , args ... )
if err != nil {
return nil , err
}
revision , err := r . getRevision ( tx )
if err != nil {
return nil , err
}
// for non existing changegroups use a changegroup with revision = 0
for _ , g := range groups {
if _ , ok := changeGroupsRevisions [ g ] ; ! ok {
changeGroupsRevisions [ g ] = 0
}
}
return & types . ChangeGroupsUpdateToken { CurRevision : revision , ChangeGroupsRevisions : changeGroupsRevisions } , nil
}
2019-03-29 11:00:18 +00:00
func ( r * ReadDB ) GetActiveRuns ( tx * db . Tx , groups [ ] string , lastRun bool , phaseFilter [ ] types . RunPhase , startRunID string , limit int , sortOrder types . SortOrder ) ( [ ] * RunData , error ) {
return r . getRunsFilteredActive ( tx , groups , lastRun , phaseFilter , startRunID , limit , sortOrder )
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:00:18 +00:00
func ( r * ReadDB ) GetRuns ( tx * db . Tx , groups [ ] string , lastRun bool , phaseFilter [ ] types . RunPhase , startRunID string , limit int , sortOrder types . SortOrder ) ( [ ] * types . Run , error ) {
2019-04-27 13:16:48 +00:00
useObjectStorage := false
2019-02-21 14:54:50 +00:00
for _ , phase := range phaseFilter {
2019-03-29 11:15:48 +00:00
if phase == types . RunPhaseFinished || phase == types . RunPhaseCancelled {
2019-04-27 13:16:48 +00:00
useObjectStorage = true
2019-02-21 14:54:50 +00:00
}
}
if len ( phaseFilter ) == 0 {
2019-04-27 13:16:48 +00:00
useObjectStorage = true
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:00:18 +00:00
runDataRDB , err := r . getRunsFilteredActive ( tx , groups , lastRun , phaseFilter , startRunID , limit , sortOrder )
2019-02-21 14:54:50 +00:00
if err != nil {
return nil , err
}
2019-03-29 11:00:18 +00:00
lastRunsMap := map [ string ] * RunData { }
runsMap := map [ string ] * RunData { }
for _ , r := range runDataRDB {
runsMap [ r . ID ] = r
lastRunsMap [ r . GroupPath ] = r
2019-02-21 14:54:50 +00:00
}
2019-04-27 13:16:48 +00:00
if useObjectStorage {
2019-03-29 11:00:18 +00:00
// skip if the phase requested is not finished
2019-04-27 13:16:48 +00:00
runDataOST , err := r . GetRunsFilteredOST ( tx , groups , lastRun , phaseFilter , startRunID , limit , sortOrder )
2019-03-29 11:00:18 +00:00
if err != nil {
return nil , err
}
2019-02-21 14:54:50 +00:00
2019-04-27 13:16:48 +00:00
for _ , rd := range runDataOST {
2019-03-29 11:00:18 +00:00
if lastRun {
if lr , ok := lastRunsMap [ rd . GroupPath ] ; ok {
switch sortOrder {
case types . SortOrderAsc :
if rd . ID < lr . ID {
lastRunsMap [ rd . GroupPath ] = rd
}
case types . SortOrderDesc :
if rd . ID > lr . ID {
lastRunsMap [ rd . GroupPath ] = rd
}
}
} else {
lastRunsMap [ rd . GroupPath ] = rd
runsMap [ rd . ID ] = rd
}
} else {
runsMap [ rd . ID ] = rd
}
2019-02-21 14:54:50 +00:00
}
}
var keys [ ] string
for k := range runsMap {
keys = append ( keys , k )
}
switch sortOrder {
case types . SortOrderAsc :
sort . Sort ( sort . StringSlice ( keys ) )
case types . SortOrderDesc :
sort . Sort ( sort . Reverse ( sort . StringSlice ( keys ) ) )
}
aruns := make ( [ ] * types . Run , 0 , len ( runsMap ) )
count := 0
for _ , runID := range keys {
if count >= limit {
break
}
count ++
2019-03-29 11:00:18 +00:00
rd := runsMap [ runID ]
if rd . Run != nil {
aruns = append ( aruns , rd . Run )
2019-02-21 14:54:50 +00:00
continue
}
2019-04-27 13:16:48 +00:00
// get run from objectstorage
2019-04-26 14:00:03 +00:00
run , err := store . OSTGetRun ( r . dm , runID )
2019-02-21 14:54:50 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , err
2019-02-21 14:54:50 +00:00
}
aruns = append ( aruns , run )
}
return aruns , nil
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) getRunsFilteredQuery ( phaseFilter [ ] types . RunPhase , groups [ ] string , lastRun bool , startRunID string , limit int , sortOrder types . SortOrder , objectstorage bool ) sq . SelectBuilder {
2019-02-21 14:54:50 +00:00
runt := "run"
2019-03-29 11:15:48 +00:00
rundatat := "rundata"
fields := [ ] string { "run.id" , "run.grouppath" , "run.phase" , "rundata.data" }
if len ( groups ) > 0 && lastRun {
fields = [ ] string { "max(run.id)" , "run.grouppath" , "run.phase" , "rundata.data" }
}
2019-04-27 13:16:48 +00:00
if objectstorage {
runt = "run_ost"
rundatat = "rundata_ost"
2019-02-21 14:54:50 +00:00
}
r . log . Debugf ( "runt: %s" , runt )
s := sb . Select ( fields ... ) . From ( runt + " as run" )
switch sortOrder {
case types . SortOrderAsc :
s = s . OrderBy ( "run.id asc" )
case types . SortOrderDesc :
s = s . OrderBy ( "run.id desc" )
}
if len ( phaseFilter ) > 0 {
s = s . Where ( sq . Eq { "phase" : phaseFilter } )
}
if startRunID != "" {
2019-05-23 21:21:45 +00:00
if lastRun {
switch sortOrder {
case types . SortOrderAsc :
s = s . Having ( sq . Gt { "run.id" : startRunID } )
case types . SortOrderDesc :
s = s . Having ( sq . Lt { "run.id" : startRunID } )
}
} else {
switch sortOrder {
case types . SortOrderAsc :
s = s . Where ( sq . Gt { "run.id" : startRunID } )
case types . SortOrderDesc :
s = s . Where ( sq . Lt { "run.id" : startRunID } )
}
2019-02-21 14:54:50 +00:00
}
}
if limit > 0 {
s = s . Limit ( uint64 ( limit ) )
}
2019-03-29 11:15:48 +00:00
s = s . Join ( fmt . Sprintf ( "%s as rundata on rundata.id = run.id" , rundatat ) )
2019-02-21 14:54:50 +00:00
if len ( groups ) > 0 {
cond := sq . Or { }
2019-03-29 11:00:18 +00:00
for _ , groupPath := range groups {
// add ending slash to distinguish between final group (i.e project/projectid/branch/feature and project/projectid/branch/feature02)
if ! strings . HasSuffix ( groupPath , "/" ) {
groupPath += "/"
}
cond = append ( cond , sq . Like { "run.grouppath" : groupPath + "%" } )
2019-02-21 14:54:50 +00:00
}
s = s . Where ( sq . Or { cond } )
2019-03-29 11:00:18 +00:00
if lastRun {
s = s . GroupBy ( "run.grouppath" )
}
2019-02-21 14:54:50 +00:00
}
return s
}
2019-03-29 11:00:18 +00:00
func ( r * ReadDB ) getRunsFilteredActive ( tx * db . Tx , groups [ ] string , lastRun bool , phaseFilter [ ] types . RunPhase , startRunID string , limit int , sortOrder types . SortOrder ) ( [ ] * RunData , error ) {
s := r . getRunsFilteredQuery ( phaseFilter , groups , lastRun , startRunID , limit , sortOrder , false )
2019-02-21 14:54:50 +00:00
q , args , err := s . ToSql ( )
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
return fetchRuns ( tx , q , args ... )
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) GetRunsFilteredOST ( tx * db . Tx , groups [ ] string , lastRun bool , phaseFilter [ ] types . RunPhase , startRunID string , limit int , sortOrder types . SortOrder ) ( [ ] * RunData , error ) {
2019-03-29 11:00:18 +00:00
s := r . getRunsFilteredQuery ( phaseFilter , groups , lastRun , startRunID , limit , sortOrder , true )
2019-02-21 14:54:50 +00:00
q , args , err := s . ToSql ( )
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:00:18 +00:00
return fetchRuns ( tx , q , args ... )
2019-02-21 14:54:50 +00:00
}
2019-05-06 12:55:10 +00:00
func ( r * ReadDB ) GetRun ( tx * db . Tx , runID string ) ( * types . Run , error ) {
run , err := r . getRun ( tx , runID , false )
if err != nil {
return nil , err
}
if run != nil {
return run , nil
}
2019-02-21 14:54:50 +00:00
2019-05-06 12:55:10 +00:00
// try to fetch from ost
return r . getRun ( tx , runID , true )
2019-02-21 14:54:50 +00:00
}
2019-05-06 12:55:10 +00:00
func ( r * ReadDB ) getRun ( tx * db . Tx , runID string , ost bool ) ( * types . Run , error ) {
s := r . getRunQuery ( runID , ost )
q , args , err := s . ToSql ( )
2019-02-21 14:54:50 +00:00
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 14:54:50 +00:00
}
2019-05-06 12:55:10 +00:00
runsData , err := fetchRuns ( tx , q , args ... )
2019-02-21 14:54:50 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , err
2019-02-21 14:54:50 +00:00
}
2019-05-06 12:55:10 +00:00
if len ( runsData ) > 1 {
2019-02-21 14:54:50 +00:00
return nil , errors . Errorf ( "too many rows returned" )
}
2019-05-06 12:55:10 +00:00
if len ( runsData ) == 0 {
2019-02-21 14:54:50 +00:00
return nil , nil
}
2019-05-06 12:55:10 +00:00
run := runsData [ 0 ] . Run
if run == nil {
var err error
if ! ost {
return nil , errors . Errorf ( "nil active run data. This should never happen" )
}
// get run from objectstorage
run , err = store . OSTGetRun ( r . dm , runID )
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , err
2019-05-06 12:55:10 +00:00
}
}
return run , nil
}
func ( r * ReadDB ) getRunQuery ( runID string , objectstorage bool ) sq . SelectBuilder {
runt := "run"
rundatat := "rundata"
fields := [ ] string { "run.id" , "run.grouppath" , "run.phase" , "rundata.data" }
if objectstorage {
runt = "run_ost"
rundatat = "rundata_ost"
}
s := sb . Select ( fields ... ) . From ( runt + " as run" ) . Where ( sq . Eq { "run.id" : runID } )
s = s . Join ( fmt . Sprintf ( "%s as rundata on rundata.id = run.id" , rundatat ) )
return s
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:00:18 +00:00
type RunData struct {
ID string
GroupPath string
Phase string
Run * types . Run
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:00:18 +00:00
func fetchRuns ( tx * db . Tx , q string , args ... interface { } ) ( [ ] * RunData , error ) {
2019-02-21 14:54:50 +00:00
rows , err := tx . Query ( q , args ... )
if err != nil {
return nil , err
}
defer rows . Close ( )
2019-03-29 11:00:18 +00:00
return scanRuns ( rows )
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:00:18 +00:00
func scanRun ( rows * sql . Rows ) ( * RunData , error ) {
r := & RunData { }
2019-02-21 14:54:50 +00:00
var data [ ] byte
2019-03-29 11:00:18 +00:00
if err := rows . Scan ( & r . ID , & r . GroupPath , & r . Phase , & data ) ; err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to scan rows: %w" , err )
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:00:18 +00:00
if len ( data ) > 0 {
if err := json . Unmarshal ( data , & r . Run ) ; err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to unmarshal run: %w" , err )
2019-03-29 11:00:18 +00:00
}
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:00:18 +00:00
return r , nil
2019-02-21 14:54:50 +00:00
}
2019-03-29 11:00:18 +00:00
func scanRuns ( rows * sql . Rows ) ( [ ] * RunData , error ) {
runs := [ ] * RunData { }
2019-02-21 14:54:50 +00:00
for rows . Next ( ) {
r , err := scanRun ( rows )
if err != nil {
return nil , err
}
runs = append ( runs , r )
}
if err := rows . Err ( ) ; err != nil {
return nil , err
}
return runs , nil
}
func fetchChangeGroupsRevision ( tx * db . Tx , q string , args ... interface { } ) ( types . ChangeGroupsRevisions , error ) {
rows , err := tx . Query ( q , args ... )
if err != nil {
return nil , err
}
defer rows . Close ( )
return scanChangeGroupsRevision ( rows )
}
2019-03-29 11:15:48 +00:00
func scanChangeGroupsRevision ( rows * sql . Rows ) ( types . ChangeGroupsRevisions , error ) {
changegroups := types . ChangeGroupsRevisions { }
2019-02-21 14:54:50 +00:00
for rows . Next ( ) {
var (
id string
revision int64
)
if err := rows . Scan ( & id , & revision ) ; err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to scan rows: %w" , err )
2019-02-21 14:54:50 +00:00
}
changegroups [ id ] = revision
}
if err := rows . Err ( ) ; err != nil {
return nil , err
}
return changegroups , nil
}
2019-04-01 10:54:43 +00:00
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) insertCommittedWalSequenceOST ( tx * db . Tx , seq string ) error {
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "insert seq: %s" , seq )
2019-04-01 10:54:43 +00:00
// poor man insert or update that works because transaction isolation level is serializable
2019-04-27 13:16:48 +00:00
if _ , err := tx . Exec ( "delete from committedwalsequence_ost" ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete committedwalsequence: %w" , err )
2019-04-01 10:54:43 +00:00
}
2019-04-27 13:16:48 +00:00
q , args , err := committedwalsequenceOSTInsert . Values ( seq ) . ToSql ( )
2019-04-01 10:54:43 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-04-01 10:54:43 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
2019-05-23 09:23:14 +00:00
return err
2019-04-01 10:54:43 +00:00
}
return nil
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) GetCommittedWalSequenceOST ( tx * db . Tx ) ( string , error ) {
2019-04-01 10:54:43 +00:00
var seq string
2019-04-27 13:16:48 +00:00
q , args , err := committedwalsequenceOSTSelect . OrderBy ( "seq" ) . Limit ( 1 ) . ToSql ( )
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return "" , errors . Errorf ( "failed to build query: %w" , err )
2019-04-01 10:54:43 +00:00
}
err = tx . QueryRow ( q , args ... ) . Scan ( & seq )
if err == sql . ErrNoRows {
return "" , nil
}
return seq , err
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) insertChangeGroupRevisionOST ( tx * db . Tx , changegroup string , revision int64 ) error {
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "insertChangeGroupRevision: %s %d" , changegroup , revision )
2019-04-01 10:54:43 +00:00
// poor man insert or update that works because transaction isolation level is serializable
2019-04-27 13:16:48 +00:00
if _ , err := tx . Exec ( "delete from changegrouprevision_ost where id = $1" , changegroup ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete run: %w" , err )
2019-04-01 10:54:43 +00:00
}
// insert only if revision > 0
if revision > 0 {
2019-04-27 13:16:48 +00:00
q , args , err := changegrouprevisionOSTInsert . Values ( changegroup , revision ) . ToSql ( )
2019-04-01 10:54:43 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-04-01 10:54:43 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
return err
}
}
return nil
}
2019-04-26 14:00:03 +00:00
func ( r * ReadDB ) GetChangeGroupsUpdateTokensOST ( tx * db . Tx , groups [ ] string ) ( * datamanager . ChangeGroupsUpdateToken , error ) {
2019-04-27 13:16:48 +00:00
s := changegrouprevisionOSTSelect . Where ( sq . Eq { "id" : groups } )
2019-04-01 10:54:43 +00:00
q , args , err := s . ToSql ( )
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to build query: %w" , err )
2019-04-01 10:54:43 +00:00
}
2019-04-27 13:16:48 +00:00
cgr , err := fetchChangeGroupsRevisionOST ( tx , q , args ... )
2019-04-01 10:54:43 +00:00
if err != nil {
return nil , err
}
revision , err := r . getRevision ( tx )
if err != nil {
return nil , err
}
// for non existing changegroups use a changegroup with revision = 0
for _ , g := range groups {
if _ , ok := cgr [ g ] ; ! ok {
cgr [ g ] = 0
}
}
2019-04-26 14:00:03 +00:00
return & datamanager . ChangeGroupsUpdateToken { CurRevision : revision , ChangeGroupsRevisions : cgr } , nil
2019-04-01 10:54:43 +00:00
}
2019-04-27 13:16:48 +00:00
func fetchChangeGroupsRevisionOST ( tx * db . Tx , q string , args ... interface { } ) ( map [ string ] int64 , error ) {
2019-04-01 10:54:43 +00:00
rows , err := tx . Query ( q , args ... )
if err != nil {
return nil , err
}
defer rows . Close ( )
2019-07-02 12:53:01 +00:00
return scanChangeGroupsRevisionOST ( rows )
2019-04-01 10:54:43 +00:00
}
2019-04-27 13:16:48 +00:00
func scanChangeGroupsRevisionOST ( rows * sql . Rows ) ( map [ string ] int64 , error ) {
2019-04-01 10:54:43 +00:00
changegroups := map [ string ] int64 { }
for rows . Next ( ) {
var (
id string
revision int64
)
if err := rows . Scan ( & id , & revision ) ; err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to scan rows: %w" , err )
2019-04-01 10:54:43 +00:00
}
changegroups [ id ] = revision
}
if err := rows . Err ( ) ; err != nil {
return nil , err
}
return changegroups , nil
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) insertRunCounterOST ( tx * db . Tx , group string , counter uint64 ) error {
2019-04-01 10:54:43 +00:00
// poor man insert or update that works because transaction isolation level is serializable
2019-04-27 13:16:48 +00:00
if _ , err := tx . Exec ( "delete from runcounter_ost where groupid = $1" , group ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete revision: %w" , err )
2019-04-01 10:54:43 +00:00
}
// TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types...
//q, args, err = revisionInsert.Values(int64(wresp.Header.ClusterId), run.Revision).ToSql()
2019-04-27 13:16:48 +00:00
q , args , err := runcounterOSTInsert . Values ( group , counter ) . ToSql ( )
2019-04-01 10:54:43 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-04-01 10:54:43 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
2019-05-23 09:23:14 +00:00
return err
2019-04-01 10:54:43 +00:00
}
return nil
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) GetRunCounterOST ( tx * db . Tx , group string ) ( uint64 , error ) {
2019-04-01 10:54:43 +00:00
var g string
var counter uint64
2019-04-27 13:16:48 +00:00
q , args , err := runcounterOSTSelect . Where ( sq . Eq { "groupid" : group } ) . ToSql ( )
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return 0 , errors . Errorf ( "failed to build query: %w" , err )
2019-04-01 10:54:43 +00:00
}
err = tx . QueryRow ( q , args ... ) . Scan ( & g , & counter )
if err == sql . ErrNoRows {
return 0 , nil
}
return counter , err
}
2019-04-27 13:16:48 +00:00
func ( r * ReadDB ) GetRunCountersOST ( tx * db . Tx , start string , limit int ) ( [ ] * types . RunCounter , error ) {
s := runcounterOSTSelect . Where ( sq . Gt { "groupid" : start } )
2019-04-01 10:54:43 +00:00
if limit > 0 {
s = s . Limit ( uint64 ( limit ) )
}
s = s . OrderBy ( "groupid asc" )
q , args , err := s . ToSql ( )
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to build query: %w" , err )
2019-04-01 10:54:43 +00:00
}
return fetchRunCounters ( tx , q , args ... )
}
func fetchRunCounters ( tx * db . Tx , q string , args ... interface { } ) ( [ ] * types . RunCounter , error ) {
rows , err := tx . Query ( q , args ... )
if err != nil {
return nil , err
}
defer rows . Close ( )
return scanRunCounters ( rows )
}
func scanRunCounter ( rows * sql . Rows ) ( * types . RunCounter , error ) {
r := & types . RunCounter { }
if err := rows . Scan ( & r . Group , & r . Counter ) ; err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to scan rows: %w" , err )
2019-04-01 10:54:43 +00:00
}
return r , nil
}
func scanRunCounters ( rows * sql . Rows ) ( [ ] * types . RunCounter , error ) {
runCounters := [ ] * types . RunCounter { }
for rows . Next ( ) {
r , err := scanRunCounter ( rows )
if err != nil {
return nil , err
}
runCounters = append ( runCounters , r )
}
if err := rows . Err ( ) ; err != nil {
return nil , err
}
return runCounters , nil
}