enh: retry on fsm commit when rpc out of memory in syncFsmExecute
This commit is contained in:
parent
aff8e1c9a6
commit
7b967ffdc6
|
@ -558,22 +558,31 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
|
|||
pEntry->term);
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {.code = applyCode};
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
int32_t code = 0;
|
||||
bool retry = false;
|
||||
do {
|
||||
SRpcMsg rpcMsg = {.code = applyCode};
|
||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
cbMeta.code = applyCode;
|
||||
cbMeta.state = role;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
cbMeta.term = pEntry->term;
|
||||
cbMeta.currentTerm = term;
|
||||
cbMeta.flag = -1;
|
||||
SFsmCbMeta cbMeta = {0};
|
||||
cbMeta.index = pEntry->index;
|
||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
|
||||
cbMeta.isWeak = pEntry->isWeak;
|
||||
cbMeta.code = applyCode;
|
||||
cbMeta.state = role;
|
||||
cbMeta.seqNum = pEntry->seqNum;
|
||||
cbMeta.term = pEntry->term;
|
||||
cbMeta.currentTerm = term;
|
||||
cbMeta.flag = -1;
|
||||
|
||||
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
|
||||
int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
|
||||
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
|
||||
code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
|
||||
retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
|
||||
if (retry) {
|
||||
taosMsleep(10);
|
||||
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
|
||||
}
|
||||
} while (retry);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue