This commit is contained in:
54liuyao 2023-12-27 15:57:53 +08:00
commit 6e761232b2
2 changed files with 30 additions and 14 deletions

View File

@ -558,22 +558,31 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
pEntry->term);
}
SRpcMsg rpcMsg = {.code = applyCode};
syncEntry2OriginalRpc(pEntry, &rpcMsg);
int32_t code = 0;
bool retry = false;
do {
SRpcMsg rpcMsg = {.code = applyCode};
syncEntry2OriginalRpc(pEntry, &rpcMsg);
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = applyCode;
cbMeta.state = role;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = term;
cbMeta.flag = -1;
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = applyCode;
cbMeta.state = role;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = term;
cbMeta.flag = -1;
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
if (retry) {
taosMsleep(10);
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
}
} while (retry);
return code;
}

View File

@ -806,6 +806,13 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
goto _SEND_REPLY;
}
SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
if (pReceiver->snapshotParam.start != beginIndex) {
sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
pReceiver->snapshotParam.start, beginIndex);
goto _SEND_REPLY;
}
code = 0;
_SEND_REPLY:
if (code != 0 && terrno != 0) {