Merge pull request #24227 from taosdata/FEAT/TD-27125-3.0

enh: retry on fsm commit when rpc out of memory in syncFsmExecute
This commit is contained in:
wade zhang 2023-12-27 14:37:31 +08:00 committed by GitHub
commit 0f7ad90845
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 14 deletions

View File

@ -558,6 +558,9 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
pEntry->term); pEntry->term);
} }
int32_t code = 0;
bool retry = false;
do {
SRpcMsg rpcMsg = {.code = applyCode}; SRpcMsg rpcMsg = {.code = applyCode};
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
@ -573,7 +576,13 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
cbMeta.flag = -1; cbMeta.flag = -1;
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
if (retry) {
taosMsleep(10);
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
}
} while (retry);
return code; return code;
} }

View File

@ -806,6 +806,13 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
goto _SEND_REPLY; goto _SEND_REPLY;
} }
SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
if (pReceiver->snapshotParam.start != beginIndex) {
sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
pReceiver->snapshotParam.start, beginIndex);
goto _SEND_REPLY;
}
code = 0; code = 0;
_SEND_REPLY: _SEND_REPLY:
if (code != 0 && terrno != 0) { if (code != 0 && terrno != 0) {