2019-02-21 15:08:30 +00:00
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package readdb
import (
"context"
"database/sql"
"encoding/json"
"io"
"os"
"path/filepath"
2019-07-02 13:44:33 +00:00
"sync"
2019-02-21 15:08:30 +00:00
"time"
2019-07-01 09:40:20 +00:00
"agola.io/agola/internal/datamanager"
"agola.io/agola/internal/db"
"agola.io/agola/internal/etcd"
"agola.io/agola/internal/objectstorage"
ostypes "agola.io/agola/internal/objectstorage/types"
"agola.io/agola/internal/sequence"
2019-07-31 13:17:54 +00:00
"agola.io/agola/internal/services/configstore/types"
2019-07-01 09:40:20 +00:00
"agola.io/agola/internal/util"
2019-02-21 15:08:30 +00:00
sq "github.com/Masterminds/squirrel"
"go.uber.org/zap"
2019-05-23 09:23:14 +00:00
errors "golang.org/x/xerrors"
2019-02-21 15:08:30 +00:00
)
// Shared squirrel builders used to generate the queries on the readdb
// bookkeeping tables (revision, committedwalsequence, changegrouprevision).
var (
// Base statement builder. It uses the postgresql style "$N" placeholders; per
// the original note these are converted to "?" by the provided db functions.
sb = sq . StatementBuilder . PlaceholderFormat ( sq . Dollar )
// Select/insert on the "revision" table (single "revision" column).
revisionSelect = sb . Select ( "revision" ) . From ( "revision" )
revisionInsert = sb . Insert ( "revision" ) . Columns ( "revision" )
// Select/insert on the "committedwalsequence" table (single "seq" column).
committedwalsequenceSelect = sb . Select ( "seq" ) . From ( "committedwalsequence" )
committedwalsequenceInsert = sb . Insert ( "committedwalsequence" ) . Columns ( "seq" )
// Select/insert on the "changegrouprevision" table ("id" and "revision" columns).
changegrouprevisionSelect = sb . Select ( "id, revision" ) . From ( "changegrouprevision" )
changegrouprevisionInsert = sb . Insert ( "changegrouprevision" ) . Columns ( "id" , "revision" )
)
type ReadDB struct {
log * zap . SugaredLogger
dataDir string
e * etcd . Store
rdb * db . DB
2019-04-27 13:16:48 +00:00
ost * objectstorage . ObjStorage
2019-04-26 14:00:03 +00:00
dm * datamanager . DataManager
2019-02-21 15:08:30 +00:00
Initialized bool
2019-07-02 13:44:33 +00:00
initLock sync . Mutex
2019-02-21 15:08:30 +00:00
}
2019-04-26 14:00:03 +00:00
func NewReadDB ( ctx context . Context , logger * zap . Logger , dataDir string , e * etcd . Store , ost * objectstorage . ObjStorage , dm * datamanager . DataManager ) ( * ReadDB , error ) {
2019-02-21 15:08:30 +00:00
if err := os . MkdirAll ( dataDir , 0770 ) ; err != nil {
return nil , err
}
readDB := & ReadDB {
log : logger . Sugar ( ) ,
dataDir : dataDir ,
e : e ,
2019-04-27 13:16:48 +00:00
ost : ost ,
2019-04-26 14:00:03 +00:00
dm : dm ,
2019-02-21 15:08:30 +00:00
}
return readDB , nil
}
2019-07-02 13:44:33 +00:00
// SetInitialized stores the initialized flag while holding initLock.
func (r *ReadDB) SetInitialized(initialized bool) {
	r.initLock.Lock()
	defer r.initLock.Unlock()

	r.Initialized = initialized
}
// IsInitialized returns the initialized flag, read while holding initLock.
func (r *ReadDB) IsInitialized() bool {
	r.initLock.Lock()
	initialized := r.Initialized
	r.initLock.Unlock()

	return initialized
}
2019-02-21 15:08:30 +00:00
// Initialize populates the readdb with the current etcd data and save the
// revision to then feed it with the etcd events
func ( r * ReadDB ) Initialize ( ctx context . Context ) error {
// sync the rdb
if err := r . SyncRDB ( ctx ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "error syncing db: %w" , err )
2019-02-21 15:08:30 +00:00
}
return nil
}
2019-07-25 08:46:02 +00:00
func ( r * ReadDB ) ResetDB ( ctx context . Context ) error {
2019-02-21 15:08:30 +00:00
// TODO(sgotti) this needs to be protected by a mutex
2019-07-24 13:20:41 +00:00
if r . rdb != nil {
r . rdb . Close ( )
}
2019-02-21 15:08:30 +00:00
// drop rdb
if err := os . Remove ( filepath . Join ( r . dataDir , "db" ) ) ; err != nil {
return err
}
rdb , err := db . NewDB ( db . Sqlite3 , filepath . Join ( r . dataDir , "db" ) )
if err != nil {
return err
}
// populate readdb
2019-07-25 08:46:02 +00:00
if err := rdb . Create ( ctx , Stmts ) ; err != nil {
2019-02-21 15:08:30 +00:00
return err
}
r . rdb = rdb
return nil
}
2019-07-25 08:46:02 +00:00
func ( r * ReadDB ) SyncFromDump ( ctx context . Context ) ( string , error ) {
2019-04-26 14:00:03 +00:00
dumpIndex , err := r . dm . GetLastDataStatus ( )
2019-05-21 13:17:53 +00:00
if err != nil && err != ostypes . ErrNotExist {
2019-05-23 09:23:14 +00:00
return "" , err
2019-02-21 15:08:30 +00:00
}
2019-05-21 13:17:53 +00:00
if err == ostypes . ErrNotExist {
2019-04-26 14:00:03 +00:00
return "" , nil
2019-02-21 15:08:30 +00:00
}
2019-04-26 14:00:03 +00:00
for dataType , files := range dumpIndex . Files {
2019-06-03 14:17:27 +00:00
for _ , file := range files {
2019-07-03 15:03:37 +00:00
dumpf , err := r . ost . ReadObject ( r . dm . DataFilePath ( dataType , file . ID ) )
2019-04-26 14:00:03 +00:00
if err != nil {
return "" , err
}
2019-06-03 14:17:27 +00:00
dumpEntries := [ ] * datamanager . DataEntry { }
dec := json . NewDecoder ( dumpf )
for {
var de * datamanager . DataEntry
err := dec . Decode ( & de )
if err == io . EOF {
// all done
break
2019-02-21 15:08:30 +00:00
}
2019-06-03 14:17:27 +00:00
if err != nil {
dumpf . Close ( )
return "" , err
2019-02-21 15:08:30 +00:00
}
2019-06-03 14:17:27 +00:00
dumpEntries = append ( dumpEntries , de )
}
dumpf . Close ( )
2019-07-25 08:46:02 +00:00
err = r . rdb . Do ( ctx , func ( tx * db . Tx ) error {
2019-06-03 14:17:27 +00:00
for _ , de := range dumpEntries {
action := & datamanager . Action {
ActionType : datamanager . ActionTypePut ,
ID : de . ID ,
DataType : dataType ,
Data : de . Data ,
}
if err := r . applyAction ( tx , action ) ; err != nil {
return err
}
}
return nil
} )
if err != nil {
return "" , err
2019-02-21 15:08:30 +00:00
}
}
}
2019-07-25 08:46:02 +00:00
err = r . rdb . Do ( ctx , func ( tx * db . Tx ) error {
2019-07-18 11:39:12 +00:00
if err := r . insertCommittedWalSequence ( tx , dumpIndex . WalSequence ) ; err != nil {
return err
}
return nil
} )
if err != nil {
return "" , err
}
2019-04-26 14:00:03 +00:00
return dumpIndex . WalSequence , nil
2019-02-21 15:08:30 +00:00
}
2019-07-25 08:46:02 +00:00
func ( r * ReadDB ) SyncFromWals ( ctx context . Context , startWalSeq , endWalSeq string ) ( string , error ) {
2019-04-26 14:00:03 +00:00
insertfunc := func ( walFiles [ ] * datamanager . WalFile ) error {
2019-07-25 08:46:02 +00:00
err := r . rdb . Do ( ctx , func ( tx * db . Tx ) error {
2019-02-21 15:08:30 +00:00
for _ , walFile := range walFiles {
2019-04-26 14:00:03 +00:00
walFilef , err := r . dm . ReadWal ( walFile . WalSequence )
2019-02-21 15:08:30 +00:00
if err != nil {
return err
}
dec := json . NewDecoder ( walFilef )
2019-04-26 14:00:03 +00:00
var header * datamanager . WalHeader
2019-02-21 15:08:30 +00:00
if err = dec . Decode ( & header ) ; err != nil && err != io . EOF {
walFilef . Close ( )
return err
}
walFilef . Close ( )
if err := r . insertCommittedWalSequence ( tx , walFile . WalSequence ) ; err != nil {
return err
}
if err := r . applyWal ( tx , header . WalDataFileID ) ; err != nil {
return err
}
}
return nil
} )
return err
}
lastWalSeq := startWalSeq
2019-04-26 14:00:03 +00:00
walFiles := [ ] * datamanager . WalFile { }
2019-02-21 15:08:30 +00:00
count := 0
doneCh := make ( chan struct { } )
defer close ( doneCh )
2019-04-26 14:00:03 +00:00
for walFile := range r . dm . ListOSTWals ( startWalSeq ) {
2019-02-21 15:08:30 +00:00
if walFile . Err != nil {
return "" , walFile . Err
}
walFiles = append ( walFiles , walFile )
lastWalSeq = walFile . WalSequence
if count > 100 {
if err := insertfunc ( walFiles ) ; err != nil {
return "" , err
}
count = 0
2019-04-26 14:00:03 +00:00
walFiles = [ ] * datamanager . WalFile { }
2019-02-21 15:08:30 +00:00
} else {
count ++
}
}
if err := insertfunc ( walFiles ) ; err != nil {
return "" , err
}
return lastWalSeq , nil
}
func ( r * ReadDB ) SyncRDB ( ctx context . Context ) error {
// get the last committed storage wal sequence saved in the rdb
curWalSeq := ""
2019-07-25 08:46:02 +00:00
err := r . rdb . Do ( ctx , func ( tx * db . Tx ) error {
2019-02-21 15:08:30 +00:00
var err error
curWalSeq , err = r . GetCommittedWalSequence ( tx )
if err != nil {
return err
}
return nil
} )
if err != nil {
return err
}
2019-04-26 14:00:03 +00:00
lastCommittedStorageWal , _ , err := r . dm . LastCommittedStorageWal ( ctx )
2019-02-21 15:08:30 +00:00
if err != nil {
return err
}
doFullSync := false
if curWalSeq == "" {
doFullSync = true
r . log . Warn ( "no startWalSeq in db, doing a full sync" )
} else {
2019-04-26 14:00:03 +00:00
ok , err := r . dm . HasOSTWal ( curWalSeq )
2019-02-21 15:08:30 +00:00
if err != nil {
return err
}
if ! ok {
2019-04-27 13:16:48 +00:00
r . log . Warnf ( "no wal with seq %q in objectstorage, doing a full sync" , curWalSeq )
2019-02-21 15:08:30 +00:00
doFullSync = true
}
2019-04-27 13:16:48 +00:00
// if the epoch of the wals has changed this means etcd has been reset. If so
// we should do a full resync since we are saving in the rdb also data that
// was not yet committed to objectstorage so we should have the rdb ahead of
// the current objectstorage data
2019-02-21 15:08:30 +00:00
// TODO(sgotti) improve this to avoid doing a full resync
curWalSequence , err := sequence . Parse ( curWalSeq )
if err != nil {
return err
}
curWalEpoch := curWalSequence . Epoch
lastCommittedStorageWalSequence , err := sequence . Parse ( lastCommittedStorageWal )
if err != nil {
return err
}
if curWalEpoch != lastCommittedStorageWalSequence . Epoch {
r . log . Warnf ( "current rdb wal sequence epoch %d different than new wal sequence epoch %d, doing a full sync" , curWalEpoch , lastCommittedStorageWalSequence . Epoch )
doFullSync = true
}
}
if doFullSync {
2019-04-26 14:00:03 +00:00
r . log . Infof ( "doing a full sync from dump" )
2019-07-25 08:46:02 +00:00
if err := r . ResetDB ( ctx ) ; err != nil {
2019-02-21 15:08:30 +00:00
return err
}
var err error
2019-07-25 08:46:02 +00:00
curWalSeq , err = r . SyncFromDump ( ctx )
2019-02-21 15:08:30 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return err
2019-02-21 15:08:30 +00:00
}
}
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "startWalSeq: %s" , curWalSeq )
2019-02-21 15:08:30 +00:00
// Sync from wals
2019-04-27 13:16:48 +00:00
// sync from objectstorage until the current known lastCommittedStorageWal in
// etcd since wals are first committed to objectstorage and then in etcd we
// would like to avoid to store in rdb something that is not yet marked as
// committedstorage in etcd
2019-07-25 08:46:02 +00:00
curWalSeq , err = r . SyncFromWals ( ctx , curWalSeq , lastCommittedStorageWal )
2019-02-21 15:08:30 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to sync from wals: %w" , err )
2019-02-21 15:08:30 +00:00
}
// Get the first available wal from etcd and check that our current walseq
2019-04-27 13:16:48 +00:00
// from wals on objectstorage is >=
2019-02-21 15:08:30 +00:00
// if not (this happens when syncFromWals takes some time and in the meantime
// many new wals are written, the next sync should be faster and able to continue
2019-04-26 14:00:03 +00:00
firstAvailableWalData , revision , err := r . dm . FirstAvailableWalData ( ctx )
2019-02-21 15:08:30 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to get first available wal data: %w" , err )
2019-02-21 15:08:30 +00:00
}
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "firstAvailableWalData: %s" , util . Dump ( firstAvailableWalData ) )
r . log . Debugf ( "revision: %d" , revision )
2019-02-21 15:08:30 +00:00
if firstAvailableWalData == nil {
2019-07-18 11:43:28 +00:00
return errors . Errorf ( "no wal data in etcd" )
2019-02-21 15:08:30 +00:00
}
2019-07-18 11:43:28 +00:00
if curWalSeq < firstAvailableWalData . WalSequence {
return errors . Errorf ( "current applied wal seq %q is smaller than the first available wal in etcd %q" , curWalSeq , firstAvailableWalData . WalSequence )
2019-02-21 15:08:30 +00:00
}
2019-07-17 14:58:43 +00:00
r . log . Infof ( "syncing from wals" )
2019-07-25 08:46:02 +00:00
err = r . rdb . Do ( ctx , func ( tx * db . Tx ) error {
2019-02-21 15:08:30 +00:00
if err := r . insertRevision ( tx , revision ) ; err != nil {
return err
}
// use the same revision as previous operation
2019-04-26 14:00:03 +00:00
for walElement := range r . dm . ListEtcdWals ( ctx , revision ) {
2019-02-21 15:08:30 +00:00
if walElement . Err != nil {
2019-07-17 14:58:43 +00:00
return walElement . Err
2019-02-21 15:08:30 +00:00
}
if walElement . WalData . WalSequence <= curWalSeq {
continue
}
2019-06-03 16:02:09 +00:00
// update readdb only when the wal has been committed to etcd
if walElement . WalData . WalStatus != datamanager . WalStatusCommitted {
return nil
}
2019-02-21 15:08:30 +00:00
2019-07-17 14:58:43 +00:00
if err := r . insertCommittedWalSequence ( tx , walElement . WalData . WalSequence ) ; err != nil {
return err
}
2019-02-21 15:08:30 +00:00
r . log . Debugf ( "applying wal to db" )
if err := r . applyWal ( tx , walElement . WalData . WalDataFileID ) ; err != nil {
return err
}
}
2019-04-29 08:15:44 +00:00
// sync changegroups, use the same revision of previous operations
changeGroupsRevisions , err := r . dm . ListEtcdChangeGroups ( ctx , revision )
if err != nil {
return err
}
for changeGroupID , changeGroupRevision := range changeGroupsRevisions {
if err := r . insertChangeGroupRevision ( tx , changeGroupID , changeGroupRevision ) ; err != nil {
return err
}
}
2019-02-21 15:08:30 +00:00
return nil
} )
return err
}
func ( r * ReadDB ) Run ( ctx context . Context ) error {
2019-07-24 13:20:41 +00:00
if r . rdb != nil {
r . rdb . Close ( )
}
rdb , err := db . NewDB ( db . Sqlite3 , filepath . Join ( r . dataDir , "db" ) )
if err != nil {
return err
}
r . rdb = rdb
// populate readdb
if err := r . rdb . Create ( ctx , Stmts ) ; err != nil {
return err
}
2019-07-25 08:46:02 +00:00
revision , err := r . GetRevision ( ctx )
2019-02-21 15:08:30 +00:00
if err != nil {
return err
}
2019-07-02 13:44:33 +00:00
if revision == 0 || ! r . Initialized {
2019-02-21 15:08:30 +00:00
for {
err := r . Initialize ( ctx )
if err == nil {
break
}
r . log . Errorf ( "initialize err: %+v" , err )
2019-03-14 13:36:18 +00:00
2019-07-25 13:53:26 +00:00
sleepCh := time . NewTimer ( 1 * time . Second ) . C
select {
case <- ctx . Done ( ) :
return nil
case <- sleepCh :
}
2019-02-21 15:08:30 +00:00
}
}
2019-07-02 13:44:33 +00:00
r . SetInitialized ( true )
2019-02-21 15:08:30 +00:00
for {
2019-07-02 13:44:33 +00:00
for {
initialized := r . IsInitialized ( )
if initialized {
break
}
err := r . Initialize ( ctx )
if err == nil {
r . SetInitialized ( true )
break
}
r . log . Errorf ( "initialize err: %+v" , err )
2019-07-25 13:53:26 +00:00
sleepCh := time . NewTimer ( 1 * time . Second ) . C
select {
case <- ctx . Done ( ) :
return nil
case <- sleepCh :
}
2019-02-21 15:08:30 +00:00
}
2019-07-23 12:24:21 +00:00
errCh := make ( chan error , 1 )
2019-07-25 13:53:26 +00:00
hctx , cancel := context . WithCancel ( ctx )
2019-07-02 13:44:33 +00:00
wg := & sync . WaitGroup { }
wg . Add ( 1 )
go func ( ) {
r . log . Infof ( "starting handleEvents" )
2019-07-25 13:53:26 +00:00
if err := r . handleEvents ( hctx ) ; err != nil {
2019-07-02 13:44:33 +00:00
r . log . Errorf ( "handleEvents err: %+v" , err )
errCh <- err
}
wg . Done ( )
} ( )
2019-02-21 15:08:30 +00:00
select {
case <- ctx . Done ( ) :
r . log . Infof ( "readdb exiting" )
2019-07-02 13:44:33 +00:00
cancel ( )
2019-02-21 15:08:30 +00:00
return nil
2019-07-02 13:44:33 +00:00
case <- errCh :
// cancel context and wait for the all the goroutines to exit
cancel ( )
wg . Wait ( )
2019-02-21 15:08:30 +00:00
}
2019-07-25 13:53:26 +00:00
sleepCh := time . NewTimer ( 1 * time . Second ) . C
select {
case <- ctx . Done ( ) :
return nil
case <- sleepCh :
}
2019-02-21 15:08:30 +00:00
}
}
// TODO(sgotti) improve to apply when the wal have been "committedstorage" and
// not only "committed", in this way we don't have to full resync when etcd is
// lost/reset
2019-07-02 13:44:33 +00:00
func ( r * ReadDB ) handleEvents ( ctx context . Context ) error {
2019-02-21 15:08:30 +00:00
var revision int64
2019-07-25 08:46:02 +00:00
err := r . rdb . Do ( ctx , func ( tx * db . Tx ) error {
2019-02-21 15:08:30 +00:00
err := tx . QueryRow ( "select revision from revision order by revision desc limit 1" ) . Scan ( & revision )
if err != nil {
if err == sql . ErrNoRows {
revision = 0
} else {
return err
}
}
return nil
} )
if err != nil {
return err
}
wctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "revision: %d" , revision )
2019-04-26 14:00:03 +00:00
wch := r . dm . Watch ( wctx , revision + 1 )
2019-02-21 15:08:30 +00:00
for we := range wch {
r . log . Debugf ( "we: %s" , util . Dump ( we ) )
if we . Err != nil {
err := we . Err
2019-04-26 14:00:03 +00:00
if err == datamanager . ErrCompacted {
2019-02-21 15:08:30 +00:00
r . log . Warnf ( "required events already compacted, reinitializing readdb" )
r . Initialized = false
return nil
}
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "watch error: %w" , err )
2019-02-21 15:08:30 +00:00
}
// a single transaction for every response (every response contains all the
// events happened in an etcd revision).
2019-07-25 08:46:02 +00:00
err = r . rdb . Do ( ctx , func ( tx * db . Tx ) error {
2019-02-21 15:08:30 +00:00
// if theres a wal seq epoch change something happened to etcd, usually (if
// the user hasn't messed up with etcd keys) this means etcd has been reset
2019-04-27 13:16:48 +00:00
// in such case we should resync from the objectstorage state to ensure we
// apply all the wal marked as committedstorage (since they could have been
// lost from etcd)
2019-02-21 15:08:30 +00:00
curWalSeq , err := r . GetCommittedWalSequence ( tx )
if err != nil {
return err
}
r . log . Debugf ( "curWalSeq: %q" , curWalSeq )
if curWalSeq != "" && we . WalData != nil {
curWalSequence , err := sequence . Parse ( curWalSeq )
if err != nil {
return err
}
curWalEpoch := curWalSequence . Epoch
weWalSequence , err := sequence . Parse ( we . WalData . WalSequence )
if err != nil {
return err
}
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "we.WalData.WalSequence: %q" , we . WalData . WalSequence )
2019-02-21 15:08:30 +00:00
weWalEpoch := weWalSequence . Epoch
if curWalEpoch != weWalEpoch {
r . Initialized = false
2019-04-27 13:16:48 +00:00
return errors . Errorf ( "current rdb wal sequence epoch %d different than new wal sequence epoch %d, resyncing from objectstorage" , curWalEpoch , weWalEpoch )
2019-02-21 15:08:30 +00:00
}
}
if err := r . handleEvent ( tx , we ) ; err != nil {
return err
}
if err := r . insertRevision ( tx , we . Revision ) ; err != nil {
return err
}
return nil
} )
if err != nil {
return err
}
}
r . log . Infof ( "wch closed" )
return nil
}
2019-04-26 14:00:03 +00:00
func ( r * ReadDB ) handleEvent ( tx * db . Tx , we * datamanager . WatchElement ) error {
2019-02-21 15:08:30 +00:00
//r.log.Debugf("event: %s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
//key := string(ev.Kv.Key)
if err := r . handleWalEvent ( tx , we ) ; err != nil {
return err
}
return nil
}
2019-04-26 14:00:03 +00:00
func ( r * ReadDB ) handleWalEvent ( tx * db . Tx , we * datamanager . WatchElement ) error {
2019-02-21 15:08:30 +00:00
for cgName , cgRev := range we . ChangeGroupsRevisions {
if err := r . insertChangeGroupRevision ( tx , cgName , cgRev ) ; err != nil {
return err
}
}
if we . WalData != nil {
2019-06-03 16:02:09 +00:00
// update readdb only when the wal has been committed to etcd
if we . WalData . WalStatus != datamanager . WalStatusCommitted {
return nil
}
if err := r . insertCommittedWalSequence ( tx , we . WalData . WalSequence ) ; err != nil {
return err
}
2019-02-21 15:08:30 +00:00
r . log . Debugf ( "applying wal to db" )
return r . applyWal ( tx , we . WalData . WalDataFileID )
}
return nil
}
func ( r * ReadDB ) applyWal ( tx * db . Tx , walDataFileID string ) error {
2019-04-26 14:00:03 +00:00
walFile , err := r . dm . ReadWalData ( walDataFileID )
2019-02-21 15:08:30 +00:00
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "cannot read wal data file %q: %w" , walDataFileID , err )
2019-02-21 15:08:30 +00:00
}
defer walFile . Close ( )
dec := json . NewDecoder ( walFile )
for {
2019-04-26 14:00:03 +00:00
var action * datamanager . Action
2019-02-21 15:08:30 +00:00
err := dec . Decode ( & action )
if err == io . EOF {
// all done
break
}
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to decode wal file: %w" , err )
2019-02-21 15:08:30 +00:00
}
if err := r . applyAction ( tx , action ) ; err != nil {
return err
}
}
return nil
}
2019-04-26 14:00:03 +00:00
func ( r * ReadDB ) applyAction ( tx * db . Tx , action * datamanager . Action ) error {
2019-02-21 15:08:30 +00:00
switch action . ActionType {
2019-04-26 14:00:03 +00:00
case datamanager . ActionTypePut :
2019-04-01 10:54:43 +00:00
switch types . ConfigType ( action . DataType ) {
2019-03-14 13:36:18 +00:00
case types . ConfigTypeUser :
2019-02-21 15:08:30 +00:00
if err := r . insertUser ( tx , action . Data ) ; err != nil {
return err
}
2019-03-14 13:36:18 +00:00
case types . ConfigTypeOrg :
2019-02-28 14:52:35 +00:00
if err := r . insertOrg ( tx , action . Data ) ; err != nil {
return err
}
2019-05-03 15:40:07 +00:00
case types . ConfigTypeOrgMember :
if err := r . insertOrgMember ( tx , action . Data ) ; err != nil {
return err
}
2019-03-14 13:36:18 +00:00
case types . ConfigTypeProjectGroup :
if err := r . insertProjectGroup ( tx , action . Data ) ; err != nil {
return err
}
case types . ConfigTypeProject :
if err := r . insertProject ( tx , action . Data ) ; err != nil {
return err
}
case types . ConfigTypeRemoteSource :
2019-02-21 15:08:30 +00:00
if err := r . insertRemoteSource ( tx , action . Data ) ; err != nil {
return err
}
2019-03-14 13:36:18 +00:00
case types . ConfigTypeSecret :
if err := r . insertSecret ( tx , action . Data ) ; err != nil {
return err
}
case types . ConfigTypeVariable :
if err := r . insertVariable ( tx , action . Data ) ; err != nil {
return err
}
2019-02-21 15:08:30 +00:00
}
2019-04-26 14:00:03 +00:00
case datamanager . ActionTypeDelete :
2019-04-01 10:54:43 +00:00
switch types . ConfigType ( action . DataType ) {
2019-03-14 13:36:18 +00:00
case types . ConfigTypeUser :
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "deleting user with id: %s" , action . ID )
if err := r . deleteUser ( tx , action . ID ) ; err != nil {
2019-02-21 15:08:30 +00:00
return err
}
2019-03-14 13:36:18 +00:00
case types . ConfigTypeOrg :
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "deleting org with id: %s" , action . ID )
if err := r . deleteOrg ( tx , action . ID ) ; err != nil {
2019-02-28 14:52:35 +00:00
return err
}
2019-05-03 15:40:07 +00:00
case types . ConfigTypeOrgMember :
r . log . Debugf ( "deleting orgmember with id: %s" , action . ID )
if err := r . deleteOrgMember ( tx , action . ID ) ; err != nil {
return err
}
2019-03-14 13:36:18 +00:00
case types . ConfigTypeProjectGroup :
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "deleting project group with id: %s" , action . ID )
if err := r . deleteProjectGroup ( tx , action . ID ) ; err != nil {
2019-03-14 13:36:18 +00:00
return err
}
case types . ConfigTypeProject :
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "deleting project with id: %s" , action . ID )
if err := r . deleteProject ( tx , action . ID ) ; err != nil {
2019-03-14 13:36:18 +00:00
return err
}
case types . ConfigTypeRemoteSource :
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "deleting remote source with id: %s" , action . ID )
if err := r . deleteRemoteSource ( tx , action . ID ) ; err != nil {
2019-02-21 15:08:30 +00:00
return err
}
2019-03-14 13:36:18 +00:00
case types . ConfigTypeSecret :
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "deleting secret with id: %s" , action . ID )
if err := r . deleteSecret ( tx , action . ID ) ; err != nil {
2019-03-14 13:36:18 +00:00
return err
}
case types . ConfigTypeVariable :
2019-04-01 10:54:43 +00:00
r . log . Debugf ( "deleting variable with id: %s" , action . ID )
if err := r . deleteVariable ( tx , action . ID ) ; err != nil {
2019-03-14 13:36:18 +00:00
return err
}
2019-02-21 15:08:30 +00:00
}
}
return nil
}
2019-07-25 08:46:02 +00:00
func ( r * ReadDB ) Do ( ctx context . Context , f func ( tx * db . Tx ) error ) error {
return r . rdb . Do ( ctx , f )
2019-02-21 15:08:30 +00:00
}
func ( r * ReadDB ) insertRevision ( tx * db . Tx , revision int64 ) error {
// poor man insert or update that works because transaction isolation level is serializable
if _ , err := tx . Exec ( "delete from revision" ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete revision: %w" , err )
2019-02-21 15:08:30 +00:00
}
// TODO(sgotti) go database/sql and mattn/sqlite3 don't support uint64 types...
q , args , err := revisionInsert . Values ( revision ) . ToSql ( )
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 15:08:30 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
2019-05-23 09:23:14 +00:00
return err
2019-02-21 15:08:30 +00:00
}
return nil
}
2019-07-25 08:46:02 +00:00
func ( r * ReadDB ) GetRevision ( ctx context . Context ) ( int64 , error ) {
2019-02-21 15:08:30 +00:00
var revision int64
2019-07-25 08:46:02 +00:00
err := r . rdb . Do ( ctx , func ( tx * db . Tx ) error {
2019-02-21 15:08:30 +00:00
var err error
revision , err = r . getRevision ( tx )
return err
} )
return revision , err
}
func ( r * ReadDB ) getRevision ( tx * db . Tx ) ( int64 , error ) {
var revision int64
q , args , err := revisionSelect . ToSql ( )
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return 0 , errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 15:08:30 +00:00
}
err = tx . QueryRow ( q , args ... ) . Scan ( & revision )
if err == sql . ErrNoRows {
return 0 , nil
}
return revision , err
}
func ( r * ReadDB ) insertCommittedWalSequence ( tx * db . Tx , seq string ) error {
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "insert seq: %s" , seq )
2019-02-21 15:08:30 +00:00
// poor man insert or update that works because transaction isolation level is serializable
if _ , err := tx . Exec ( "delete from committedwalsequence" ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete committedwalsequence: %w" , err )
2019-02-21 15:08:30 +00:00
}
q , args , err := committedwalsequenceInsert . Values ( seq ) . ToSql ( )
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 15:08:30 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
2019-05-23 09:23:14 +00:00
return err
2019-02-21 15:08:30 +00:00
}
return nil
}
func ( r * ReadDB ) GetCommittedWalSequence ( tx * db . Tx ) ( string , error ) {
var seq string
q , args , err := committedwalsequenceSelect . OrderBy ( "seq" ) . Limit ( 1 ) . ToSql ( )
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return "" , errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 15:08:30 +00:00
}
err = tx . QueryRow ( q , args ... ) . Scan ( & seq )
if err == sql . ErrNoRows {
return "" , nil
}
return seq , err
}
func ( r * ReadDB ) insertChangeGroupRevision ( tx * db . Tx , changegroup string , revision int64 ) error {
2019-06-14 09:28:00 +00:00
r . log . Debugf ( "insertChangeGroupRevision: %s %d" , changegroup , revision )
2019-02-21 15:08:30 +00:00
// poor man insert or update that works because transaction isolation level is serializable
if _ , err := tx . Exec ( "delete from changegrouprevision where id = $1" , changegroup ) ; err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to delete run: %w" , err )
2019-02-21 15:08:30 +00:00
}
// insert only if revision > 0
if revision > 0 {
q , args , err := changegrouprevisionInsert . Values ( changegroup , revision ) . ToSql ( )
if err != nil {
2019-05-23 09:23:14 +00:00
return errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 15:08:30 +00:00
}
if _ , err = tx . Exec ( q , args ... ) ; err != nil {
return err
}
}
return nil
}
2019-04-26 14:00:03 +00:00
func ( r * ReadDB ) GetChangeGroupsUpdateTokens ( tx * db . Tx , groups [ ] string ) ( * datamanager . ChangeGroupsUpdateToken , error ) {
2019-02-21 15:08:30 +00:00
s := changegrouprevisionSelect . Where ( sq . Eq { "id" : groups } )
q , args , err := s . ToSql ( )
r . log . Debugf ( "q: %s, args: %s" , q , util . Dump ( args ) )
if err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to build query: %w" , err )
2019-02-21 15:08:30 +00:00
}
cgr , err := fetchChangeGroupsRevision ( tx , q , args ... )
if err != nil {
return nil , err
}
revision , err := r . getRevision ( tx )
if err != nil {
return nil , err
}
// for non existing changegroups use a changegroup with revision = 0
for _ , g := range groups {
if _ , ok := cgr [ g ] ; ! ok {
cgr [ g ] = 0
}
}
2019-04-26 14:00:03 +00:00
return & datamanager . ChangeGroupsUpdateToken { CurRevision : revision , ChangeGroupsRevisions : cgr } , nil
2019-02-21 15:08:30 +00:00
}
// fetchChangeGroupsRevision runs the query q with args on tx and returns the
// resulting rows as a map from change group id to revision.
func fetchChangeGroupsRevision(tx *db.Tx, q string, args ...interface{}) (map[string]int64, error) {
	rows, queryErr := tx.Query(q, args...)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	changegroups, err := scanChangeGroupsRevision(rows)
	return changegroups, err
}
func scanChangeGroupsRevision ( rows * sql . Rows ) ( map [ string ] int64 , error ) {
changegroups := map [ string ] int64 { }
for rows . Next ( ) {
var (
id string
revision int64
)
if err := rows . Scan ( & id , & revision ) ; err != nil {
2019-05-23 09:23:14 +00:00
return nil , errors . Errorf ( "failed to scan rows: %w" , err )
2019-02-21 15:08:30 +00:00
}
changegroups [ id ] = revision
}
if err := rows . Err ( ) ; err != nil {
return nil , err
}
return changegroups , nil
}