From 10e9d79201d5a6c577336ea8d6500ad7c8686daf Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 23 Apr 2024 16:44:05 +0800 Subject: [PATCH 1/4] refact: rename STrans redoActionPos to actionPos --- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index acd0a2009c..68c55e235f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -166,7 +166,7 @@ typedef struct { int32_t failedTimes; void* rpcRsp; int32_t rpcRspLen; - int32_t redoActionPos; + int32_t actionPos; SArray* prepareActions; SArray* redoActions; SArray* undoActions; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7b6563f4b4..3da60dcd82 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -169,7 +169,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans) { SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) - SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->actionPos, _OVER) int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions); int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); @@ -317,7 +317,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) - SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pTrans->actionPos, _OVER) if (sver > TRANS_VER1_NUMBER) { SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER) @@ -525,7 +525,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { mndTransUpdateActions(pOld->undoActions, pNew->undoActions); mndTransUpdateActions(pOld->commitActions, pNew->commitActions); pOld->stage = pNew->stage; - pOld->redoActionPos = pNew->redoActionPos; + pOld->actionPos = pNew->actionPos; if (pOld->stage == TRN_STAGE_COMMIT) { pOld->stage = TRN_STAGE_COMMIT_ACTION; @@ -1367,15 +1367,15 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, taosThreadMutexLock(&pTrans->mutex); - if (pTrans->redoActionPos >= numOfActions) { + if (pTrans->actionPos >= numOfActions) { taosThreadMutexUnlock(&pTrans->mutex); return code; } - mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos); + mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->actionPos); - for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { - STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); + for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->actionPos); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code == 0) { @@ -1409,14 +1409,14 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, if (code == 0) { pTrans->code = 0; - pTrans->redoActionPos++; + pTrans->actionPos++; mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), pAction->id); taosThreadMutexUnlock(&pTrans->mutex); code = mndTransSync(pMnode, pTrans); taosThreadMutexLock(&pTrans->mutex); if (code != 0) { - pTrans->redoActionPos--; + pTrans->actionPos--; pTrans->code = terrno; mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr()); From edcd1b87d8281e478d9cbe02c9914ca3e28a5587 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 23 Apr 2024 17:02:49 +0800 Subject: [PATCH 2/4] refact: add mndTransExecuteActionsSerial --- source/dnode/mnode/impl/src/mndTrans.c | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 3da60dcd82..2ef742f372 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1360,22 +1360,19 @@ static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool return code; } -static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { +static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArray *pActions, bool topHalf) { int32_t code = 0; - int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); + int32_t numOfActions = taosArrayGetSize(pActions); if (numOfActions == 0) return code; - taosThreadMutexLock(&pTrans->mutex); - if (pTrans->actionPos >= numOfActions) { - taosThreadMutexUnlock(&pTrans->mutex); return code; } mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->actionPos); for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) { - STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->actionPos); + STransAction *pAction = taosArrayGet(pActions, pTrans->actionPos); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code == 0) { @@ -1442,8 +1439,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, } } - taosThreadMutexUnlock(&pTrans->mutex); + return code; +} +static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = 0; + taosThreadMutexLock(&pTrans->mutex); + if (pTrans->stage == TRN_STAGE_REDO_ACTION) { + code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->redoActions, topHalf); + } + taosThreadMutexUnlock(&pTrans->mutex); return code; } From 396c08e14a49df52c076821e11e8ad7ec29d54fd Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 23 Apr 2024 17:22:16 +0800 Subject: [PATCH 3/4] fix: execute undo actions of TRN_EXEC_SERIAL trans sequentially --- source/dnode/mnode/impl/src/mndTrans.c | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 2ef742f372..1ecc16aed7 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1452,6 +1452,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, return code; } +static int32_t mndTransExecuteUndoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = 0; + taosThreadMutexLock(&pTrans->mutex); + if (pTrans->stage == TRN_STAGE_UNDO_ACTION) { + code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->undoActions, topHalf); + } + taosThreadMutexUnlock(&pTrans->mutex); + return code; +} + bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; int32_t code = 0; @@ -1568,13 +1578,22 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, boo static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; - int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); + int32_t code = 0; + + if (pTrans->exec == TRN_EXEC_SERIAL) { + code = mndTransExecuteUndoActionsSerial(pMnode, pTrans, topHalf); + } else { + code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); + } + + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; + terrno = code; if (code == 0) { pTrans->stage = TRN_STAGE_PRE_FINISH; mInfo("trans:%d, stage from undoAction to pre-finish", pTrans->id); continueExec = true; - } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + } else if (code == TSDB_CODE_ACTION_IN_PROGRESS || code == TSDB_CODE_MND_TRANS_CTX_SWITCH) { mInfo("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); continueExec = false; } else { From 9f8e970a323589f2d7326976755a56a8be15b411 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 24 Apr 2024 11:20:52 +0800 Subject: [PATCH 4/4] enh: set default errcode as action in progress in executing serial trans --- source/dnode/mnode/impl/src/mndTrans.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 1ecc16aed7..84940e01d4 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1443,7 +1443,7 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr } static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { - int32_t code = 0; + int32_t code = TSDB_CODE_ACTION_IN_PROGRESS; taosThreadMutexLock(&pTrans->mutex); if (pTrans->stage == TRN_STAGE_REDO_ACTION) { code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->redoActions, topHalf); @@ -1453,7 +1453,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, } static int32_t mndTransExecuteUndoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { - int32_t code = 0; + int32_t code = TSDB_CODE_ACTION_IN_PROGRESS; taosThreadMutexLock(&pTrans->mutex); if (pTrans->stage == TRN_STAGE_UNDO_ACTION) { code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->undoActions, topHalf);