diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b49eb916f0..c4d61157b6 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -355,6 +355,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5) #define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal #define TSDB_CODE_MND_TRANS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7) +#define TSDB_CODE_MND_TRANS_CTX_SWITCH TAOS_DEF_ERROR_CODE(0, 0x03D8) #define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF) // mnode-mq diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index d6de406987..0fb246e945 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -108,11 +108,11 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 04544da80e..1bd39a2299 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -97,7 +97,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans); SSdbRow *mndTransDecode(SSdbRaw *pRaw); void mndTransDropData(STrans *pTrans); -bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); +bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf); #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndCompactDetail.c b/source/dnode/mnode/impl/src/mndCompactDetail.c index a1c0e95c20..6b1dd78093 100644 --- a/source/dnode/mnode/impl/src/mndCompactDetail.c +++ b/source/dnode/mnode/impl/src/mndCompactDetail.c @@ -290,11 +290,11 @@ int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* p SSdbRaw *pVgRaw = mndCompactDetailActionEncode(&compactDetail); if (pVgRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { + if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) { sdbFreeRaw(pVgRaw); return -1; } (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 44ee804d22..01d4d1029c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1215,7 +1215,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) return -1; } - if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { + if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { sdbRelease(pSdb, pSub); sdbCancelFetch(pSdb, pIter); return -1; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index f46f33ac22..0fc8dad420 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -180,7 +180,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { goto _OUT; } - mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 + mInfo("trans:%d, process sync proposal, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 " role:%s raw:%p sec:%d seq:%" PRId64, transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), pRaw, pMgmt->transSec, pMgmt->transSeq); @@ -208,15 +208,11 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { } if (pTrans->stage == TRN_STAGE_PREPARE) { - bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans); + bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans, false); if (!continueExec) goto _OUT; } - if (pTrans->id != pMgmt->transId) { - mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d", - pTrans->id, pTrans->createdTime, pMgmt->transId); - mndTransRefresh(pMnode, pTrans); - } + mndTransRefresh(pMnode, pTrans); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); @@ -234,6 +230,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { goto _OUT; } + int32_t transId = pMgmt->transId; pMgmt->transId = 0; pMgmt->transSec = 0; pMgmt->transSeq = 0; @@ -241,9 +238,9 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { tsem_post(&pMgmt->syncSem); if (pMgmt->errCode != 0) { - mError("trans:%d, failed to propose since %s, post sem", pMgmt->transId, tstrerror(pMgmt->errCode)); + mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); } else { - mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, pMgmt->transId, pMgmt->transSeq); + mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq); } _OUT: @@ -542,7 +539,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { taosThreadMutexLock(&pMgmt->lock); pMgmt->errCode = 0; - if (pMgmt->transId != 0 /* && pMgmt->transId != transId*/) { + if (pMgmt->transId != 0) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosThreadMutexUnlock(&pMgmt->lock); rpcFreeCont(req.pCont); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7749decf91..9e478f3aa5 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -36,21 +36,25 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); static void mndTransDropActions(SArray *pArray); -static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); -static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans); -static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsLeader(pMnode); } + +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf); +static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf); + +static bool mndCannotExecuteTransAction(SMnode *pMnode, bool topHalf) { + return (!pMnode->deploy && !mndIsLeader(pMnode)) || !topHalf; +} static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static int32_t mndProcessTransTimer(SRpcMsg *pReq); @@ -1090,8 +1094,9 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) } } -static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { +static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->rawWritten) return 0; + if (topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH; int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { @@ -1112,9 +1117,9 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi return code; } -static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - if (pAction->msgSent) return 0; - if (mndCannotExecuteTransAction(pMnode)) return -1; +static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { + if (pAction->msgSent) return 0; + if (mndCannotExecuteTransAction(pMnode, topHalf)) return TSDB_CODE_MND_TRANS_CTX_SWITCH; int64_t signature = pTrans->id; signature = (signature << 32); @@ -1159,7 +1164,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio return code; } -static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { +static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { + if (!topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH; pAction->rawWritten = 0; pAction->errCode = 0; mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id); @@ -1168,34 +1174,39 @@ static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction return 0; } -static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { +static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->actionType == TRANS_ACTION_RAW) { - return mndTransWriteSingleLog(pMnode, pTrans, pAction); + return mndTransWriteSingleLog(pMnode, pTrans, pAction, topHalf); } else if (pAction->actionType == TRANS_ACTION_MSG) { - return mndTransSendSingleMsg(pMnode, pTrans, pAction); + return mndTransSendSingleMsg(pMnode, pTrans, pAction, topHalf); } else { - return mndTransExecNullMsg(pMnode, pTrans, pAction); + return mndTransExecNullMsg(pMnode, pTrans, pAction, topHalf); } } -static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) { int32_t numOfActions = taosArrayGetSize(pArray); int32_t code = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); - code = mndTransExecSingleAction(pMnode, pTrans, pAction); - if (code != 0) break; + code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); + if (code != 0) { + mInfo("trans:%d, action:%d not executed since %s. numOfActions:%d", pTrans->id, action, tstrerror(code), + numOfActions); + break; + } } return code; } -static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) { int32_t numOfActions = taosArrayGetSize(pArray); + int32_t code = 0; if (numOfActions == 0) return 0; - if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) { + if ((code = mndTransExecSingleActions(pMnode, pTrans, pArray, topHalf)) != 0) { return -1; } @@ -1248,31 +1259,31 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA } } -static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); } return code; } -static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); +static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute undoActions since %s", terrstr()); } return code; } -static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions); +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute commitActions since %s", terrstr()); } return code; } -static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) { +static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { int32_t code = 0; int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); if (numOfActions == 0) return code; @@ -1289,7 +1300,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); - code = mndTransExecSingleAction(pMnode, pTrans, pAction); + code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code == 0) { if (pAction->msgSent) { if (pAction->msgReceived) { @@ -1317,14 +1328,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } mndSetTransLastAction(pTrans, pAction); - if (mndCannotExecuteTransAction(pMnode)) break; + if (mndCannotExecuteTransAction(pMnode, topHalf)) break; if (code == 0) { pTrans->code = 0; pTrans->redoActionPos++; mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), pAction->id); + taosThreadMutexUnlock(&pTrans->mutex); code = mndTransSync(pMnode, pTrans); + taosThreadMutexLock(&pTrans->mutex); if (code != 0) { pTrans->redoActionPos--; pTrans->code = terrno; @@ -1357,7 +1370,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) return code; } -bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { +bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; int32_t code = 0; @@ -1368,7 +1381,7 @@ bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pTrans->prepareActions, action); - code = mndTransExecSingleAction(pMnode, pTrans, pAction); + code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code != 0) { mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions); return false; @@ -1381,17 +1394,17 @@ _OVER: return continueExec; } -static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; int32_t code = 0; if (pTrans->exec == TRN_EXEC_SERIAL) { - code = mndTransExecuteRedoActionsSerial(pMnode, pTrans); + code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf); } else { - code = mndTransExecuteRedoActions(pMnode, pTrans); + code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf); } - if (mndCannotExecuteTransAction(pMnode)) return false; + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; terrno = code; if (code == 0) { @@ -1431,8 +1444,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { - if (mndCannotExecuteTransAction(pMnode)) return false; +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; bool continueExec = true; int32_t code = mndTransCommit(pMnode, pTrans); @@ -1452,9 +1465,9 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; - int32_t code = mndTransExecuteCommitActions(pMnode, pTrans); + int32_t code = mndTransExecuteCommitActions(pMnode, pTrans, topHalf); if (code == 0) { pTrans->code = 0; @@ -1471,9 +1484,9 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; - int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); + int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); if (code == 0) { pTrans->stage = TRN_STAGE_PRE_FINISH; @@ -1491,8 +1504,8 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { - if (mndCannotExecuteTransAction(pMnode)) return false; +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; bool continueExec = true; int32_t code = mndTransRollback(pMnode, pTrans); @@ -1510,8 +1523,8 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) { - if (mndCannotExecuteTransAction(pMnode)) return false; +static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; bool continueExec = true; int32_t code = mndTransPreFinish(pMnode, pTrans); @@ -1529,8 +1542,9 @@ static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = false; + if (topHalf) return continueExec; SSdbRaw *pRaw = mndTransEncode(pTrans); if (pRaw == NULL) { @@ -1558,43 +1572,28 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) { pTrans->lastExecTime = taosGetTimestampMs(); switch (pTrans->stage) { case TRN_STAGE_PREPARE: - continueExec = mndTransPerformPrepareStage(pMnode, pTrans); + continueExec = mndTransPerformPrepareStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_REDO_ACTION: - continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); + continueExec = mndTransPerformRedoActionStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_COMMIT: - if (topHalf) { - continueExec = mndTransPerformCommitStage(pMnode, pTrans); - } else { - mInfo("trans:%d, can not commit since not leader", pTrans->id); - continueExec = false; - } + continueExec = mndTransPerformCommitStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_COMMIT_ACTION: - continueExec = mndTransPerformCommitActionStage(pMnode, pTrans); + continueExec = mndTransPerformCommitActionStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_ROLLBACK: - if (topHalf) { - continueExec = mndTransPerformRollbackStage(pMnode, pTrans); - } else { - mInfo("trans:%d, can not rollback since not leader", pTrans->id); - continueExec = false; - } + continueExec = mndTransPerformRollbackStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_UNDO_ACTION: - continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); + continueExec = mndTransPerformUndoActionStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_PRE_FINISH: - if (topHalf) { - continueExec = mndTransPerformPreFinishStage(pMnode, pTrans); - } else { - mInfo("trans:%d, can not pre-finish since not leader", pTrans->id); - continueExec = false; - } + continueExec = mndTransPerformPreFinishStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_FINISH: - continueExec = mndTransPerformFinishStage(pMnode, pTrans); + continueExec = mndTransPerformFinishStage(pMnode, pTrans, topHalf); break; default: continueExec = false; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6a32fed147..c294270e4a 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -286,6 +286,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background") TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CTX_SWITCH, "Transaction context switch") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error") // mnode-mq