Merge pull request #61 from sgotti/datamanager_maintenance_mode
datamanager: implement maintenance mode
This commit is contained in:
commit
af4aa58903
|
@ -87,6 +87,7 @@ type DataManagerConfig struct {
|
|||
// MinCheckpointWalsNum is the minimum number of wals required before doing a checkpoint
|
||||
MinCheckpointWalsNum int
|
||||
MaxDataFileSize int64
|
||||
MaintenanceMode bool
|
||||
}
|
||||
|
||||
type DataManager struct {
|
||||
|
@ -100,6 +101,7 @@ type DataManager struct {
|
|||
checkpointInterval time.Duration
|
||||
minCheckpointWalsNum int
|
||||
maxDataFileSize int64
|
||||
maintenanceMode bool
|
||||
}
|
||||
|
||||
func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerConfig) (*DataManager, error) {
|
||||
|
@ -133,6 +135,7 @@ func NewDataManager(ctx context.Context, logger *zap.Logger, conf *DataManagerCo
|
|||
checkpointInterval: conf.CheckpointInterval,
|
||||
minCheckpointWalsNum: conf.MinCheckpointWalsNum,
|
||||
maxDataFileSize: conf.MaxDataFileSize,
|
||||
maintenanceMode: conf.MaintenanceMode,
|
||||
}
|
||||
|
||||
// add trailing slash the basepath
|
||||
|
@ -171,6 +174,12 @@ func etcdWalKey(walSeq string) string {
|
|||
return path.Join(etcdWalsDir, walSeq)
|
||||
}
|
||||
|
||||
// SetMaintenanceMode sets the datamanager in maintenance mode. This method must
|
||||
// be called before invoking the Run method
|
||||
func (d *DataManager) SetMaintenanceMode(maintenanceMode bool) {
|
||||
d.maintenanceMode = maintenanceMode
|
||||
}
|
||||
|
||||
// deleteEtcd deletes all etcd data excluding keys used for locking
|
||||
func (d *DataManager) deleteEtcd(ctx context.Context) error {
|
||||
prefixes := []string{
|
||||
|
@ -192,26 +201,32 @@ func (d *DataManager) deleteEtcd(ctx context.Context) error {
|
|||
}
|
||||
|
||||
func (d *DataManager) Run(ctx context.Context, readyCh chan struct{}) error {
|
||||
for {
|
||||
err := d.InitEtcd(ctx, nil)
|
||||
if err == nil {
|
||||
break
|
||||
if !d.maintenanceMode {
|
||||
for {
|
||||
err := d.InitEtcd(ctx, nil)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
d.log.Errorf("failed to initialize etcd: %+v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
d.log.Errorf("failed to initialize etcd: %+v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
readyCh <- struct{}{}
|
||||
|
||||
go d.watcherLoop(ctx)
|
||||
go d.syncLoop(ctx)
|
||||
go d.checkpointLoop(ctx)
|
||||
go d.walCleanerLoop(ctx)
|
||||
go d.compactChangeGroupsLoop(ctx)
|
||||
go d.etcdPingerLoop(ctx)
|
||||
|
||||
} else {
|
||||
d.log.Infof("datamanager starting in maintenance mode")
|
||||
readyCh <- struct{}{}
|
||||
}
|
||||
|
||||
readyCh <- struct{}{}
|
||||
|
||||
go d.watcherLoop(ctx)
|
||||
go d.syncLoop(ctx)
|
||||
go d.checkpointLoop(ctx)
|
||||
go d.walCleanerLoop(ctx)
|
||||
go d.compactChangeGroupsLoop(ctx)
|
||||
go d.etcdPingerLoop(ctx)
|
||||
|
||||
<-ctx.Done()
|
||||
d.log.Infof("walmanager exiting")
|
||||
d.log.Infof("datamanager exiting")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue