From 18ca6d93e7dc0af7dd530b8db8ce6f2e50eaadbb Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 31 May 2022 16:55:16 +0800 Subject: [PATCH] refactor: make trans support multi steps --- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/inc/mndTrans.h | 2 +- source/dnode/mnode/impl/src/mndMnode.c | 4 +- source/dnode/mnode/impl/src/mndSma.c | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 114 ++++++++++++++----------- 5 files changed, 70 insertions(+), 54 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index fd0f54c66b..9a60ad860f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -129,7 +129,7 @@ typedef enum { typedef enum { TRN_EXEC_PARALLEL = 0, - TRN_EXEC_ONE_BY_ONE = 1, + TRN_EXEC_NO_PARALLEL = 1, } ETrnExecType; typedef enum { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index a7e1f7cd02..ba6f5faf1e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -62,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); -void mndTransSetExecOneByOne(STrans *pTrans); +void mndTransSetNoParallel(STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransProcessRsp(SRpcMsg *pRsp); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 8c5ea840af..5b8ba6deaa 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -367,7 +367,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); - mndTransSetExecOneByOne(pTrans); + mndTransSetNoParallel(pTrans); if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; @@ -539,7 +539,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - mndTransSetExecOneByOne(pTrans); + mndTransSetNoParallel(pTrans); if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 7b5d1b6c32..0493b00d33 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -507,7 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); mndTransSetDbInfo(pTrans, pDb); - mndTransSetExecOneByOne(pTrans); + mndTransSetNoParallel(pTrans); if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index c5a1e0ba5a..ad6388c585 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -120,8 +120,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) - SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); @@ -261,8 +261,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { pTrans->type = type; pTrans->parallel = parallel; SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) - SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER) @@ -567,6 +567,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { mndTransUpdateActions(pOld->undoActions, pNew->undoActions); mndTransUpdateActions(pOld->commitActions, pNew->commitActions); pOld->stage = pNew->stage; + pOld->redoActionPos = pNew->redoActionPos; if (pOld->stage == TRN_STAGE_COMMIT) { pOld->stage = TRN_STAGE_COMMIT_ACTION; @@ -694,11 +695,10 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void * } void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { - pTrans->dbUid = pDb->uid; memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN); } -void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; } +void mndTransSetNoParallel(STrans *pTrans) { pTrans->parallel = TRN_EXEC_NO_PARALLEL; } static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); @@ -708,7 +708,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { } sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("trans:%d, sync to other nodes", pTrans->id); + mDebug("trans:%d, sync to other mnodes", pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); @@ -761,7 +761,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); conflict = true; } else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { - if (pNewTrans->dbUid == pTrans->dbUid) { + if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); conflict = true; } @@ -774,7 +774,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); conflict = true; } else if (mndIsDbTrans(pTrans)) { - if (pNewTrans->dbUid == pTrans->dbUid) { + if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); conflict = true; } @@ -856,7 +856,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } if (pTrans->policy == TRN_POLICY_ROLLBACK) { - if (pTrans->stage == pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { + if (pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; sendRsp = true; } @@ -876,12 +876,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage), pTrans->rpcInfo.ahandle); - SRpcMsg rspMsg = { - .code = code, - .pCont = rpcCont, - .contLen = pTrans->rpcRspLen, - .info = pTrans->rpcInfo, - }; + SRpcMsg rspMsg = {.code = code, .pCont = rpcCont, .contLen = pTrans->rpcRspLen, .info = pTrans->rpcInfo}; tmsgSendRsp(&rspMsg); pTrans->rpcInfo.handle = NULL; pTrans->rpcRsp = NULL; @@ -944,7 +939,6 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); - if (pAction == NULL) continue; if (pAction->msgSent && pAction->msgReceived && (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) continue; @@ -1017,16 +1011,14 @@ static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAc } } -static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); int32_t code = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); code = mndTransExecSingleAction(pMnode, pTrans, pAction); - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - break; - } + if (code != 0) break; } return code; @@ -1036,7 +1028,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA int32_t numOfActions = taosArrayGetSize(pArray); if (numOfActions == 0) return 0; - if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) { + if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) { return -1; } @@ -1044,8 +1036,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA int32_t errCode = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); - if (pAction == NULL) continue; - if ((pAction->msgSent && pAction->msgReceived) || pAction->rawWritten) { + if (pAction->msgReceived || pAction->rawWritten) { numOfExecuted++; if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { errCode = pAction->errCode; @@ -1087,12 +1078,61 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions); - if (code != 0) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute commitActions since %s", terrstr()); } return code; } +static int32_t mndTransExecuteRedoActionsNoParallel(SMnode *pMnode, STrans *pTrans) { + int32_t code = 0; + int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); + if (numOfActions == 0) return code; + if (pTrans->redoActionPos >= numOfActions) return code; + + for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); + + code = mndTransExecSingleAction(pMnode, pTrans, pAction); + if (code == 0) { + if (pAction->msgSent) { + if (pAction->msgReceived) { + if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { + code = pAction->errCode; + } + } else { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } + } + if (pAction->rawWritten) { + if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { + code = pAction->errCode; + } + } + } + + if (code == 0) { + pTrans->redoActionPos++; + mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), + pAction->id); + code = mndTransSync(pMnode, pTrans); + if (code != 0) { + mError("trans:%d, failed to sync redoActionPos since %s", pTrans->id, terrstr()); + break; + } + } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id); + break; + } else { + mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, + terrstr()); + break; + } + } + + return code; +} + static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; pTrans->stage = TRN_STAGE_REDO_ACTION; @@ -1100,36 +1140,12 @@ static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransExecuteRedoActionsOneByOne(SMnode *pMnode, STrans *pTrans) { - bool continueExec = true; - if (pTrans->redoActionPos >= taosArrayGetSize(pTrans->redoActions)) return continueExec; - - STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); - int32_t code = mndTransExecSingleAction(pMnode, pTrans, pAction); - if (code == 0) { - pTrans->redoActionPos++; - mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), - pAction->id); - - // todo sync these infos - } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id); - continueExec = false; - } else { - mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, - terrstr()); - continueExec = false; - } - - return continueExec; -} - static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; int32_t code = 0; - if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { - code = mndTransExecuteRedoActionsOneByOne(pMnode, pTrans); + if (pTrans->parallel == TRN_EXEC_NO_PARALLEL) { + code = mndTransExecuteRedoActionsNoParallel(pMnode, pTrans); } else { code = mndTransExecuteRedoActions(pMnode, pTrans); }