diff --git a/internal/datamanager/wal.go b/internal/datamanager/wal.go index 847aaf3..689ea6f 100644 --- a/internal/datamanager/wal.go +++ b/internal/datamanager/wal.go @@ -590,7 +590,7 @@ func (d *DataManager) sync(ctx context.Context) error { switch walData.WalStatus { case WalStatusCommitted: walFilePath := d.storageWalStatusFile(walData.WalSequence) - d.log.Debugf("syncing committed wal to storage") + d.log.Debugf("syncing committed wal %q to storage", walData.WalSequence) header := &WalHeader{ WalDataFileID: walData.WalDataFileID, PreviousWalSequence: walData.PreviousWalSequence, @@ -924,7 +924,7 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { walData := &WalData{ WalSequence: wal.WalSequence, WalDataFileID: header.WalDataFileID, - WalStatus: WalStatusCommitted, + WalStatus: WalStatusCommittedStorage, } if wal.Checkpointed { walData.WalStatus = WalStatusCheckpointed @@ -1063,9 +1063,53 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { } } + // insert an empty wal and make it already committedstorage + walSequence, err := sequence.IncSequence(ctx, d.e, etcdWalSeqKey) + if err != nil { + return err + } + + walDataFileID := uuid.NewV4().String() + walDataFilePath := d.storageWalDataFile(walDataFileID) + walKey := etcdWalKey(walSequence.String()) + + if err := d.ost.WriteObject(walDataFilePath, bytes.NewReader([]byte{}), 0, true); err != nil { + return err + } + d.log.Debugf("wrote wal file: %s", walDataFilePath) + + walFilePath := d.storageWalStatusFile(walSequence.String()) + d.log.Infof("syncing committed wal %q to storage", walSequence.String()) + header := &WalHeader{ + WalDataFileID: walDataFileID, + PreviousWalSequence: lastCommittedStorageWalSequence, + } + headerj, err := json.Marshal(header) + if err != nil { + return err + } + walFileCommittedPath := walFilePath + ".committed" + if err := d.ost.WriteObject(walFileCommittedPath, bytes.NewReader(headerj), int64(len(headerj)), true); err != nil { + return err + } + + lastCommittedStorageWalSequence = walSequence.String() + + walData := &WalData{ + WalSequence: walSequence.String(), + WalDataFileID: walDataFileID, + WalStatus: WalStatusCommittedStorage, + } + walsData := &WalsData{ LastCommittedWalSequence: lastCommittedStorageWalSequence, } + + walDataj, err := json.Marshal(walData) + if err != nil { + return err + } + walsDataj, err := json.Marshal(walsData) if err != nil { return err @@ -1081,6 +1125,7 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(etcdWalsDataKey), "=", 0)) then = append(then, etcdclientv3.OpPut(etcdWalsDataKey, string(walsDataj))) then = append(then, etcdclientv3.OpPut(etcdLastCommittedStorageWalSeqKey, lastCommittedStorageWalSequence)) + then = append(then, etcdclientv3.OpPut(walKey, string(walDataj))) txn = d.e.Client().Txn(ctx).If(cmp...).Then(then...) tresp, err := txn.Commit() if err != nil { @@ -1090,5 +1135,10 @@ func (d *DataManager) InitEtcd(ctx context.Context) error { return errors.Errorf("failed to sync etcd: walsdata already written") } + // force a checkpoint + if err := d.checkpoint(ctx, true); err != nil { + return err + } + return nil }