diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index f2386797c1..28ee5ba841 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -558,22 +558,31 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe pEntry->term); } - SRpcMsg rpcMsg = {.code = applyCode}; - syncEntry2OriginalRpc(pEntry, &rpcMsg); + int32_t code = 0; + bool retry = false; + do { + SRpcMsg rpcMsg = {.code = applyCode}; + syncEntry2OriginalRpc(pEntry, &rpcMsg); - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = applyCode; - cbMeta.state = role; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = term; - cbMeta.flag = -1; + SFsmCbMeta cbMeta = {0}; + cbMeta.index = pEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index); + cbMeta.isWeak = pEntry->isWeak; + cbMeta.code = applyCode; + cbMeta.state = role; + cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = term; + cbMeta.flag = -1; - (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); - int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); + (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); + code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); + retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); + if (retry) { + taosMsleep(10); + sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index); + } + } while (retry); return code; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 2277b70c8f..10a8734617 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -806,6 +806,13 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p goto _SEND_REPLY; } + SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode); + if (pReceiver->snapshotParam.start != beginIndex) { + sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64, + pReceiver->snapshotParam.start, beginIndex); + goto _SEND_REPLY; + } + code = 0; _SEND_REPLY: if (code != 0 && terrno != 0) {