2019-02-22 07:45:59 +00:00
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
2019-04-26 14:00:03 +00:00
package datamanager
2019-02-22 07:45:59 +00:00
import (
2019-07-17 15:16:35 +00:00
"bytes"
2019-02-22 07:45:59 +00:00
"context"
2019-06-03 14:17:27 +00:00
"encoding/json"
2019-02-22 07:45:59 +00:00
"fmt"
2019-06-03 14:17:27 +00:00
"io"
2019-02-22 07:45:59 +00:00
"io/ioutil"
"os"
2019-11-05 16:48:11 +00:00
"path"
2019-06-03 14:17:27 +00:00
"reflect"
"sort"
2019-11-05 16:48:11 +00:00
"strings"
2019-02-22 07:45:59 +00:00
"testing"
"time"
2019-07-01 09:40:20 +00:00
slog "agola.io/agola/internal/log"
"agola.io/agola/internal/objectstorage"
"agola.io/agola/internal/testutil"
2019-11-06 12:29:42 +00:00
"agola.io/agola/internal/util"
2019-02-22 07:45:59 +00:00
2019-11-06 12:29:42 +00:00
"github.com/google/go-cmp/cmp"
2019-02-22 07:45:59 +00:00
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
2019-11-06 12:29:42 +00:00
errors "golang.org/x/xerrors"
2019-02-22 07:45:59 +00:00
)
// Logging setup shared by every test in this file.
var (
	// level is the atomic log level handed to the logger, created at info level.
	level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
	// logger is built from level and passed to the components under test.
	logger = slog.New(level)
)
// setupEtcd creates an embedded etcd test instance using dir, starts it and
// calls WaitUp with a 30 second argument before returning it. Any failure
// aborts the calling test through t.Fatalf.
func setupEtcd(t *testing.T, dir string) *testutil.TestEmbeddedEtcd {
	const upTimeout = 30 * time.Second

	etcd, err := testutil.NewTestEmbeddedEtcd(t, logger, dir)
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	err = etcd.Start()
	if err != nil {
		t.Fatalf("unexpected err: %v", err)
	}

	err = etcd.WaitUp(upTimeout)
	if err != nil {
		t.Fatalf("error waiting on store up: %v", err)
	}

	return etcd
}
func shutdownEtcd ( tetcd * testutil . TestEmbeddedEtcd ) {
if tetcd . Etcd != nil {
2019-07-02 12:27:51 +00:00
_ = tetcd . Kill ( )
2019-02-22 07:45:59 +00:00
}
}
func TestEtcdReset ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx , cancel := context . WithCancel ( context . Background ( ) )
2019-04-27 13:16:48 +00:00
ostDir , err := ioutil . TempDir ( dir , "ost" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-02-22 07:45:59 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-04-26 14:00:03 +00:00
dmConfig := & DataManagerConfig {
2019-02-22 07:45:59 +00:00
BasePath : "basepath" ,
E : tetcd . TestEtcd . Store ,
2019-04-27 13:16:48 +00:00
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
2019-02-22 07:45:59 +00:00
EtcdWalsKeepNum : 10 ,
2019-04-26 14:00:03 +00:00
DataTypes : [ ] string { "datatype01" } ,
2019-02-22 07:45:59 +00:00
}
2019-04-26 14:00:03 +00:00
dm , err := NewDataManager ( ctx , logger , dmConfig )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-04-26 14:00:03 +00:00
dmReadyCh := make ( chan struct { } )
2019-04-01 10:54:43 +00:00
2019-04-26 14:00:03 +00:00
t . Logf ( "starting datamanager" )
2019-07-02 12:27:51 +00:00
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
2019-04-26 14:00:03 +00:00
<- dmReadyCh
2019-02-22 07:45:59 +00:00
actions := [ ] * Action {
{
ActionType : ActionTypePut ,
2019-04-26 14:00:03 +00:00
DataType : "datatype01" ,
2019-02-22 07:45:59 +00:00
Data : [ ] byte ( "{}" ) ,
} ,
}
for i := 0 ; i < 20 ; i ++ {
2019-04-01 10:54:43 +00:00
objectID := fmt . Sprintf ( "object%02d" , i )
actions [ 0 ] . ID = objectID
2019-04-26 14:00:03 +00:00
if _ , err := dm . WriteWal ( ctx , actions , nil ) ; err != nil {
2019-02-22 07:45:59 +00:00
t . Fatalf ( "unexpected err: %v" , err )
}
}
// wait for wal to be committed storage
time . Sleep ( 5 * time . Second )
2019-04-26 14:00:03 +00:00
t . Logf ( "stopping datamanager" )
2019-04-01 10:54:43 +00:00
cancel ( )
t . Logf ( "stopping etcd" )
2019-02-22 07:45:59 +00:00
// Reset etcd
shutdownEtcd ( tetcd )
2019-07-02 12:27:51 +00:00
if err := tetcd . WaitDown ( 10 * time . Second ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-04-01 10:54:43 +00:00
t . Logf ( "resetting etcd" )
2019-02-22 07:45:59 +00:00
os . RemoveAll ( etcdDir )
2019-04-01 10:54:43 +00:00
t . Logf ( "starting etcd" )
tetcd = setupEtcd ( t , etcdDir )
2019-02-22 07:45:59 +00:00
if err := tetcd . Start ( ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer shutdownEtcd ( tetcd )
2019-04-01 10:54:43 +00:00
ctx , cancel = context . WithCancel ( context . Background ( ) )
2019-04-26 14:00:03 +00:00
defer cancel ( )
dmConfig = & DataManagerConfig {
2019-04-01 10:54:43 +00:00
BasePath : "basepath" ,
E : tetcd . TestEtcd . Store ,
2019-04-27 13:16:48 +00:00
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
2019-04-01 10:54:43 +00:00
EtcdWalsKeepNum : 10 ,
2019-04-26 14:00:03 +00:00
DataTypes : [ ] string { "datatype01" } ,
2019-04-01 10:54:43 +00:00
}
2019-04-26 14:00:03 +00:00
dm , err = NewDataManager ( ctx , logger , dmConfig )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-04-26 14:00:03 +00:00
dmReadyCh = make ( chan struct { } )
2019-04-01 10:54:43 +00:00
2019-06-03 14:17:27 +00:00
t . Logf ( "starting datamanager" )
2019-07-02 12:27:51 +00:00
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
2019-04-26 14:00:03 +00:00
<- dmReadyCh
2019-02-22 07:45:59 +00:00
2019-04-01 10:54:43 +00:00
time . Sleep ( 5 * time . Second )
2019-04-26 14:00:03 +00:00
for i := 0 ; i < 20 ; i ++ {
objectID := fmt . Sprintf ( "object%02d" , i )
_ , _ , err = dm . ReadObject ( "datatype01" , objectID , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
2019-02-22 07:45:59 +00:00
}
}
}
2019-11-05 16:48:11 +00:00
func TestEtcdResetWalsGap ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx , cancel := context . WithCancel ( context . Background ( ) )
ostDir , err := ioutil . TempDir ( dir , "ost" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-11-05 16:48:11 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmConfig := & DataManagerConfig {
BasePath : "basepath" ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
EtcdWalsKeepNum : 10 ,
DataTypes : [ ] string { "datatype01" } ,
}
dm , err := NewDataManager ( ctx , logger , dmConfig )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmReadyCh := make ( chan struct { } )
t . Logf ( "starting datamanager" )
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
<- dmReadyCh
actions := [ ] * Action {
{
ActionType : ActionTypePut ,
DataType : "datatype01" ,
Data : [ ] byte ( "{}" ) ,
} ,
}
for i := 0 ; i < 20 ; i ++ {
objectID := fmt . Sprintf ( "object%02d" , i )
actions [ 0 ] . ID = objectID
if _ , err := dm . WriteWal ( ctx , actions , nil ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
}
// wait for wal to be committed storage
time . Sleep ( 5 * time . Second )
t . Logf ( "stopping datamanager" )
cancel ( )
t . Logf ( "stopping etcd" )
// Reset etcd
shutdownEtcd ( tetcd )
if err := tetcd . WaitDown ( 10 * time . Second ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
t . Logf ( "resetting etcd" )
os . RemoveAll ( etcdDir )
t . Logf ( "starting etcd" )
tetcd = setupEtcd ( t , etcdDir )
if err := tetcd . Start ( ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer shutdownEtcd ( tetcd )
// Remove a wal in the middle
doneCh := make ( chan struct { } )
defer close ( doneCh )
walStatusFiles := [ ] string { }
for object := range dm . ost . List ( path . Join ( dm . basePath , storageWalsStatusDir ) + "/" , "" , true , doneCh ) {
if object . Err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
walStatusFiles = append ( walStatusFiles , object . Path )
}
if len ( walStatusFiles ) < 20 {
t . Fatalf ( "exptected at least 20 wals, got: %d wals" , len ( walStatusFiles ) )
}
removeIndex := 10
if err := dm . ost . DeleteObject ( walStatusFiles [ removeIndex ] ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
errorWalSequence := strings . TrimSuffix ( path . Base ( walStatusFiles [ removeIndex + 1 ] ) , path . Ext ( walStatusFiles [ removeIndex + 1 ] ) )
prevWalSequence := strings . TrimSuffix ( path . Base ( walStatusFiles [ removeIndex ] ) , path . Ext ( walStatusFiles [ removeIndex ] ) )
expectedPrevWalSequence := strings . TrimSuffix ( path . Base ( walStatusFiles [ removeIndex - 1 ] ) , path . Ext ( walStatusFiles [ removeIndex - 1 ] ) )
ctx , cancel = context . WithCancel ( context . Background ( ) )
defer cancel ( )
dmConfig = & DataManagerConfig {
BasePath : "basepath" ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
EtcdWalsKeepNum : 10 ,
DataTypes : [ ] string { "datatype01" } ,
}
dm , err = NewDataManager ( ctx , logger , dmConfig )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmReadyCh = make ( chan struct { } )
expectedErr := errors . Errorf ( "wal %q previousWalSequence %q is different than expected walSequence %q" , errorWalSequence , prevWalSequence , expectedPrevWalSequence )
err = dm . InitEtcd ( ctx , nil )
if err == nil {
t . Fatalf ( "expected err: %q, got nil error" , expectedErr )
}
if expectedErr . Error ( ) != err . Error ( ) {
t . Fatalf ( "expected err: %q, got err %q" , expectedErr , err )
}
}
2019-02-22 07:45:59 +00:00
func TestConcurrentUpdate ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx := context . Background ( )
2019-04-27 13:16:48 +00:00
ostDir , err := ioutil . TempDir ( dir , "ost" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-02-22 07:45:59 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-04-26 14:00:03 +00:00
dmConfig := & DataManagerConfig {
2019-02-22 07:45:59 +00:00
E : tetcd . TestEtcd . Store ,
2019-04-27 13:16:48 +00:00
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
2019-02-22 07:45:59 +00:00
EtcdWalsKeepNum : 10 ,
2019-04-26 14:00:03 +00:00
DataTypes : [ ] string { "datatype01" } ,
2019-02-22 07:45:59 +00:00
}
2019-04-26 14:00:03 +00:00
dm , err := NewDataManager ( ctx , logger , dmConfig )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
actions := [ ] * Action {
{
ActionType : ActionTypePut ,
2019-04-26 14:00:03 +00:00
ID : "object01" ,
DataType : "datatype01" ,
2019-02-22 07:45:59 +00:00
Data : [ ] byte ( "{}" ) ,
} ,
}
2019-04-26 14:00:03 +00:00
dmReadyCh := make ( chan struct { } )
2019-07-02 12:27:51 +00:00
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
2019-04-26 14:00:03 +00:00
<- dmReadyCh
2019-02-22 07:45:59 +00:00
2019-04-01 10:54:43 +00:00
time . Sleep ( 5 * time . Second )
2019-02-22 07:45:59 +00:00
cgNames := [ ] string { "changegroup01" , "changegroup02" }
2019-04-26 14:00:03 +00:00
cgt , err := dm . GetChangeGroupsUpdateToken ( cgNames )
2019-03-28 15:01:08 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
// populate with a wal
2019-04-26 14:00:03 +00:00
cgt , err = dm . WriteWal ( ctx , actions , cgt )
2019-02-22 07:45:59 +00:00
if err != nil {
2019-03-28 15:01:08 +00:00
t . Fatalf ( "unexpected err: %v" , err )
2019-02-22 07:45:59 +00:00
}
// this must work successfully
oldcgt := cgt
2019-04-26 14:00:03 +00:00
cgt , err = dm . WriteWal ( ctx , actions , cgt )
2019-02-22 07:45:59 +00:00
if err != nil {
2019-03-28 15:01:08 +00:00
t . Fatalf ( "unexpected err: %v" , err )
2019-02-22 07:45:59 +00:00
}
// this must fail since we are using the old cgt
2019-04-26 14:00:03 +00:00
_ , err = dm . WriteWal ( ctx , actions , oldcgt )
2019-02-22 07:45:59 +00:00
if err != ErrConcurrency {
t . Fatalf ( "expected err: %v, got %v" , ErrConcurrency , err )
}
oldcgt = cgt
// this must work successfully
2019-07-02 12:27:51 +00:00
_ , err = dm . WriteWal ( ctx , actions , cgt )
2019-02-22 07:45:59 +00:00
if err != nil {
2019-03-28 15:01:08 +00:00
t . Fatalf ( "unexpected err: %v" , err )
2019-02-22 07:45:59 +00:00
}
// this must fail since we are using the old cgt
2019-04-26 14:00:03 +00:00
_ , err = dm . WriteWal ( ctx , actions , oldcgt )
2019-02-22 07:45:59 +00:00
if err != ErrConcurrency {
t . Fatalf ( "expected err: %v, got %v" , ErrConcurrency , err )
}
}
2019-11-08 09:10:56 +00:00
func TestEtcdWalCleaner ( t * testing . T ) {
2019-02-22 07:45:59 +00:00
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx := context . Background ( )
2019-04-27 13:16:48 +00:00
ostDir , err := ioutil . TempDir ( dir , "ost" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-02-22 07:45:59 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
walKeepNum := 10
2019-04-26 14:00:03 +00:00
dmConfig := & DataManagerConfig {
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
EtcdWalsKeepNum : walKeepNum ,
DataTypes : [ ] string { "datatype01" } ,
MinCheckpointWalsNum : 1 ,
2019-02-22 07:45:59 +00:00
}
2019-04-26 14:00:03 +00:00
dm , err := NewDataManager ( ctx , logger , dmConfig )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
actions := [ ] * Action {
{
ActionType : ActionTypePut ,
2019-04-26 14:00:03 +00:00
ID : "object01" ,
DataType : "datatype01" ,
2019-02-22 07:45:59 +00:00
Data : [ ] byte ( "{}" ) ,
} ,
}
2019-04-26 14:00:03 +00:00
dmReadyCh := make ( chan struct { } )
2019-07-02 12:27:51 +00:00
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
2019-04-26 14:00:03 +00:00
<- dmReadyCh
2019-02-22 07:45:59 +00:00
for i := 0 ; i < 20 ; i ++ {
2019-04-26 14:00:03 +00:00
if _ , err := dm . WriteWal ( ctx , actions , nil ) ; err != nil {
2019-02-22 07:45:59 +00:00
t . Fatalf ( "unexpected err: %v" , err )
}
}
2019-07-18 12:58:42 +00:00
if err := dm . checkpoint ( ctx , true ) ; err != nil {
2019-06-03 14:17:27 +00:00
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 09:10:56 +00:00
if err := dm . etcdWalCleaner ( ctx ) ; err != nil {
2019-06-03 14:17:27 +00:00
t . Fatalf ( "unexpected err: %v" , err )
}
2019-02-22 07:45:59 +00:00
walsCount := 0
2019-04-26 14:00:03 +00:00
for range dm . ListEtcdWals ( ctx , 0 ) {
2019-02-22 07:45:59 +00:00
walsCount ++
}
if walsCount != walKeepNum {
t . Fatalf ( "expected %d wals in etcd, got %d wals" , walKeepNum , walsCount )
}
}
2019-04-26 14:00:03 +00:00
func TestReadObject ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-04-26 14:00:03 +00:00
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx := context . Background ( )
ostDir , err := ioutil . TempDir ( dir , "ost" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-04-26 14:00:03 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmConfig := & DataManagerConfig {
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum : 1 ,
DataTypes : [ ] string { "datatype01" } ,
}
dm , err := NewDataManager ( ctx , logger , dmConfig )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-04-26 14:00:03 +00:00
2019-06-03 14:17:27 +00:00
dmReadyCh := make ( chan struct { } )
2019-07-02 12:27:51 +00:00
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
2019-06-03 14:17:27 +00:00
<- dmReadyCh
time . Sleep ( 5 * time . Second )
2019-04-26 14:00:03 +00:00
actions := [ ] * Action { }
for i := 0 ; i < 20 ; i ++ {
actions = append ( actions , & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d" } ` , i ) ) ,
} )
}
// populate with a wal
_ , err = dm . WriteWal ( ctx , actions , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// wait for the event to be read
time . Sleep ( 500 * time . Millisecond )
// should read it
_ , _ , err = dm . ReadObject ( "datatype01" , "object1" , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
_ , _ , err = dm . ReadObject ( "datatype01" , "object19" , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
actions = [ ] * Action { }
for i := 0 ; i < 10 ; i ++ {
actions = append ( actions , & Action {
ActionType : ActionTypeDelete ,
ID : fmt . Sprintf ( "object%d" , i ) ,
DataType : "datatype01" ,
} )
}
_ , err = dm . WriteWal ( ctx , actions , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// wait for the event to be read
time . Sleep ( 500 * time . Millisecond )
// test read from changes (since not checkpoint yet)
// should not exists
_ , _ , err = dm . ReadObject ( "datatype01" , "object1" , nil )
2019-11-06 12:29:42 +00:00
if ! util . IsNotExist ( err ) {
t . Fatalf ( "expected err %v, got: %v" , & util . ErrNotExist { } , err )
2019-04-26 14:00:03 +00:00
}
// should exist
_ , _ , err = dm . ReadObject ( "datatype01" , "object19" , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// do a checkpoint and wal clean
2019-07-18 12:58:42 +00:00
if err := dm . checkpoint ( ctx , true ) ; err != nil {
2019-06-03 14:17:27 +00:00
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 09:10:56 +00:00
if err := dm . etcdWalCleaner ( ctx ) ; err != nil {
2019-06-03 14:17:27 +00:00
t . Fatalf ( "unexpected err: %v" , err )
}
2019-04-26 14:00:03 +00:00
// wait for the event to be read
time . Sleep ( 500 * time . Millisecond )
// test read from data
// should not exists
_ , _ , err = dm . ReadObject ( "datatype01" , "object1" , nil )
2019-11-06 12:29:42 +00:00
if ! util . IsNotExist ( err ) {
t . Fatalf ( "expected err %v, got: %v" , & util . ErrNotExist { } , err )
2019-04-26 14:00:03 +00:00
}
// should exist
_ , _ , err = dm . ReadObject ( "datatype01" , "object19" , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
}
2019-06-03 14:17:27 +00:00
2019-07-03 15:03:37 +00:00
func doAndCheckCheckpoint ( t * testing . T , ctx context . Context , dm * DataManager , actionGroups [ ] [ ] * Action , currentEntries map [ string ] * DataEntry ) ( map [ string ] * DataEntry , error ) {
2019-06-03 14:17:27 +00:00
expectedEntries := map [ string ] * DataEntry { }
for _ , e := range currentEntries {
expectedEntries [ e . ID ] = e
}
for _ , actionGroup := range actionGroups {
for _ , action := range actionGroup {
switch action . ActionType {
case ActionTypePut :
expectedEntries [ action . ID ] = & DataEntry { ID : action . ID , DataType : action . DataType , Data : action . Data }
case ActionTypeDelete :
delete ( expectedEntries , action . ID )
}
}
}
for _ , actionGroup := range actionGroups {
// populate with a wal
_ , err := dm . WriteWal ( ctx , actionGroup , nil )
if err != nil {
return nil , err
}
}
// wait for the event to be read
time . Sleep ( 500 * time . Millisecond )
// do a checkpoint
2019-07-18 12:58:42 +00:00
if err := dm . checkpoint ( ctx , true ) ; err != nil {
2019-06-03 14:17:27 +00:00
return nil , err
}
if err := checkDataFiles ( ctx , t , dm , expectedEntries ) ; err != nil {
return nil , err
}
return expectedEntries , nil
}
2019-10-29 12:23:42 +00:00
// checkDataFiles reads the data files referenced by the last data status and
// verifies that they are consistent and match expectedEntriesMap.
//
// For every data file it checks that:
//   - no entry ID has already been seen in any previously read data file
//   - the entries are sorted by ID
//   - the file contains at least one entry
//   - the IDs in the data file index are the same as the IDs of the entries
//   - the LastEntryID of the file is the ID of its last entry and is greater
//     than the LastEntryID of the previous file of the same data type
//
// Finally all the read entries must be equal to expectedEntriesMap. The
// first failed check is returned as an error. ctx and t are currently unused.
func checkDataFiles(ctx context.Context, t *testing.T, dm *DataManager, expectedEntriesMap map[string]*DataEntry) error {
	// read the data file
	curDataStatus, err := dm.GetLastDataStatus()
	if err != nil {
		return err
	}

	allEntriesMap := map[string]*DataEntry{}

	// decodeIndex decodes a data file index from r, always closing r.
	decodeIndex := func(r io.ReadCloser) (*DataFileIndex, error) {
		defer r.Close()

		var index *DataFileIndex
		if err := json.NewDecoder(r).Decode(&index); err != nil {
			return nil, err
		}
		return index, nil
	}

	// decodeEntries decodes all the entries of a data file from r, always
	// closing r, and records them in allEntriesMap.
	decodeEntries := func(r io.ReadCloser) ([]*DataEntry, error) {
		defer r.Close()

		dec := json.NewDecoder(r)
		entries := []*DataEntry{}
		var prevEntryID string
		for {
			var de *DataEntry
			err := dec.Decode(&de)
			if err == io.EOF {
				// all done
				return entries, nil
			}
			if err != nil {
				return nil, err
			}
			// check that there are no duplicate entries
			if _, ok := allEntriesMap[de.ID]; ok {
				return nil, fmt.Errorf("duplicate entry id: %s", de.ID)
			}
			// check that the entries are in order
			if de.ID < prevEntryID {
				return nil, fmt.Errorf("previous entry id: %s greater than entry id: %s", prevEntryID, de.ID)
			}
			// remember the ID so the next entry is compared against it
			prevEntryID = de.ID

			entries = append(entries, de)
			allEntriesMap[de.ID] = de
		}
	}

	for dataType := range curDataStatus.Files {
		var prevLastEntryID string

		for i, file := range curDataStatus.Files[dataType] {
			dataFileIndexf, err := dm.ost.ReadObject(dm.DataFileIndexPath(dataType, file.ID))
			if err != nil {
				return err
			}
			dataFileIndex, err := decodeIndex(dataFileIndexf)
			if err != nil {
				return err
			}

			dataf, err := dm.ost.ReadObject(dm.DataFilePath(dataType, file.ID))
			if err != nil {
				return err
			}
			dataEntries, err := decodeEntries(dataf)
			if err != nil {
				return err
			}

			// an empty data file has no last entry to compare with LastEntryID
			if len(dataEntries) == 0 {
				return fmt.Errorf("datafile %d contains no entries", i)
			}

			// check that the index matches the entries. Duplicates have been
			// rejected above, so every entry has a distinct ID.
			if len(dataFileIndex.Index) != len(dataEntries) {
				return fmt.Errorf("index entries: %d different than data entries: %d", len(dataFileIndex.Index), len(dataEntries))
			}
			indexIDs := make([]string, 0, len(dataFileIndex.Index))
			for id := range dataFileIndex.Index {
				indexIDs = append(indexIDs, id)
			}
			entriesIDs := make([]string, 0, len(dataEntries))
			for _, de := range dataEntries {
				entriesIDs = append(entriesIDs, de.ID)
			}
			sort.Strings(indexIDs)
			sort.Strings(entriesIDs)

			if !reflect.DeepEqual(indexIDs, entriesIDs) {
				return fmt.Errorf("index entries ids don't match data entries ids: index: %v, data: %v", indexIDs, entriesIDs)
			}

			lastEntryID := dataEntries[len(dataEntries)-1].ID
			if file.LastEntryID != lastEntryID {
				return fmt.Errorf("lastEntryID for datafile %d: %s is different than real last entry id: %s", i, file.LastEntryID, lastEntryID)
			}

			// check that all the files are in order
			if file.LastEntryID == prevLastEntryID {
				return fmt.Errorf("lastEntryID for datafile %d is equal than previous file lastEntryID: %s == %s", i, file.LastEntryID, prevLastEntryID)
			}
			if file.LastEntryID < prevLastEntryID {
				return fmt.Errorf("lastEntryID for datafile %d is less than previous file lastEntryID: %s < %s", i, file.LastEntryID, prevLastEntryID)
			}
			prevLastEntryID = file.LastEntryID
		}
	}

	// check that the number of entries is right
	if len(allEntriesMap) != len(expectedEntriesMap) {
		return fmt.Errorf("expected %d total entries, got %d", len(expectedEntriesMap), len(allEntriesMap))
	}
	if !reflect.DeepEqual(expectedEntriesMap, allEntriesMap) {
		return fmt.Errorf("expected entries don't match current entries")
	}

	return nil
}
2019-06-03 14:17:27 +00:00
// TODO(sgotti) some fuzz testing would be really good
func TestCheckpoint ( t * testing . T ) {
2019-07-03 15:03:37 +00:00
tests := [ ] struct {
name string
basePath string
} {
{
name : "test with empty basepath" ,
basePath : "" ,
} ,
{
name : "test with relative basepath" ,
basePath : "base/path" ,
} ,
}
for _ , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
testCheckpoint ( t , tt . basePath )
} )
}
}
func testCheckpoint ( t * testing . T , basePath string ) {
2019-06-03 14:17:27 +00:00
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-06-03 14:17:27 +00:00
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx := context . Background ( )
ostDir , err := ioutil . TempDir ( dir , "ost" )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-06-03 14:17:27 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmConfig := & DataManagerConfig {
2019-07-03 15:03:37 +00:00
BasePath : basePath ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
2019-06-03 14:17:27 +00:00
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum : 1 ,
DataTypes : [ ] string { "datatype01" } ,
// checkpoint also with only one wal
MinCheckpointWalsNum : 1 ,
// use a small maxDataFileSize
MaxDataFileSize : 10 * 1024 ,
}
dm , err := NewDataManager ( ctx , logger , dmConfig )
2019-07-02 12:27:51 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-06-03 14:17:27 +00:00
dmReadyCh := make ( chan struct { } )
2019-07-02 12:27:51 +00:00
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
2019-06-03 14:17:27 +00:00
<- dmReadyCh
time . Sleep ( 5 * time . Second )
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
// test insert from scratch (no current entries)
actions := [ ] * Action { }
for i := 200 ; i < 400 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
2019-07-03 15:03:37 +00:00
currentEntries , err := doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , nil )
2019-06-03 14:17:27 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// test delete of all existing entries
actions = [ ] * Action { }
for i := 200 ; i < 400 ; i ++ {
actions = append ( actions , & Action {
ActionType : ActionTypeDelete ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
} )
}
2019-07-03 15:03:37 +00:00
currentEntries , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries )
2019-06-03 14:17:27 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// test insert from scratch again (no current entries)
actions = [ ] * Action { }
for i := 200 ; i < 400 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
2019-07-03 15:03:37 +00:00
currentEntries , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries )
2019-06-03 14:17:27 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// test delete some existing entries in the middle
actions = [ ] * Action { }
for i := 250 ; i < 350 ; i ++ {
action := & Action {
ActionType : ActionTypeDelete ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
}
actions = append ( actions , action )
}
2019-07-03 15:03:37 +00:00
currentEntries , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries )
2019-06-03 14:17:27 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// test delete of unexisting entries
actions = [ ] * Action { }
for i := 1000 ; i < 1010 ; i ++ {
action := & Action {
ActionType : ActionTypeDelete ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
}
actions = append ( actions , action )
}
2019-07-03 15:03:37 +00:00
currentEntries , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries )
2019-06-03 14:17:27 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// test update and insert at the end
actions = [ ] * Action { }
for i := 300 ; i < 500 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
2019-07-03 15:03:37 +00:00
currentEntries , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries )
2019-06-03 14:17:27 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// test update and insert at the start
actions = [ ] * Action { }
for i := 0 ; i < 300 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
2019-07-03 15:03:37 +00:00
currentEntries , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries )
2019-06-03 14:17:27 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// test multiple wals with different insert, updated, deletes
actionGroups := [ ] [ ] * Action { }
for i := 0 ; i < 150 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
actionGroups = append ( actionGroups , actions )
for i := 50 ; i < 100 ; i ++ {
action := & Action {
ActionType : ActionTypeDelete ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
actionGroups = append ( actionGroups , actions )
for i := 250 ; i < 300 ; i ++ {
action := & Action {
ActionType : ActionTypeDelete ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
for i := 70 ; i < 80 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
actionGroups = append ( actionGroups , actions )
2019-07-03 15:03:37 +00:00
_ , err = doAndCheckCheckpoint ( t , ctx , dm , actionGroups , currentEntries )
2019-06-03 14:17:27 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-10-29 12:23:42 +00:00
if err := dm . CleanOldCheckpoints ( ctx ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-06-03 14:17:27 +00:00
}
2019-11-06 12:33:01 +00:00
func TestRead ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx := context . Background ( )
ostDir , err := ioutil . TempDir ( dir , "ost" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-11-06 12:33:01 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmConfig := & DataManagerConfig {
BasePath : "basepath" ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum : 1 ,
DataTypes : [ ] string { "datatype01" } ,
// checkpoint also with only one wal
MinCheckpointWalsNum : 1 ,
// use a small maxDataFileSize
MaxDataFileSize : 10 * 1024 ,
}
dm , err := NewDataManager ( ctx , logger , dmConfig )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmReadyCh := make ( chan struct { } )
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
<- dmReadyCh
time . Sleep ( 5 * time . Second )
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
// test insert from scratch (no current entries)
actions := [ ] * Action { }
for i := 0 ; i < 2000 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
currentEntries , err := doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// ensure that at least three datafiles are created
curDataStatus , err := dm . GetLastDataStatus ( )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
if len ( curDataStatus . Files [ "datatype01" ] ) < 3 {
t . Fatalf ( "expected at least 3 datafiles, got: %d" , len ( curDataStatus . Files [ "datatype01" ] ) )
}
for i := 0 ; i < 2000 ; i ++ {
id := fmt . Sprintf ( "object%04d" , i )
er , err := dm . Read ( "datatype01" , id )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
data , err := ioutil . ReadAll ( er )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
if ! reflect . DeepEqual ( data , currentEntries [ id ] . Data ) {
t . Fatalf ( "expected data: %v, got data: %v" , currentEntries [ id ] . Data , data )
}
}
}
2019-10-29 12:23:42 +00:00
// TestClean runs testClean as a subtest for each supported kind of data
// manager base path (empty and relative).
func TestClean(t *testing.T) {
	type testCase struct {
		name     string
		basePath string
	}

	cases := []testCase{
		{name: "test with empty basepath", basePath: ""},
		{name: "test with relative basepath", basePath: "base/path"},
	}

	for i := range cases {
		// copy the case so the subtest closure owns its own value
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			testClean(t, tc.basePath)
		})
	}
}
func testClean ( t * testing . T , basePath string ) {
dir , err := ioutil . TempDir ( "" , "agola" )
2019-06-03 14:17:27 +00:00
if err != nil {
2019-10-29 12:23:42 +00:00
t . Fatalf ( "unexpected err: %v" , err )
2019-06-03 14:17:27 +00:00
}
2019-10-29 12:23:42 +00:00
defer os . RemoveAll ( dir )
2019-06-03 14:17:27 +00:00
2019-10-29 12:23:42 +00:00
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
2019-06-03 14:17:27 +00:00
2019-10-29 12:23:42 +00:00
ctx := context . Background ( )
2019-07-17 15:16:35 +00:00
2019-10-29 12:23:42 +00:00
ostDir , err := ioutil . TempDir ( dir , "ost" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-10-29 12:23:42 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-07-17 15:16:35 +00:00
2019-10-29 12:23:42 +00:00
dmConfig := & DataManagerConfig {
BasePath : basePath ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum : 1 ,
DataTypes : [ ] string { "datatype01" } ,
// checkpoint also with only one wal
MinCheckpointWalsNum : 1 ,
// use a small maxDataFileSize
MaxDataFileSize : 10 * 1024 ,
}
dm , err := NewDataManager ( ctx , logger , dmConfig )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmReadyCh := make ( chan struct { } )
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
<- dmReadyCh
2019-07-17 15:16:35 +00:00
2019-10-29 12:23:42 +00:00
time . Sleep ( 5 * time . Second )
2019-06-03 14:17:27 +00:00
2019-10-29 12:23:42 +00:00
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
2019-07-17 15:16:35 +00:00
2019-10-29 12:23:42 +00:00
var currentEntries map [ string ] * DataEntry
actions := [ ] * Action { }
for n := 0 ; n < 10 ; n ++ {
for i := 0 ; i < 400 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
2019-07-17 15:16:35 +00:00
}
2019-10-29 12:23:42 +00:00
actions = append ( actions , action )
}
2019-07-17 15:16:35 +00:00
2019-10-29 12:23:42 +00:00
currentEntries , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
}
// get the last data status sequence
lastDataStatusSequences , err := dm . GetLastDataStatusSequences ( dataStatusToKeep )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
if err := dm . CleanOldCheckpoints ( ctx ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// check last data file
if err := checkDataFiles ( ctx , t , dm , currentEntries ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// check that only the last dataStatusToKeep status files are left
curDataStatusSequences , err := dm . GetLastDataStatusSequences ( 1000 )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
if len ( curDataStatusSequences ) != dataStatusToKeep {
t . Fatalf ( "expected %d data status files, got %d: %s" , dataStatusToKeep , len ( curDataStatusSequences ) , curDataStatusSequences )
}
if diff := cmp . Diff ( lastDataStatusSequences , curDataStatusSequences ) ; diff != "" {
t . Fatalf ( "different data status sequences: %v" , diff )
}
}
// TestCleanConcurrentCheckpoint runs testCleanConcurrentCheckpoint as a
// subtest for each supported kind of data manager base path (empty and
// relative).
func TestCleanConcurrentCheckpoint(t *testing.T) {
	type testCase struct {
		name     string
		basePath string
	}

	cases := []testCase{
		{name: "test with empty basepath", basePath: ""},
		{name: "test with relative basepath", basePath: "base/path"},
	}

	for i := range cases {
		// copy the case so the subtest closure owns its own value
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			testCleanConcurrentCheckpoint(t, tc.basePath)
		})
	}
}
func testCleanConcurrentCheckpoint ( t * testing . T , basePath string ) {
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx := context . Background ( )
ostDir , err := ioutil . TempDir ( dir , "ost" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-10-29 12:23:42 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmConfig := & DataManagerConfig {
BasePath : basePath ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum : 1 ,
DataTypes : [ ] string { "datatype01" } ,
// checkpoint also with only one wal
MinCheckpointWalsNum : 1 ,
// use a small maxDataFileSize
MaxDataFileSize : 10 * 1024 ,
}
dm , err := NewDataManager ( ctx , logger , dmConfig )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmReadyCh := make ( chan struct { } )
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
<- dmReadyCh
time . Sleep ( 5 * time . Second )
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
var currentEntries map [ string ] * DataEntry
actions := [ ] * Action { }
for n := 0 ; n < 10 ; n ++ {
for i := 0 ; i < 400 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
2019-07-17 15:16:35 +00:00
}
2019-10-29 12:23:42 +00:00
actions = append ( actions , action )
}
currentEntries , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
2019-06-03 14:17:27 +00:00
}
2019-07-17 15:16:35 +00:00
}
2019-10-29 12:23:42 +00:00
// get the current last data status sequences before doing other actions and checkpoints
dataStatusSequences , err := dm . GetLastDataStatusSequences ( dataStatusToKeep )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
2019-07-17 15:16:35 +00:00
}
2019-10-29 12:23:42 +00:00
for i := 0 ; i < 400 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
2019-07-17 15:16:35 +00:00
}
2019-10-29 12:23:42 +00:00
if _ , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
if err := dm . cleanOldCheckpoints ( ctx , dataStatusSequences ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// check the datastatus after clean
curDataStatus , err := dm . GetLastDataStatus ( )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
if curDataStatus . DataSequence <= dataStatusSequences [ 0 ] . String ( ) {
t . Fatalf ( "expected data status sequence greater than %q" , dataStatusSequences [ 0 ] )
}
// check last data file
if err := checkDataFiles ( ctx , t , dm , currentEntries ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-07-17 15:16:35 +00:00
}
2019-11-08 09:10:56 +00:00
// TestStorageWalCleaner runs testStorageWalCleaner as a subtest for each
// supported kind of data manager base path (empty and relative).
func TestStorageWalCleaner(t *testing.T) {
	type testCase struct {
		name     string
		basePath string
	}

	cases := []testCase{
		{name: "test with empty basepath", basePath: ""},
		{name: "test with relative basepath", basePath: "base/path"},
	}

	for i := range cases {
		// copy the case so the subtest closure owns its own value
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			testStorageWalCleaner(t, tc.basePath)
		})
	}
}
func testStorageWalCleaner ( t * testing . T , basePath string ) {
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx := context . Background ( )
ostDir , err := ioutil . TempDir ( dir , "ost" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-11-08 09:10:56 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmConfig := & DataManagerConfig {
BasePath : basePath ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum : 1 ,
DataTypes : [ ] string { "datatype01" } ,
// checkpoint also with only one wal
MinCheckpointWalsNum : 1 ,
// use a small maxDataFileSize
MaxDataFileSize : 10 * 1024 ,
}
dm , err := NewDataManager ( ctx , logger , dmConfig )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmReadyCh := make ( chan struct { } )
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
<- dmReadyCh
time . Sleep ( 5 * time . Second )
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
var currentEntries map [ string ] * DataEntry
actions := [ ] * Action { }
for n := 0 ; n < 10 ; n ++ {
for i := 0 ; i < 400 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
}
actions = append ( actions , action )
}
currentEntries , err = doAndCheckCheckpoint ( t , ctx , dm , [ ] [ ] * Action { actions } , currentEntries )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
}
// get the last data status sequence
lastDataStatusSequences , err := dm . GetLastDataStatusSequences ( dataStatusToKeep )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// Use the first dataStatusToKeep data status
dataStatus , err := dm . GetDataStatus ( lastDataStatusSequences [ dataStatusToKeep - 1 ] )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
// get the list of expected wals
doneCh := make ( chan struct { } )
defer close ( doneCh )
expectedWalStatusFiles := [ ] string { }
expectedWalDataFiles := [ ] string { }
for object := range dm . ost . List ( dm . storageWalStatusDir ( ) + "/" , "" , true , doneCh ) {
if object . Err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
name := path . Base ( object . Path )
ext := path . Ext ( name )
walSequence := strings . TrimSuffix ( name , ext )
if walSequence < dataStatus . WalSequence {
continue
}
header , err := dm . ReadWal ( walSequence )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
expectedWalStatusFiles = append ( expectedWalStatusFiles , object . Path )
expectedWalDataFiles = append ( expectedWalDataFiles , dm . storageWalDataFile ( header . WalDataFileID ) )
}
sort . Strings ( expectedWalDataFiles )
if err := dm . CleanOldCheckpoints ( ctx ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
if err := dm . storageWalCleaner ( ctx ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
currentWalStatusFiles := [ ] string { }
currentWalDataFiles := [ ] string { }
for object := range dm . ost . List ( dm . storageWalStatusDir ( ) + "/" , "" , true , doneCh ) {
if object . Err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
currentWalStatusFiles = append ( currentWalStatusFiles , object . Path )
}
for object := range dm . ost . List ( dm . storageWalDataDir ( ) + "/" , "" , true , doneCh ) {
if object . Err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
currentWalDataFiles = append ( currentWalDataFiles , object . Path )
}
sort . Strings ( currentWalDataFiles )
if diff := cmp . Diff ( currentWalStatusFiles , expectedWalStatusFiles ) ; diff != "" {
t . Fatalf ( "different wal status files: %v" , diff )
}
if diff := cmp . Diff ( currentWalDataFiles , expectedWalDataFiles ) ; diff != "" {
t . Fatalf ( "different wal data files: %v" , diff )
}
}
2019-07-17 15:16:35 +00:00
func TestExportImport ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "agola" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer os . RemoveAll ( dir )
etcdDir , err := ioutil . TempDir ( dir , "etcd" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
tetcd := setupEtcd ( t , etcdDir )
defer shutdownEtcd ( tetcd )
ctx , cancel := context . WithCancel ( context . Background ( ) )
ostDir , err := ioutil . TempDir ( dir , "ost" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 15:25:53 +00:00
ost , err := objectstorage . NewPosix ( ostDir )
2019-07-17 15:16:35 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmConfig := & DataManagerConfig {
BasePath : "basepath" ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum : 1 ,
DataTypes : [ ] string { "datatype01" , "datatype02" } ,
// checkpoint also with only one wal
MinCheckpointWalsNum : 1 ,
// use a small maxDataFileSize
MaxDataFileSize : 10 * 1024 ,
}
dm , err := NewDataManager ( ctx , logger , dmConfig )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmReadyCh := make ( chan struct { } )
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
<- dmReadyCh
time . Sleep ( 5 * time . Second )
contents := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
2019-06-03 14:17:27 +00:00
2019-07-17 15:16:35 +00:00
expectedEntries := map [ string ] * DataEntry { }
// test insert from scratch (no current entries)
actionGroups := [ ] [ ] * Action { }
actions := [ ] * Action { }
for i := 200 ; i < 400 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
2019-06-03 14:17:27 +00:00
}
2019-07-17 15:16:35 +00:00
actions = append ( actions , action )
}
actionGroups = append ( actionGroups , actions )
actions = [ ] * Action { }
for i := 600 ; i < 1000 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype02" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
2019-06-03 14:17:27 +00:00
}
2019-07-17 15:16:35 +00:00
actions = append ( actions , action )
}
actionGroups = append ( actionGroups , actions )
for _ , actionGroup := range actionGroups {
for _ , action := range actionGroup {
switch action . ActionType {
case ActionTypePut :
expectedEntries [ action . ID ] = & DataEntry { ID : action . ID , DataType : action . DataType , Data : action . Data }
case ActionTypeDelete :
delete ( expectedEntries , action . ID )
}
2019-06-03 14:17:27 +00:00
}
2019-07-17 15:16:35 +00:00
}
for _ , actionGroup := range actionGroups {
// populate with a wal
_ , err := dm . WriteWal ( ctx , actionGroup , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
2019-06-03 14:17:27 +00:00
}
2019-07-17 15:16:35 +00:00
}
// wait for the event to be read
time . Sleep ( 500 * time . Millisecond )
var export bytes . Buffer
if err := dm . Export ( ctx , & export ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
t . Logf ( "stopping datamanager" )
cancel ( )
2019-06-03 14:17:27 +00:00
2019-07-17 15:16:35 +00:00
time . Sleep ( 5 * time . Second )
t . Logf ( "stopping etcd" )
// Reset etcd
shutdownEtcd ( tetcd )
if err := tetcd . WaitDown ( 10 * time . Second ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
t . Logf ( "resetting etcd" )
os . RemoveAll ( etcdDir )
t . Logf ( "starting etcd" )
tetcd = setupEtcd ( t , etcdDir )
if err := tetcd . Start ( ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
defer shutdownEtcd ( tetcd )
ostDir , err = ioutil . TempDir ( dir , "ost" )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-11-08 15:25:53 +00:00
ost , err = objectstorage . NewPosix ( ostDir )
2019-07-17 15:16:35 +00:00
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
ctx , cancel = context . WithCancel ( context . Background ( ) )
dmConfig = & DataManagerConfig {
BasePath : "basepath" ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum : 1 ,
DataTypes : [ ] string { "datatype01" , "datatype02" } ,
// checkpoint also with only one wal
MinCheckpointWalsNum : 1 ,
// use a small maxDataFileSize
MaxDataFileSize : 10 * 1024 ,
MaintenanceMode : true ,
}
dm , err = NewDataManager ( ctx , logger , dmConfig )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmReadyCh = make ( chan struct { } )
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
<- dmReadyCh
time . Sleep ( 5 * time . Second )
if err := dm . Import ( ctx , & export ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
if err := checkDataFiles ( ctx , t , dm , expectedEntries ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
t . Logf ( "stopping datamanager" )
cancel ( )
time . Sleep ( 5 * time . Second )
ctx = context . Background ( )
// restart datamanager in normal mode
dmConfig = & DataManagerConfig {
BasePath : "basepath" ,
E : tetcd . TestEtcd . Store ,
OST : objectstorage . NewObjStorage ( ost , "/" ) ,
// remove almost all wals to see that they are removed also from changes
EtcdWalsKeepNum : 1 ,
DataTypes : [ ] string { "datatype01" , "datatype02" } ,
// checkpoint also with only one wal
MinCheckpointWalsNum : 1 ,
// use a small maxDataFileSize
MaxDataFileSize : 10 * 1024 ,
}
dm , err = NewDataManager ( ctx , logger , dmConfig )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
dmReadyCh = make ( chan struct { } )
go func ( ) { _ = dm . Run ( ctx , dmReadyCh ) } ( )
<- dmReadyCh
time . Sleep ( 5 * time . Second )
if err := checkDataFiles ( ctx , t , dm , expectedEntries ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
actionGroups = [ ] [ ] * Action { }
actions = [ ] * Action { }
for i := 400 ; i < 600 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype01" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
2019-06-03 14:17:27 +00:00
}
2019-07-17 15:16:35 +00:00
actions = append ( actions , action )
}
actionGroups = append ( actionGroups , actions )
2019-06-03 14:17:27 +00:00
2019-07-17 15:16:35 +00:00
actions = [ ] * Action { }
for i := 1000 ; i < 1400 ; i ++ {
action := & Action {
ActionType : ActionTypePut ,
ID : fmt . Sprintf ( "object%04d" , i ) ,
DataType : "datatype02" ,
Data : [ ] byte ( fmt . Sprintf ( ` { "ID": "%d", "Contents": %s } ` , i , contents ) ) ,
2019-06-03 14:17:27 +00:00
}
2019-07-17 15:16:35 +00:00
actions = append ( actions , action )
}
actionGroups = append ( actionGroups , actions )
for _ , actionGroup := range actionGroups {
for _ , action := range actionGroup {
switch action . ActionType {
case ActionTypePut :
expectedEntries [ action . ID ] = & DataEntry { ID : action . ID , DataType : action . DataType , Data : action . Data }
case ActionTypeDelete :
delete ( expectedEntries , action . ID )
}
2019-06-03 14:17:27 +00:00
}
}
2019-07-17 15:16:35 +00:00
for _ , actionGroup := range actionGroups {
// populate with a wal
_ , err := dm . WriteWal ( ctx , actionGroup , nil )
if err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-06-03 14:17:27 +00:00
}
2019-07-17 15:16:35 +00:00
// wait for the event to be read
time . Sleep ( 500 * time . Millisecond )
if err := dm . checkpoint ( ctx , false ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
2019-06-03 14:17:27 +00:00
}
2019-07-17 15:16:35 +00:00
if err := checkDataFiles ( ctx , t , dm , expectedEntries ) ; err != nil {
t . Fatalf ( "unexpected err: %v" , err )
}
2019-06-03 14:17:27 +00:00
}