From 7b967ffdc63c397e684b820f7e30dc1671a5ec86 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 25 Dec 2023 20:22:18 +0800 Subject: [PATCH 1/2] enh: retry on fsm commit when rpc out of memory in syncFsmExecute --- source/libs/sync/src/syncPipeline.c | 37 ++++++++++++++++++----------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index f2386797c1..28ee5ba841 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -558,22 +558,31 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe pEntry->term); } - SRpcMsg rpcMsg = {.code = applyCode}; - syncEntry2OriginalRpc(pEntry, &rpcMsg); + int32_t code = 0; + bool retry = false; + do { + SRpcMsg rpcMsg = {.code = applyCode}; + syncEntry2OriginalRpc(pEntry, &rpcMsg); - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = applyCode; - cbMeta.state = role; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = term; - cbMeta.flag = -1; + SFsmCbMeta cbMeta = {0}; + cbMeta.index = pEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index); + cbMeta.isWeak = pEntry->isWeak; + cbMeta.code = applyCode; + cbMeta.state = role; + cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = term; + cbMeta.flag = -1; - (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); - int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); + (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); + code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); + retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); + if (retry) { + taosMsleep(10); + sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index); + } + } while (retry); return code; } From 0e6ea7af712f4c918df6b1a323b825ba1169f494 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 26 Dec 2023 10:54:40 +0800 Subject: [PATCH 2/2] enh: check snapshot begin index for unexpected change in syncNodeOnSnapshotBegin --- source/libs/sync/src/syncSnapshot.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 2277b70c8f..10a8734617 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -806,6 +806,13 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p goto _SEND_REPLY; } + SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode); + if (pReceiver->snapshotParam.start != beginIndex) { + sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64, + pReceiver->snapshotParam.start, beginIndex); + goto _SEND_REPLY; + } + code = 0; _SEND_REPLY: if (code != 0 && terrno != 0) {