enh: finish restore with commit and applied indexes instead of num of items in apply queue

This commit is contained in:
Benguang Zhao 2023-03-09 19:29:09 +08:00
parent 552d0bc8a0
commit 5e53b1225b
4 changed files with 17 additions and 11 deletions

View File

@ -147,7 +147,7 @@ typedef struct SSyncFSM {
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx);
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);

View File

@ -162,7 +162,7 @@ static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
} }
void mndRestoreFinish(const SSyncFSM *pFsm) { void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
if (!pMnode->deploy) { if (!pMnode->deploy) {

View File

@ -521,21 +521,27 @@ static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *p
return code; return code;
} }
static void vnodeRestoreFinish(const SSyncFSM *pFsm) { static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SyncIndex appliedIdx = -1;
do { do {
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); appliedIdx = vnodeSyncAppliedIndex(pFsm);
if (itemSize == 0) { ASSERT(appliedIdx <= commitIdx);
vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId); if (appliedIdx == commitIdx) {
vInfo("vgId:%d, no more items to be applied, restore finish", pVnode->config.vgId);
break; break;
} else { } else {
vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize); int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
vInfo("vgId:%d, restore not finish since %" PRId64
" items to be applied, and %d in apply queue. commit-index:%" PRId64 ", applied-index:%" PRId64,
pVnode->config.vgId, commitIdx - appliedIdx, itemSize, commitIdx, appliedIdx);
taosMsleep(10); taosMsleep(10);
} }
} while (true); } while (true);
walApplyVer(pVnode->pWal, pVnode->state.applied); ASSERT(appliedIdx == commitIdx);
walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true; pVnode->restored = true;
vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); vInfo("vgId:%d, sync restore finished", pVnode->config.vgId);

View File

@ -596,10 +596,10 @@ _out:
// mark as restored if needed // mark as restored if needed
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
currentTerm <= pEntry->term) { currentTerm <= pEntry->term) {
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex);
pNode->restoreFinish = true; pNode->restoreFinish = true;
sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
} }
if (!inBuf) { if (!inBuf) {