fix/exit-fail-retry-when-follower

This commit is contained in:
dmchen 2024-12-03 09:53:46 +08:00
parent fb8a10e717
commit 8b1125e554
1 changed files with 43 additions and 31 deletions

View File

@ -52,10 +52,17 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool t
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndCannotExecuteTransAction(SMnode *pMnode, bool topHalf) {
return (!pMnode->deploy && !mndIsLeader(pMnode)) || !topHalf;
static inline bool mndTransIsInSyncContext(bool topHalf) { return !topHalf; }
static bool mndCannotExecuteTrans(SMnode *pMnode, bool topHalf) {
bool isLeader = mndIsLeader(pMnode);
bool ret = (!pMnode->deploy && !isLeader) || mndTransIsInSyncContext(topHalf);
if (ret) mDebug("cannot execute trans action, deploy:%d, isLeader:%d, topHalf:%d", pMnode->deploy, isLeader, topHalf);
return ret;
}
static inline char *mndStrExecutionContext(bool topHalf) { return topHalf ? "transContext" : "syncContext"; }
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
static int32_t mndProcessTransTimer(SRpcMsg *pReq);
static int32_t mndProcessTtl(SRpcMsg *pReq);
@ -1327,7 +1334,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
// execute in trans context
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->msgSent) return 0;
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
if (mndCannotExecuteTrans(pMnode, topHalf)) {
TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
}
@ -1473,8 +1480,8 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, topHalf(TransContext):%d", pTrans->id,
terrstr(), terrno, topHalf);
mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, in %s", pTrans->id, terrstr(), terrno,
mndStrExecutionContext(topHalf));
}
return code;
}
@ -1482,8 +1489,8 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool t
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute undoActions since %s. topHalf(TransContext):%d", pTrans->id, terrstr(),
topHalf);
mError("trans:%d, failed to execute undoActions since %s. in %s", pTrans->id, terrstr(),
mndStrExecutionContext(topHalf));
}
return code;
}
@ -1491,8 +1498,8 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool t
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute commitActions since %s. topHalf(TransContext):%d", pTrans->id, terrstr(),
topHalf);
mError("trans:%d, failed to execute commitActions since %s. in %s", pTrans->id, terrstr(),
mndStrExecutionContext(topHalf));
}
return code;
}
@ -1512,7 +1519,7 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pActions, action);
mInfo("trans:%d, current action:%d, stage:%s, actionType(0:log,1:msg):%d", pTrans->id, pTrans->actionPos,
mInfo("trans:%d, current action:%d, stage:%s, actionType(1:msg,2:log):%d", pTrans->id, pTrans->actionPos,
mndTransStr(pAction->stage), pAction->actionType);
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
@ -1543,11 +1550,11 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
}
mndSetTransLastAction(pTrans, pAction);
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
if (mndCannotExecuteTrans(pMnode, topHalf)) {
pTrans->lastErrorNo = code;
pTrans->code = code;
mInfo("trans:%d, %s:%d, topHalf(TransContext):%d, not execute next action, code:%s", pTrans->id,
mndTransStr(pAction->stage), action, topHalf, tstrerror(code));
mInfo("trans:%d, %s:%d, cannot execute next action in %s, code:%s", pTrans->id, mndTransStr(pAction->stage),
action, mndStrExecutionContext(topHalf), tstrerror(code));
break;
}
@ -1648,21 +1655,25 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool
code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf);
}
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH && mndTransIsInSyncContext(topHalf)) {
pTrans->lastErrorNo = code;
pTrans->code = code;
bool continueExec = true;
if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
taosMsleep(100);
continueExec = true;
} else {
continueExec = false;
mInfo(
"trans:%d, failed to execute, will retry redo action stage in 100 ms , in %s, "
"continueExec:%d, code:%s",
pTrans->id, mndStrExecutionContext(topHalf), continueExec, tstrerror(code));
taosMsleep(100);
return true;
} else {
if (mndCannotExecuteTrans(pMnode, topHalf)) {
mInfo("trans:%d, cannot continue to execute redo action stage in %s, continueExec:%d, code:%s", pTrans->id,
mndStrExecutionContext(topHalf), continueExec, tstrerror(code));
return false;
}
mInfo("trans:%d, cannot execute redo action stage, topHalf(TransContext):%d, continueExec:%d, code:%s", pTrans->id,
topHalf, continueExec, tstrerror(code));
return continueExec;
}
// if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
terrno = code;
if (code == 0) {
@ -1704,9 +1715,9 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool
return continueExec;
}
// in trans context
// execute in trans context
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
bool continueExec = true;
int32_t code = mndTransCommit(pMnode, pTrans);
@ -1760,7 +1771,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool
code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
}
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
terrno = code;
if (code == 0) {
@ -1781,7 +1792,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool
// in trans context
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
bool continueExec = true;
int32_t code = mndTransRollback(pMnode, pTrans);
@ -1798,8 +1809,9 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool to
return continueExec;
}
// excute in trans context
static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
bool continueExec = true;
int32_t code = mndTransPreFinish(pMnode, pTrans);
@ -1842,8 +1854,8 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true;
while (continueExec) {
mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " topHalf(TransContext):%d", pTrans->id,
mndTransStr(pTrans->stage), pTrans->createdTime, topHalf);
mInfo("trans:%d, continue to execute stage:%s in %s, createTime:%" PRId64 "", pTrans->id,
mndTransStr(pTrans->stage), mndStrExecutionContext(topHalf), pTrans->createdTime);
pTrans->lastExecTime = taosGetTimestampMs();
switch (pTrans->stage) {
case TRN_STAGE_PREPARE: