Merge pull request #65 from sgotti/readdb_close_open_on_run
readdb: close and open readdb on Run
This commit is contained in:
commit
335d1a2aad
|
@ -68,20 +68,10 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.
|
||||||
if err := os.MkdirAll(dataDir, 0770); err != nil {
|
if err := os.MkdirAll(dataDir, 0770); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
rdb, err := db.NewDB(db.Sqlite3, filepath.Join(dataDir, "db"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// populate readdb
|
|
||||||
if err := rdb.Create(ctx, Stmts); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
readDB := &ReadDB{
|
readDB := &ReadDB{
|
||||||
log: logger.Sugar(),
|
log: logger.Sugar(),
|
||||||
dataDir: dataDir,
|
dataDir: dataDir,
|
||||||
rdb: rdb,
|
|
||||||
e: e,
|
e: e,
|
||||||
ost: ost,
|
ost: ost,
|
||||||
dm: dm,
|
dm: dm,
|
||||||
|
@ -114,7 +104,9 @@ func (r *ReadDB) Initialize(ctx context.Context) error {
|
||||||
|
|
||||||
func (r *ReadDB) ResetDB(ctx context.Context) error {
|
func (r *ReadDB) ResetDB(ctx context.Context) error {
|
||||||
// TODO(sgotti) this needs to be protected by a mutex
|
// TODO(sgotti) this needs to be protected by a mutex
|
||||||
r.rdb.Close()
|
if r.rdb != nil {
|
||||||
|
r.rdb.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// drop rdb
|
// drop rdb
|
||||||
if err := os.Remove(filepath.Join(r.dataDir, "db")); err != nil {
|
if err := os.Remove(filepath.Join(r.dataDir, "db")); err != nil {
|
||||||
|
@ -406,6 +398,20 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ReadDB) Run(ctx context.Context) error {
|
func (r *ReadDB) Run(ctx context.Context) error {
|
||||||
|
if r.rdb != nil {
|
||||||
|
r.rdb.Close()
|
||||||
|
}
|
||||||
|
rdb, err := db.NewDB(db.Sqlite3, filepath.Join(r.dataDir, "db"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
r.rdb = rdb
|
||||||
|
|
||||||
|
// populate readdb
|
||||||
|
if err := r.rdb.Create(ctx, Stmts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
revision, err := r.GetRevision(ctx)
|
revision, err := r.GetRevision(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -468,7 +474,6 @@ func (r *ReadDB) Run(ctx context.Context) error {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
r.log.Infof("readdb exiting")
|
r.log.Infof("readdb exiting")
|
||||||
cancel()
|
cancel()
|
||||||
r.rdb.Close()
|
|
||||||
return nil
|
return nil
|
||||||
case <-errCh:
|
case <-errCh:
|
||||||
// cancel context and wait for the all the goroutines to exit
|
// cancel context and wait for the all the goroutines to exit
|
||||||
|
|
|
@ -113,15 +113,6 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.
|
||||||
if err := os.MkdirAll(dataDir, 0770); err != nil {
|
if err := os.MkdirAll(dataDir, 0770); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
rdb, err := db.NewDB(db.Sqlite3, filepath.Join(dataDir, "db"))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// populate readdb
|
|
||||||
if err := rdb.Create(ctx, Stmts); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
readDB := &ReadDB{
|
readDB := &ReadDB{
|
||||||
log: logger.Sugar(),
|
log: logger.Sugar(),
|
||||||
|
@ -129,7 +120,6 @@ func NewReadDB(ctx context.Context, logger *zap.Logger, dataDir string, e *etcd.
|
||||||
dataDir: dataDir,
|
dataDir: dataDir,
|
||||||
ost: ost,
|
ost: ost,
|
||||||
dm: dm,
|
dm: dm,
|
||||||
rdb: rdb,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return readDB, nil
|
return readDB, nil
|
||||||
|
@ -164,7 +154,9 @@ func (r *ReadDB) Initialize(ctx context.Context) error {
|
||||||
|
|
||||||
func (r *ReadDB) ResetDB(ctx context.Context) error {
|
func (r *ReadDB) ResetDB(ctx context.Context) error {
|
||||||
// TODO(sgotti) this needs to be protected by a mutex
|
// TODO(sgotti) this needs to be protected by a mutex
|
||||||
r.rdb.Close()
|
if r.rdb != nil {
|
||||||
|
r.rdb.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// drop rdb
|
// drop rdb
|
||||||
if err := os.Remove(filepath.Join(r.dataDir, "db")); err != nil {
|
if err := os.Remove(filepath.Join(r.dataDir, "db")); err != nil {
|
||||||
|
@ -256,6 +248,21 @@ func (r *ReadDB) SyncRDB(ctx context.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ReadDB) Run(ctx context.Context) error {
|
func (r *ReadDB) Run(ctx context.Context) error {
|
||||||
|
if r.rdb != nil {
|
||||||
|
r.rdb.Close()
|
||||||
|
}
|
||||||
|
rdb, err := db.NewDB(db.Sqlite3, filepath.Join(r.dataDir, "db"))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
r.rdb = rdb
|
||||||
|
|
||||||
|
// populate readdb
|
||||||
|
if err := r.rdb.Create(ctx, Stmts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
revision, err := r.GetRevision(ctx)
|
revision, err := r.GetRevision(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -328,7 +335,6 @@ func (r *ReadDB) Run(ctx context.Context) error {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
r.log.Infof("readdb exiting")
|
r.log.Infof("readdb exiting")
|
||||||
cancel()
|
cancel()
|
||||||
r.rdb.Close()
|
|
||||||
return nil
|
return nil
|
||||||
case <-errCh:
|
case <-errCh:
|
||||||
// cancel context and wait for the all the goroutines to exit
|
// cancel context and wait for the all the goroutines to exit
|
||||||
|
|
Loading…
Reference in New Issue