diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 04544da80e..1bd39a2299 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -97,7 +97,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans); SSdbRow *mndTransDecode(SSdbRaw *pRaw); void mndTransDropData(STrans *pTrans); -bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); +bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf); #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 585d8ea8e7..1ba6c51d4c 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -208,15 +208,11 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { } if (pTrans->stage == TRN_STAGE_PREPARE) { - bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans); + bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans, false); if (!continueExec) goto _OUT; } - if (pTrans->id != pMgmt->transId) { - mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d", - pTrans->id, pTrans->createdTime, pMgmt->transId); - mndTransRefresh(pMnode, pTrans); - } + mndTransRefresh(pMnode, pTrans); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7749decf91..49b483114a 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -36,21 +36,25 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); static void mndTransDropActions(SArray *pArray); -static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); -static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans); -static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsLeader(pMnode); } + +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf); +static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf); +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf); +static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf); + +static bool mndCannotExecuteTransAction(SMnode *pMnode, bool topHalf) { + return (!pMnode->deploy && !mndIsLeader(pMnode)) || !topHalf; +} static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static int32_t mndProcessTransTimer(SRpcMsg *pReq); @@ -1090,8 +1094,9 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) } } -static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { +static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->rawWritten) return 0; + if (topHalf) return -1; int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { @@ -1112,9 +1117,9 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi return code; } -static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - if (pAction->msgSent) return 0; - if (mndCannotExecuteTransAction(pMnode)) return -1; +static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { + if (pAction->msgSent) return 0; + if (mndCannotExecuteTransAction(pMnode, topHalf)) return -1; int64_t signature = pTrans->id; signature = (signature << 32); @@ -1159,7 +1164,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio return code; } -static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { +static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { + if (!topHalf) return -1; pAction->rawWritten = 0; pAction->errCode = 0; mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id); @@ -1168,34 +1174,34 @@ static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction return 0; } -static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { +static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { if (pAction->actionType == TRANS_ACTION_RAW) { - return mndTransWriteSingleLog(pMnode, pTrans, pAction); + return mndTransWriteSingleLog(pMnode, pTrans, pAction, topHalf); } else if (pAction->actionType == TRANS_ACTION_MSG) { - return mndTransSendSingleMsg(pMnode, pTrans, pAction); + return mndTransSendSingleMsg(pMnode, pTrans, pAction, topHalf); } else { - return mndTransExecNullMsg(pMnode, pTrans, pAction); + return mndTransExecNullMsg(pMnode, pTrans, pAction, topHalf); } } -static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) { int32_t numOfActions = taosArrayGetSize(pArray); int32_t code = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); - code = mndTransExecSingleAction(pMnode, pTrans, pAction); + code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code != 0) break; } return code; } -static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) { int32_t numOfActions = taosArrayGetSize(pArray); if (numOfActions == 0) return 0; - if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) { + if (mndTransExecSingleActions(pMnode, pTrans, pArray, topHalf) != 0) { return -1; } @@ -1248,31 +1254,31 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA } } -static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); } return code; } -static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); +static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute undoActions since %s", terrstr()); } return code; } -static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions); +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute commitActions since %s", terrstr()); } return code; } -static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) { +static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { int32_t code = 0; int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); if (numOfActions == 0) return code; @@ -1289,7 +1295,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); - code = mndTransExecSingleAction(pMnode, pTrans, pAction); + code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code == 0) { if (pAction->msgSent) { if (pAction->msgReceived) { @@ -1317,14 +1323,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) } mndSetTransLastAction(pTrans, pAction); - if (mndCannotExecuteTransAction(pMnode)) break; + if (mndCannotExecuteTransAction(pMnode, topHalf)) break; if (code == 0) { pTrans->code = 0; pTrans->redoActionPos++; mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), pAction->id); + taosThreadMutexUnlock(&pTrans->mutex); code = mndTransSync(pMnode, pTrans); + taosThreadMutexLock(&pTrans->mutex); if (code != 0) { pTrans->redoActionPos--; pTrans->code = terrno; @@ -1357,7 +1365,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) return code; } -bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { +bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; int32_t code = 0; @@ -1368,7 +1376,7 @@ bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pTrans->prepareActions, action); - code = mndTransExecSingleAction(pMnode, pTrans, pAction); + code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code != 0) { mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions); return false; @@ -1381,17 +1389,17 @@ _OVER: return continueExec; } -static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; int32_t code = 0; if (pTrans->exec == TRN_EXEC_SERIAL) { - code = mndTransExecuteRedoActionsSerial(pMnode, pTrans); + code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf); } else { - code = mndTransExecuteRedoActions(pMnode, pTrans); + code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf); } - if (mndCannotExecuteTransAction(pMnode)) return false; + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; terrno = code; if (code == 0) { @@ -1431,8 +1439,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { - if (mndCannotExecuteTransAction(pMnode)) return false; +static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; bool continueExec = true; int32_t code = mndTransCommit(pMnode, pTrans); @@ -1452,9 +1460,9 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; - int32_t code = mndTransExecuteCommitActions(pMnode, pTrans); + int32_t code = mndTransExecuteCommitActions(pMnode, pTrans, topHalf); if (code == 0) { pTrans->code = 0; @@ -1471,9 +1479,9 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; - int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); + int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); if (code == 0) { pTrans->stage = TRN_STAGE_PRE_FINISH; @@ -1491,8 +1499,8 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { - if (mndCannotExecuteTransAction(pMnode)) return false; +static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; bool continueExec = true; int32_t code = mndTransRollback(pMnode, pTrans); @@ -1510,8 +1518,8 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) { - if (mndCannotExecuteTransAction(pMnode)) return false; +static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; bool continueExec = true; int32_t code = mndTransPreFinish(pMnode, pTrans); @@ -1529,8 +1537,9 @@ static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = false; + if (topHalf) return continueExec; SSdbRaw *pRaw = mndTransEncode(pTrans); if (pRaw == NULL) { @@ -1558,43 +1567,28 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) { pTrans->lastExecTime = taosGetTimestampMs(); switch (pTrans->stage) { case TRN_STAGE_PREPARE: - continueExec = mndTransPerformPrepareStage(pMnode, pTrans); + continueExec = mndTransPerformPrepareStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_REDO_ACTION: - continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); + continueExec = mndTransPerformRedoActionStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_COMMIT: - if (topHalf) { - continueExec = mndTransPerformCommitStage(pMnode, pTrans); - } else { - mInfo("trans:%d, can not commit since not leader", pTrans->id); - continueExec = false; - } + continueExec = mndTransPerformCommitStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_COMMIT_ACTION: - continueExec = mndTransPerformCommitActionStage(pMnode, pTrans); + continueExec = mndTransPerformCommitActionStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_ROLLBACK: - if (topHalf) { - continueExec = mndTransPerformRollbackStage(pMnode, pTrans); - } else { - mInfo("trans:%d, can not rollback since not leader", pTrans->id); - continueExec = false; - } + continueExec = mndTransPerformRollbackStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_UNDO_ACTION: - continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); + continueExec = mndTransPerformUndoActionStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_PRE_FINISH: - if (topHalf) { - continueExec = mndTransPerformPreFinishStage(pMnode, pTrans); - } else { - mInfo("trans:%d, can not pre-finish since not leader", pTrans->id); - continueExec = false; - } + continueExec = mndTransPerformPreFinishStage(pMnode, pTrans, topHalf); break; case TRN_STAGE_FINISH: - continueExec = mndTransPerformFinishStage(pMnode, pTrans); + continueExec = mndTransPerformFinishStage(pMnode, pTrans, topHalf); break; default: continueExec = false;