datamanager: create a new wal and checkpoint in initEtcd

When doing an initEtcd (new instance or etcd reset) create a new wal (that will
have a new sequence epoch) and do a checkpoint.

In this way:

* readdb will detect that an epoch change and do a full resync
* we always have a data file (also if empty) that provides the last checkpointed
wal. This information could be used by readdb to resync
This commit is contained in:
Simone Gotti 2019-07-18 13:34:10 +02:00
parent cb2a871be6
commit df716fccc6
1 changed files with 52 additions and 2 deletions

View File

@ -590,7 +590,7 @@ func (d *DataManager) sync(ctx context.Context) error {
switch walData.WalStatus { switch walData.WalStatus {
case WalStatusCommitted: case WalStatusCommitted:
walFilePath := d.storageWalStatusFile(walData.WalSequence) walFilePath := d.storageWalStatusFile(walData.WalSequence)
d.log.Debugf("syncing committed wal to storage") d.log.Debugf("syncing committed wal %q to storage", walData.WalSequence)
header := &WalHeader{ header := &WalHeader{
WalDataFileID: walData.WalDataFileID, WalDataFileID: walData.WalDataFileID,
PreviousWalSequence: walData.PreviousWalSequence, PreviousWalSequence: walData.PreviousWalSequence,
@ -924,7 +924,7 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
walData := &WalData{ walData := &WalData{
WalSequence: wal.WalSequence, WalSequence: wal.WalSequence,
WalDataFileID: header.WalDataFileID, WalDataFileID: header.WalDataFileID,
WalStatus: WalStatusCommitted, WalStatus: WalStatusCommittedStorage,
} }
if wal.Checkpointed { if wal.Checkpointed {
walData.WalStatus = WalStatusCheckpointed walData.WalStatus = WalStatusCheckpointed
@ -1063,9 +1063,53 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
} }
} }
// insert an empty wal and make it already committedstorage
walSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey)
if err != nil {
return err
}
walDataFileID := uuid.NewV4().String()
walDataFilePath := d.storageWalDataFile(walDataFileID)
walKey := etcdWalKey(walSequence.String())
if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader([]byte{}), 0, true); err != nil {
return err
}
d.log.Debugf("wrote wal file: %s", walDataFilePath)
walFilePath := d.storageWalStatusFile(walSequence.String())
d.log.Infof("syncing committed wal %q to storage", walSequence.String())
header := &WalHeader{
WalDataFileID: walDataFileID,
PreviousWalSequence: lastCommittedStorageWalSequence,
}
headerj, err := json.Marshal(header)
if err != nil {
return err
}
walFileCommittedPath := walFilePath + ".committed"
if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), int64(len(headerj)), true); err != nil {
return err
}
lastCommittedStorageWalSequence = walSequence.String()
walData := &WalData{
WalSequence: walSequence.String(),
WalDataFileID: walDataFileID,
WalStatus: WalStatusCommittedStorage,
}
walsData := &WalsData{ walsData := &WalsData{
LastCommittedWalSequence: lastCommittedStorageWalSequence, LastCommittedWalSequence: lastCommittedStorageWalSequence,
} }
walDataj, err := json.Marshal(walData)
if err != nil {
return err
}
walsDataj, err := json.Marshal(walsData) walsDataj, err := json.Marshal(walsData)
if err != nil { if err != nil {
return err return err
@ -1081,6 +1125,7 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0)) cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0))
then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj))) then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj)))
then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence)) then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence))
then = append(then, etcdclientv3.OpPut(walKey, string(walDataj)))
txn = d.e.Client().Txn(ctx).If(cmp...).Then(then...) txn = d.e.Client().Txn(ctx).If(cmp...).Then(then...)
tresp, err := txn.Commit() tresp, err := txn.Commit()
if err != nil { if err != nil {
@ -1090,5 +1135,10 @@ func (d *DataManager) InitEtcd(ctx context.Context) error {
return errors.Errorf("failed to sync etcd: walsdata already written") return errors.Errorf("failed to sync etcd: walsdata already written")
} }
// force a checkpoint
if err := d.checkpoint(ctx, true); err != nil {
return err
}
return nil return nil
} }