From fd2a8d869cea7e3e1c8886f14ead8750201ab101 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 26 Sep 2024 05:38:13 +0000 Subject: [PATCH] fix/TD-32316-add-try-at-trans-sync-context --- source/dnode/mnode/impl/src/mndTrans.c | 29 +++++++++++++++++++++----- source/dnode/mnode/sdb/src/sdbHash.c | 6 +++--- source/libs/sync/src/syncPipeline.c | 2 ++ 3 files changed, 29 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 347f38193f..9e0a60034a 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1299,7 +1299,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi } else { pAction->errCode = (terrno != 0) ? terrno : code; mError("trans:%d, %s:%d failed to write sdb since %s, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), - pAction->id, terrstr(), sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status)); + pAction->id, tstrerror(code), sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status)); mndSetTransLastAction(pTrans, pAction); } @@ -1519,7 +1519,13 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr } mndSetTransLastAction(pTrans, pAction); - if (mndCannotExecuteTransAction(pMnode, topHalf)) break; + if (mndCannotExecuteTransAction(pMnode, topHalf)) { + pTrans->lastErrorNo = code; + pTrans->code = code; + mInfo("trans:%d, %s:%d, topHalf:%d, not execute next action, code:%s", pTrans->id, mndTransStr(pAction->stage), + action, topHalf, tstrerror(code)); + break; + } if (code == 0) { pTrans->code = 0; @@ -1617,7 +1623,20 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf); } - if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; + if (mndCannotExecuteTransAction(pMnode, topHalf)) { + pTrans->lastErrorNo = code; + pTrans->code = code; + bool continueExec = true; + if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) { + continueExec = true; + } else { + continueExec = false; + } + mInfo("trans:%d, cannot execute redo action stage, topHalf:%d, continueExec:%d, code:%s", pTrans->id, topHalf, + continueExec, tstrerror(code)); + + return continueExec; + } terrno = code; if (code == 0) { @@ -1834,13 +1853,13 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) { // start trans, pullup, receive rsp, kill void mndTransExecute(SMnode *pMnode, STrans *pTrans) { bool topHalf = true; - return mndTransExecuteImp(pMnode, pTrans, topHalf); + mndTransExecuteImp(pMnode, pTrans, topHalf); } // update trans void mndTransRefresh(SMnode *pMnode, STrans *pTrans) { bool topHalf = false; - return mndTransExecuteImp(pMnode, pTrans, topHalf); + mndTransExecuteImp(pMnode, pTrans, topHalf); } static int32_t mndProcessTransTimer(SRpcMsg *pReq) { diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index b83554c6f9..ea44a7c549 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -162,13 +162,13 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * pRow->status = pRaw->status; sdbPrintOper(pSdb, pRow, "insert"); - if (taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *)) != 0) { + int32_t code = 0; + if ((code = taosHashPut(hash, pRow->pObj, keySize, &pRow, sizeof(void *))) != 0) { sdbUnLock(pSdb, type); sdbFreeRow(pSdb, pRow, false); - return terrno; + return code; } - int32_t code = 0; SdbInsertFp insertFp = pSdb->insertFps[pRow->type]; if (insertFp != NULL) { code = (*insertFp)(pSdb, pRow->pObj); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 016ee5cf20..9f6acf6d83 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -728,6 +728,8 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe sDebug("vgId:%d, get response info, seqNum:%" PRId64 ", num:%d", pNode->vgId, cbMeta.seqNum, num); code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); + sDebug("vgId:%d, fsm execute, index:%" PRId64 ", term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId, + pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry); if (retry) { taosMsleep(10); sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index);