diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index acd0a2009c..68c55e235f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -166,7 +166,7 @@ typedef struct { int32_t failedTimes; void* rpcRsp; int32_t rpcRspLen; - int32_t redoActionPos; + int32_t actionPos; SArray* prepareActions; SArray* redoActions; SArray* undoActions; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7b6563f4b4..84940e01d4 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -169,7 +169,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans) { SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) - SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->actionPos, _OVER) int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions); int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); @@ -317,7 +317,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) - SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pTrans->actionPos, _OVER) if (sver > TRANS_VER1_NUMBER) { SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER) @@ -525,7 +525,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { mndTransUpdateActions(pOld->undoActions, pNew->undoActions); mndTransUpdateActions(pOld->commitActions, pNew->commitActions); pOld->stage = pNew->stage; - pOld->redoActionPos = pNew->redoActionPos; + pOld->actionPos = pNew->actionPos; if (pOld->stage == TRN_STAGE_COMMIT) { pOld->stage = TRN_STAGE_COMMIT_ACTION; @@ -1360,22 +1360,19 @@ static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool return code; } -static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { +static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArray *pActions, bool topHalf) { int32_t code = 0; - int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); + int32_t numOfActions = taosArrayGetSize(pActions); if (numOfActions == 0) return code; - taosThreadMutexLock(&pTrans->mutex); - - if (pTrans->redoActionPos >= numOfActions) { - taosThreadMutexUnlock(&pTrans->mutex); + if (pTrans->actionPos >= numOfActions) { return code; } - mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos); + mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->actionPos); - for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { - STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); + for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pActions, pTrans->actionPos); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code == 0) { @@ -1409,14 +1406,14 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, if (code == 0) { pTrans->code = 0; - pTrans->redoActionPos++; + pTrans->actionPos++; mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), pAction->id); taosThreadMutexUnlock(&pTrans->mutex); code = mndTransSync(pMnode, pTrans); taosThreadMutexLock(&pTrans->mutex); if (code != 0) { - pTrans->redoActionPos--; + pTrans->actionPos--; pTrans->code = terrno; mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr()); @@ -1442,8 +1439,26 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, } } - taosThreadMutexUnlock(&pTrans->mutex); + return code; +} +static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = TSDB_CODE_ACTION_IN_PROGRESS; + taosThreadMutexLock(&pTrans->mutex); + if (pTrans->stage == TRN_STAGE_REDO_ACTION) { + code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->redoActions, topHalf); + } + taosThreadMutexUnlock(&pTrans->mutex); + return code; +} + +static int32_t mndTransExecuteUndoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = TSDB_CODE_ACTION_IN_PROGRESS; + taosThreadMutexLock(&pTrans->mutex); + if (pTrans->stage == TRN_STAGE_UNDO_ACTION) { + code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->undoActions, topHalf); + } + taosThreadMutexUnlock(&pTrans->mutex); return code; } @@ -1563,13 +1578,22 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, boo static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; - int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); + int32_t code = 0; + + if (pTrans->exec == TRN_EXEC_SERIAL) { + code = mndTransExecuteUndoActionsSerial(pMnode, pTrans, topHalf); + } else { + code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); + } + + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; + terrno = code; if (code == 0) { pTrans->stage = TRN_STAGE_PRE_FINISH; mInfo("trans:%d, stage from undoAction to pre-finish", pTrans->id); continueExec = true; - } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + } else if (code == TSDB_CODE_ACTION_IN_PROGRESS || code == TSDB_CODE_MND_TRANS_CTX_SWITCH) { mInfo("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); continueExec = false; } else {