From 0d11825d383c3b5dd116886c98198ecf32f436e4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 26 Dec 2021 23:46:22 +0800 Subject: [PATCH] refact trans --- include/common/tmsg.h | 4 + include/common/tmsgdef.h | 1 + source/dnode/mgmt/impl/src/dndTransport.c | 2 + source/dnode/mnode/impl/inc/mndDef.h | 21 +- source/dnode/mnode/impl/inc/mndInt.h | 1 + source/dnode/mnode/impl/inc/mndTrans.h | 6 +- source/dnode/mnode/impl/src/mndMnode.c | 6 +- source/dnode/mnode/impl/src/mndStb.c | 6 +- source/dnode/mnode/impl/src/mndTopic.c | 6 +- source/dnode/mnode/impl/src/mndTrans.c | 440 +++++++++++++--------- source/dnode/mnode/impl/src/mndVgroup.c | 6 +- source/dnode/mnode/impl/src/mnode.c | 21 ++ 12 files changed, 323 insertions(+), 197 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 92b453bc1f..f0188a5b51 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -655,6 +655,10 @@ typedef struct { SVnodeLoads vnodeLoads; } SStatusMsg; +typedef struct { + int32_t reserved; +} STransMsg; + typedef struct { int32_t dnodeId; int32_t clusterId; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 15b5b9da28..ff22e077c0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -114,6 +114,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", NULL, NULL) diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index b8ebe3f884..a5be338a17 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -118,6 +118,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { // message from dnode to mnode pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessDnodeRsp; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessDnodeRsp; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index c6fb0cce1d..fbe7da49c6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -62,10 +62,14 @@ typedef enum { typedef enum { TRN_STAGE_PREPARE = 0, - TRN_STAGE_EXECUTE = 1, - TRN_STAGE_ROLLBACK = 2, - TRN_STAGE_COMMIT = 3, - TRN_STAGE_OVER = 4, + TRN_STAGE_REDO_LOG = 1, + TRN_STAGE_REDO_ACTION = 2, + TRN_STAGE_UNDO_LOG = 3, + TRN_STAGE_UNDO_ACTION = 4, + TRN_STAGE_COMMIT_LOG = 5, + TRN_STAGE_COMMIT = 6, + TRN_STAGE_ROLLBACK = 7, + TRN_STAGE_FINISHED = 8 } ETrnStage; typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; @@ -95,7 +99,8 @@ typedef struct { int32_t id; ETrnStage stage; ETrnPolicy policy; - int32_t retryTimes; + int32_t code; + int32_t failedTimes; void *rpcHandle; void *rpcAHandle; SArray *redoLogs; @@ -313,12 +318,6 @@ typedef struct SMnodeMsg { void *pCont; } SMnodeMsg; -typedef struct { - int32_t id; - int32_t code; - void *rpcHandle; -} STransMsg; - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index a9932ce048..01dd893e66 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -75,6 +75,7 @@ typedef struct SMnode { int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; tmr_h timer; + tmr_h transTimer; char *path; SMnodeCfg cfg; int64_t checkTime; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 2d57179f1c..201fcde1a9 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -44,11 +44,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); -void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); -void mndTransHandleActionRsp(SMnodeMsg *pMsg); - -char *mndTransStageStr(ETrnStage stage); -char *mndTransPolicyStr(ETrnPolicy policy); +void mndTransProcessRsp(SMnodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 8f57a18a1d..1800fd8e83 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -567,17 +567,17 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { } static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 24d88068bf..bdc13f91b7 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -552,7 +552,7 @@ static int32_t mndProcessCreateStbMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessCreateStbInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } @@ -616,7 +616,7 @@ static int32_t mndProcessAlterStbMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessAlterStbInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } @@ -728,7 +728,7 @@ static int32_t mndProcessDropStbMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessDropStbInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 04f6907918..7106c79588 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -524,7 +524,7 @@ static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessAlterTopicInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } @@ -636,7 +636,7 @@ static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } @@ -706,7 +706,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { } static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 0b714c34ae..b3a584c682 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -27,7 +27,6 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans); static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); -static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); @@ -36,14 +35,22 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); -static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); + static void mndTransExecute(SMnode *pMnode, STrans *pTrans); +static void mndTransSendRpcRsp(STrans *pTrans); +static int32_t mndProcessTransMsg(SMnodeMsg *pMsg); int32_t mndInitTrans(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_TRANS, @@ -54,6 +61,7 @@ int32_t mndInitTrans(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndTransActionUpdate, .deleteFp = (SdbDeleteFp)mndTransActionDelete}; + mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg); return sdbSetTable(pMnode->pSdb, table); } @@ -290,12 +298,12 @@ TRANS_DECODE_OVER: static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { pTrans->stage = TRN_STAGE_PREPARE; - mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); + mTrace("trans:%d, perform insert action", pTrans->id); return 0; } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { - mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); + mTrace("trans:%d, perform delete action", pTrans->id); mndTransDropLogs(pTrans->redoLogs); mndTransDropLogs(pTrans->undoLogs); @@ -307,7 +315,7 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { } static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) { - mTrace("trans:%d, perform update action, stage:%s", pOldTrans->id, mndTransStageStr(pNewTrans->stage)); + mTrace("trans:%d, perform update action", pOldTrans->id); pOldTrans->stage = pNewTrans->stage; return 0; } @@ -326,34 +334,6 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { sdbRelease(pSdb, pTrans); } -char *mndTransStageStr(ETrnStage stage) { - switch (stage) { - case TRN_STAGE_PREPARE: - return "prepare"; - case TRN_STAGE_EXECUTE: - return "execute"; - case TRN_STAGE_COMMIT: - return "commit"; - case TRN_STAGE_ROLLBACK: - return "rollback"; - case TRN_STAGE_OVER: - return "over"; - default: - return "undefined"; - } -} - -char *mndTransPolicyStr(ETrnPolicy policy) { - switch (policy) { - case TRN_POLICY_ROLLBACK: - return "prepare"; - case TRN_POLICY_RETRY: - return "retry"; - default: - return "undefined"; - } -} - STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { @@ -428,23 +408,11 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { return 0; } -int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendLog(pTrans->redoLogs, pRaw); - mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code); - return code; -} +int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->redoLogs, pRaw); } -int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendLog(pTrans->undoLogs, pRaw); - mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code); - return code; -} +int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->undoLogs, pRaw); } -int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendLog(pTrans->commitLogs, pRaw); - mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code); - return code; -} +int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->commitLogs, pRaw); } static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { void *ptr = taosArrayPush(pArray, pAction); @@ -457,20 +425,14 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { } int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { - int32_t code = mndTransAppendAction(pTrans->redoActions, pAction); - mTrace("trans:%d, msg:%s append to redo actions, code:0x%x", pTrans->id, TMSG_INFO(pAction->msgType), code); - return code; + return mndTransAppendAction(pTrans->redoActions, pAction); } int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { - int32_t code = mndTransAppendAction(pTrans->undoActions, pAction); - mTrace("trans:%d, msg:%s append to undo actions, code:0x%x", pTrans->id, TMSG_INFO(pAction->msgType), code); - return code; + return mndTransAppendAction(pTrans->undoActions, pAction); } -int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { - mDebug("trans:%d, prepare transaction", pTrans->id); - +static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); if (pRaw == NULL) { mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); @@ -494,6 +456,17 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } + return 0; +} + +int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { + mDebug("trans:%d, prepare transaction", pTrans->id); + if (mndTransSync(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + return -1; + } + mDebug("trans:%d, prepare finished", pTrans->id); + STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id); if (pNewTrans == NULL) { mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr()); @@ -507,84 +480,41 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return 0; } -int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { +static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { + if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) return 0; + mDebug("trans:%d, commit transaction", pTrans->id); - - SSdbRaw *pRaw = mndTransActionEncode(pTrans); - if (pRaw == NULL) { - mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + if (mndTransSync(pMnode, pTrans) != 0) { + mError("trans:%d, failed to commit since %s", pTrans->id, terrstr()); return -1; } - sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); - - if (taosArrayGetSize(pTrans->commitLogs) != 0) { - mTrace("trans:%d, sync to other nodes", pTrans->id); - if (mndSyncPropose(pMnode, pRaw) != 0) { - mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); - sdbFreeRaw(pRaw); - return -1; - } - mTrace("trans:%d, sync finished", pTrans->id); - } - - if (sdbWrite(pMnode->pSdb, pRaw) != 0) { - mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); - return -1; - } - mDebug("trans:%d, commit finished", pTrans->id); return 0; } -int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { +static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, rollback transaction", pTrans->id); - - SSdbRaw *pRaw = mndTransActionEncode(pTrans); - if (pRaw == NULL) { - mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + if (mndTransSync(pMnode, pTrans) != 0) { + mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr()); return -1; } - sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); - - mTrace("trans:%d, sync to other nodes", pTrans->id); - int32_t code = mndSyncPropose(pMnode, pRaw); - if (code != 0) { - mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); - sdbFreeRaw(pRaw); - return -1; - } - - mTrace("trans:%d, sync finished", pTrans->id); - code = sdbWrite(pMnode->pSdb, pRaw); - if (code != 0) { - mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); - return -1; - } - mDebug("trans:%d, rollback finished", pTrans->id); return 0; } -static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) { - if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; - mDebug("trans:%d, send rpc rsp, RPC:%p ahandle:%p code:0x%x", pTrans->id, pTrans->rpcHandle, pTrans->rpcAHandle, - code & 0xFFFF); - +static void mndTransSendRpcRsp(STrans *pTrans) { if (pTrans->rpcHandle != NULL) { - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code, .ahandle = pTrans->rpcAHandle}; + mDebug("trans:%d, send rsp, ahandle:%p code:0x%x", pTrans->id, pTrans->rpcAHandle, pTrans->code & 0xFFFF); + SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle}; rpcSendResponse(&rspMsg); } } -void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) { - // todo -} - -void mndTransHandleActionRsp(SMnodeMsg *pMsg) { +void mndTransProcessRsp(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; - int64_t sig = (int64_t)(pMsg->rpcMsg.ahandle); - int32_t transId = (int32_t)(sig >> 32); - int32_t action = (int32_t)((sig << 32) >> 32); + int64_t signature = (int64_t)(pMsg->rpcMsg.ahandle); + int32_t transId = (int32_t)(signature >> 32); + int32_t action = (int32_t)((signature << 32) >> 32); STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans == NULL) { @@ -593,15 +523,17 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) { } SArray *pArray = NULL; - if (pTrans->stage == TRN_STAGE_EXECUTE) { + if (pTrans->stage == TRN_STAGE_REDO_ACTION) { pArray = pTrans->redoActions; - } else if (pTrans->stage == TRN_STAGE_ROLLBACK) { + } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) { pArray = pTrans->undoActions; } else { + mError("trans:%d, invalid trans stage:%d while recv action rsp", pTrans->id, pTrans->stage); + goto HANDLE_ACTION_RSP_OVER; } if (pArray == NULL) { - mError("trans:%d, invalid trans stage:%s", transId, mndTransStageStr(pTrans->stage)); + mError("trans:%d, invalid trans stage:%d", transId, pTrans->stage); goto HANDLE_ACTION_RSP_OVER; } @@ -653,15 +585,27 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { return mndTransExecuteLogs(pMnode, pTrans->commitLogs); } -static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { + int32_t numOfActions = taosArrayGetSize(pArray); + + for (int32_t action = 0; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pArray, action); + if (pAction == NULL) continue; + if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; + + pAction->msgSent = 0; + pAction->msgReceived = 0; + pAction->errCode = 0; + mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action); + } +} + +static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); - if (numOfActions == 0) return 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - // if (pAction->msgSent && !pAction->msgReceived) continue; - // if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; if (pAction->msgSent) continue; int64_t signature = pTrans->id; @@ -684,6 +628,17 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg); } + return 0; +} + +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { + int32_t numOfActions = taosArrayGetSize(pArray); + if (numOfActions == 0) return 0; + + if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) { + return -1; + } + int32_t numOfReceived = 0; int32_t errCode = 0; for (int32_t action = 0; action < numOfActions; ++action) { @@ -698,9 +653,15 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA } if (numOfReceived == numOfActions) { - mDebug("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode); - terrno = errCode; - return errCode; + if (errCode == 0) { + mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions); + return 0; + } else { + mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode); + mndTransResetActions(pMnode, pTrans, pArray); + terrno = errCode; + return errCode; + } } else { mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceived, numOfActions, errCode); return TSDB_CODE_MND_ACTION_IN_PROGRESS; @@ -715,88 +676,229 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); } -static void mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + pTrans->stage = TRN_STAGE_REDO_LOG; + mDebug("trans:%d, stage from prepare to redoLog", pTrans->id); + return continueExec; +} + +static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans); if (code == 0) { - pTrans->stage = TRN_STAGE_EXECUTE; - mDebug("trans:%d, stage from prepare to execute", pTrans->id); + pTrans->code = 0; + pTrans->stage = TRN_STAGE_REDO_ACTION; + mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id); } else { - pTrans->stage = TRN_STAGE_ROLLBACK; - mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr()); + pTrans->code = terrno; + pTrans->stage = TRN_STAGE_UNDO_LOG; + mError("trans:%d, stage from redoLog to undoLog", pTrans->id); } + + return continueExec; } -static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); if (code == 0) { + pTrans->code = 0; pTrans->stage = TRN_STAGE_COMMIT; - mDebug("trans:%d, stage from execute to commit", pTrans->id); + mDebug("trans:%d, stage from redoAction to commit", pTrans->id); + continueExec = true; } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mDebug("trans:%d, stage keep on execute since %s", pTrans->id, tstrerror(code)); + mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code)); + continueExec = false; } else { + pTrans->code = terrno; if (pTrans->policy == TRN_POLICY_ROLLBACK) { - pTrans->stage = TRN_STAGE_ROLLBACK; - mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr()); + pTrans->stage = TRN_STAGE_UNDO_ACTION; + mError("trans:%d, stage from redoAction to undoAction since %s", pTrans->id, terrstr()); + continueExec = true; } else { - pTrans->stage = TRN_STAGE_EXECUTE; - pTrans->retryTimes++; - mError("trans:%d, stage keep on execute since %s", pTrans->id, terrstr()); + pTrans->failedTimes++; + mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); + continueExec = false; } } - return code; + return continueExec; } -static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { - mndTransExecuteCommitLogs(pMnode, pTrans); - pTrans->stage = TRN_STAGE_OVER; - return 0; +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + int32_t code = mndTransCommit(pMnode, pTrans); + + if (code == 0) { + pTrans->code = 0; + pTrans->stage = TRN_STAGE_COMMIT_LOG; + mDebug("trans:%d, stage from commit to commitLog", pTrans->id); + continueExec = true; + } else { + pTrans->code = terrno; + if (pTrans->policy == TRN_POLICY_ROLLBACK) { + pTrans->stage = TRN_STAGE_REDO_ACTION; + mError("trans:%d, stage from commit to redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), + pTrans->failedTimes); + continueExec = true; + } else { + pTrans->failedTimes++; + mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); + continueExec = false; + } + } + + return continueExec; } -static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans); + + if (code == 0) { + pTrans->code = 0; + pTrans->stage = TRN_STAGE_FINISHED; + mDebug("trans:%d, stage from commitLog to finished", pTrans->id); + continueExec = true; + } else { + pTrans->code = terrno; + pTrans->failedTimes++; + mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr()); + continueExec = false; + ; + } + + return continueExec; +} + +static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + int32_t code = mndTransExecuteUndoLogs(pMnode, pTrans); + + if (code == 0) { + pTrans->stage = TRN_STAGE_ROLLBACK; + mDebug("trans:%d, stage from undoLog to rollback", pTrans->id); + continueExec = true; + } else { + mDebug("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr()); + continueExec = false; + } + + return continueExec; +} + +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); if (code == 0) { - mDebug("trans:%d, rollbacked", pTrans->id); + pTrans->stage = TRN_STAGE_REDO_LOG; + mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); + continueExec = true; + } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); + continueExec = false; } else { - pTrans->stage = TRN_STAGE_ROLLBACK; - pTrans->retryTimes++; - mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); + pTrans->failedTimes++; + mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr()); + continueExec = false; } - return code; + return continueExec; +} + +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + int32_t code = mndTransRollback(pMnode, pTrans); + + if (code == 0) { + pTrans->stage = TRN_STAGE_FINISHED; + mDebug("trans:%d, stage from rollback to finished", pTrans->id); + continueExec = true; + ; + } else { + pTrans->failedTimes++; + mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); + continueExec = false; + } + + return continueExec; +} + +static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { + bool continueExec = false; + + SSdbRaw *pRaw = mndTransActionEncode(pTrans); + if (pRaw == NULL) { + mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + } + sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); + + int32_t code = sdbWrite(pMnode->pSdb, pRaw); + if (code != 0) { + mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); + } + + mError("trans:%d, exec finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); + return continueExec; } static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { - int32_t code = 0; + bool continueExec = true; - while (code == 0) { + while (continueExec) { switch (pTrans->stage) { case TRN_STAGE_PREPARE: - mndTransPerformPrepareStage(pMnode, pTrans); + continueExec = mndTransPerformPrepareStage(pMnode, pTrans); break; - case TRN_STAGE_EXECUTE: - code = mndTransPerformExecuteStage(pMnode, pTrans); + case TRN_STAGE_REDO_LOG: + continueExec = mndTransPerformRedoLogStage(pMnode, pTrans); + break; + case TRN_STAGE_REDO_ACTION: + continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); + break; + case TRN_STAGE_UNDO_LOG: + continueExec = mndTransPerformUndoLogStage(pMnode, pTrans); + break; + case TRN_STAGE_UNDO_ACTION: + continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); + break; + case TRN_STAGE_COMMIT_LOG: + continueExec = mndTransPerformCommitLogStage(pMnode, pTrans); break; case TRN_STAGE_COMMIT: - code = mndTransCommit(pMnode, pTrans); - if (code == 0) { - mndTransPerformCommitStage(pMnode, pTrans); - } + continueExec = mndTransPerformCommitStage(pMnode, pTrans); break; case TRN_STAGE_ROLLBACK: - code = mndTransRollback(pMnode, pTrans); - if (code == 0) { - mndTransPerformRollbackStage(pMnode, pTrans); - } + continueExec = mndTransPerformRollbackStage(pMnode, pTrans); + break; + case TRN_STAGE_FINISHED: + continueExec = mndTransPerfromFinishedStage(pMnode, pTrans); break; default: - mndTransSendRpcRsp(pTrans, 0); - return; + continueExec = false; + break; } } - mndTransSendRpcRsp(pTrans, code); + if (pTrans->stage == TRN_STAGE_FINISHED) { + mndTransSendRpcRsp(pTrans); + } } + +static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + STrans *pTrans = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); + if (pIter == NULL) break; + + mndTransExecute(pMnode, pTrans); + sdbRelease(pMnode->pSdb, pTrans); + } +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 8ff2139314..c9f4401264 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -333,17 +333,17 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup) { } static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { - mndTransHandleActionRsp(pMsg); + mndTransProcessRsp(pMsg); return 0; } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 27668a585a..64ea85044a 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -50,6 +50,20 @@ void mndSendRedirectMsg(SMnode *pMnode, SRpcMsg *pMsg) { } } +static void mndTransReExecute(void *param, void *tmrId) { + SMnode *pMnode = param; + if (mndIsMaster(pMnode)) { + STransMsg *pMsg = rpcMallocCont(sizeof(STransMsg)); + SEpSet epSet = {.inUse = 0, .numOfEps = 1}; + epSet.port[0] = pMnode->replicas[pMnode->selfIndex].port; + memcpy(epSet.fqdn[0], pMnode->replicas[pMnode->selfIndex].fqdn, TSDB_FQDN_LEN); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransMsg)}; + mndSendMsgToDnode(pMnode, &epSet, &rpcMsg); + } + + taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer); +} + static int32_t mndInitTimer(SMnode *pMnode) { if (pMnode->timer == NULL) { pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND"); @@ -60,11 +74,18 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } + if (taosTmrReset(mndTransReExecute, 1000, pMnode, pMnode->timer, &pMnode->transTimer)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; } static void mndCleanupTimer(SMnode *pMnode) { if (pMnode->timer != NULL) { + taosTmrStop(pMnode->transTimer); + pMnode->transTimer = NULL; taosTmrCleanUp(pMnode->timer); pMnode->timer = NULL; }