fix/exit-fail-retry-when-follower

This commit is contained in:
dmchen 2024-12-03 09:53:46 +08:00
parent 4978c0aa59
commit a7a25b93fd
1 changed files with 43 additions and 31 deletions

View File

@ -52,10 +52,17 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool t
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf); static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf); static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndCannotExecuteTransAction(SMnode *pMnode, bool topHalf) { static inline bool mndTransIsInSyncContext(bool topHalf) { return !topHalf; }
return (!pMnode->deploy && !mndIsLeader(pMnode)) || !topHalf;
static bool mndCannotExecuteTrans(SMnode *pMnode, bool topHalf) {
bool isLeader = mndIsLeader(pMnode);
bool ret = (!pMnode->deploy && !isLeader) || mndTransIsInSyncContext(topHalf);
if (ret) mDebug("cannot execute trans action, deploy:%d, isLeader:%d, topHalf:%d", pMnode->deploy, isLeader, topHalf);
return ret;
} }
static inline char *mndStrExecutionContext(bool topHalf) { return topHalf ? "transContext" : "syncContext"; }
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
static int32_t mndProcessTransTimer(SRpcMsg *pReq); static int32_t mndProcessTransTimer(SRpcMsg *pReq);
static int32_t mndProcessTtl(SRpcMsg *pReq); static int32_t mndProcessTtl(SRpcMsg *pReq);
@ -1339,7 +1346,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
// execute in trans context // execute in trans context
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) { static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->msgSent) return 0; if (pAction->msgSent) return 0;
if (mndCannotExecuteTransAction(pMnode, topHalf)) { if (mndCannotExecuteTrans(pMnode, topHalf)) {
TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH); TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
} }
@ -1485,8 +1492,8 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, topHalf(TransContext):%d", pTrans->id, mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, in %s", pTrans->id, terrstr(), terrno,
terrstr(), terrno, topHalf); mndStrExecutionContext(topHalf));
} }
return code; return code;
} }
@ -1494,8 +1501,8 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool t
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute undoActions since %s. topHalf(TransContext):%d", pTrans->id, terrstr(), mError("trans:%d, failed to execute undoActions since %s. in %s", pTrans->id, terrstr(),
topHalf); mndStrExecutionContext(topHalf));
} }
return code; return code;
} }
@ -1503,8 +1510,8 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool t
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) { static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute commitActions since %s. topHalf(TransContext):%d", pTrans->id, terrstr(), mError("trans:%d, failed to execute commitActions since %s. in %s", pTrans->id, terrstr(),
topHalf); mndStrExecutionContext(topHalf));
} }
return code; return code;
} }
@ -1524,7 +1531,7 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) { for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pActions, action); STransAction *pAction = taosArrayGet(pActions, action);
mInfo("trans:%d, current action:%d, stage:%s, actionType(0:log,1:msg):%d", pTrans->id, pTrans->actionPos, mInfo("trans:%d, current action:%d, stage:%s, actionType(1:msg,2:log):%d", pTrans->id, pTrans->actionPos,
mndTransStr(pAction->stage), pAction->actionType); mndTransStr(pAction->stage), pAction->actionType);
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
@ -1555,11 +1562,11 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
} }
mndSetTransLastAction(pTrans, pAction); mndSetTransLastAction(pTrans, pAction);
if (mndCannotExecuteTransAction(pMnode, topHalf)) { if (mndCannotExecuteTrans(pMnode, topHalf)) {
pTrans->lastErrorNo = code; pTrans->lastErrorNo = code;
pTrans->code = code; pTrans->code = code;
mInfo("trans:%d, %s:%d, topHalf(TransContext):%d, not execute next action, code:%s", pTrans->id, mInfo("trans:%d, %s:%d, cannot execute next action in %s, code:%s", pTrans->id, mndTransStr(pAction->stage),
mndTransStr(pAction->stage), action, topHalf, tstrerror(code)); action, mndStrExecutionContext(topHalf), tstrerror(code));
break; break;
} }
@ -1660,21 +1667,25 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool
code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf); code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf);
} }
if (mndCannotExecuteTransAction(pMnode, topHalf)) { if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH && mndTransIsInSyncContext(topHalf)) {
pTrans->lastErrorNo = code; pTrans->lastErrorNo = code;
pTrans->code = code; pTrans->code = code;
bool continueExec = true; mInfo(
if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) { "trans:%d, failed to execute, will retry redo action stage in 100 ms , in %s, "
taosMsleep(100); "continueExec:%d, code:%s",
continueExec = true; pTrans->id, mndStrExecutionContext(topHalf), continueExec, tstrerror(code));
} else { taosMsleep(100);
continueExec = false; return true;
} else {
if (mndCannotExecuteTrans(pMnode, topHalf)) {
mInfo("trans:%d, cannot continue to execute redo action stage in %s, continueExec:%d, code:%s", pTrans->id,
mndStrExecutionContext(topHalf), continueExec, tstrerror(code));
return false;
} }
mInfo("trans:%d, cannot execute redo action stage, topHalf(TransContext):%d, continueExec:%d, code:%s", pTrans->id,
topHalf, continueExec, tstrerror(code));
return continueExec;
} }
// if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
terrno = code; terrno = code;
if (code == 0) { if (code == 0) {
@ -1716,9 +1727,9 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool
return continueExec; return continueExec;
} }
// in trans context // execute in trans context
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransCommit(pMnode, pTrans); int32_t code = mndTransCommit(pMnode, pTrans);
@ -1772,7 +1783,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool
code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
} }
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
terrno = code; terrno = code;
if (code == 0) { if (code == 0) {
@ -1793,7 +1804,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool
// in trans context // in trans context
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransRollback(pMnode, pTrans); int32_t code = mndTransRollback(pMnode, pTrans);
@ -1810,8 +1821,9 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool to
return continueExec; return continueExec;
} }
// excute in trans context
static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransPreFinish(pMnode, pTrans); int32_t code = mndTransPreFinish(pMnode, pTrans);
@ -1854,8 +1866,8 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
while (continueExec) { while (continueExec) {
mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " topHalf(TransContext):%d", pTrans->id, mInfo("trans:%d, continue to execute stage:%s in %s, createTime:%" PRId64 "", pTrans->id,
mndTransStr(pTrans->stage), pTrans->createdTime, topHalf); mndTransStr(pTrans->stage), mndStrExecutionContext(topHalf), pTrans->createdTime);
pTrans->lastExecTime = taosGetTimestampMs(); pTrans->lastExecTime = taosGetTimestampMs();
switch (pTrans->stage) { switch (pTrans->stage) {
case TRN_STAGE_PREPARE: case TRN_STAGE_PREPARE: