Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-29367-3.0

This commit is contained in:
Hongze Cheng 2024-12-10 08:41:26 +08:00
commit 4ba185fbfe
3 changed files with 46 additions and 32 deletions

View File

@ -52,10 +52,17 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool t
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndCannotExecuteTransAction(SMnode *pMnode, bool topHalf) {
return (!pMnode->deploy && !mndIsLeader(pMnode)) || !topHalf;
static inline bool mndTransIsInSyncContext(bool topHalf) { return !topHalf; }
static bool mndCannotExecuteTrans(SMnode *pMnode, bool topHalf) {
bool isLeader = mndIsLeader(pMnode);
bool ret = (!pMnode->deploy && !isLeader) || mndTransIsInSyncContext(topHalf);
if (ret) mDebug("cannot execute trans action, deploy:%d, isLeader:%d, topHalf:%d", pMnode->deploy, isLeader, topHalf);
return ret;
}
static inline char *mndStrExecutionContext(bool topHalf) { return topHalf ? "transContext" : "syncContext"; }
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
static int32_t mndProcessTransTimer(SRpcMsg *pReq);
static int32_t mndProcessTtl(SRpcMsg *pReq);
@ -1335,7 +1342,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
// execute in trans context
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->msgSent) return 0;
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
if (mndCannotExecuteTrans(pMnode, topHalf)) {
TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
}
@ -1481,8 +1488,8 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, topHalf(TransContext):%d", pTrans->id,
terrstr(), terrno, topHalf);
mError("trans:%d, failed to execute redoActions since:%s, code:0x%x, in %s", pTrans->id, terrstr(), terrno,
mndStrExecutionContext(topHalf));
}
return code;
}
@ -1490,8 +1497,8 @@ static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool t
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute undoActions since %s. topHalf(TransContext):%d", pTrans->id, terrstr(),
topHalf);
mError("trans:%d, failed to execute undoActions since %s. in %s", pTrans->id, terrstr(),
mndStrExecutionContext(topHalf));
}
return code;
}
@ -1499,8 +1506,8 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool t
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mError("trans:%d, failed to execute commitActions since %s. topHalf(TransContext):%d", pTrans->id, terrstr(),
topHalf);
mError("trans:%d, failed to execute commitActions since %s. in %s", pTrans->id, terrstr(),
mndStrExecutionContext(topHalf));
}
return code;
}
@ -1520,7 +1527,7 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pActions, action);
mInfo("trans:%d, current action:%d, stage:%s, actionType(0:log,1:msg):%d", pTrans->id, pTrans->actionPos,
mInfo("trans:%d, current action:%d, stage:%s, actionType(1:msg,2:log):%d", pTrans->id, pTrans->actionPos,
mndTransStr(pAction->stage), pAction->actionType);
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
@ -1551,11 +1558,11 @@ static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArr
}
mndSetTransLastAction(pTrans, pAction);
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
if (mndCannotExecuteTrans(pMnode, topHalf)) {
pTrans->lastErrorNo = code;
pTrans->code = code;
mInfo("trans:%d, %s:%d, topHalf(TransContext):%d, not execute next action, code:%s", pTrans->id,
mndTransStr(pAction->stage), action, topHalf, tstrerror(code));
mInfo("trans:%d, %s:%d, cannot execute next action in %s, code:%s", pTrans->id, mndTransStr(pAction->stage),
action, mndStrExecutionContext(topHalf), tstrerror(code));
break;
}
@ -1656,20 +1663,21 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool
code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf);
}
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH && mndTransIsInSyncContext(topHalf)) {
pTrans->lastErrorNo = code;
pTrans->code = code;
bool continueExec = true;
if (code != 0 && code != TSDB_CODE_MND_TRANS_CTX_SWITCH) {
taosMsleep(100);
continueExec = true;
} else {
continueExec = false;
mInfo(
"trans:%d, failed to execute, will retry redo action stage in 100 ms , in %s, "
"continueExec:%d, code:%s",
pTrans->id, mndStrExecutionContext(topHalf), continueExec, tstrerror(code));
taosMsleep(100);
return true;
} else {
if (mndCannotExecuteTrans(pMnode, topHalf)) {
mInfo("trans:%d, cannot continue to execute redo action stage in %s, continueExec:%d, code:%s", pTrans->id,
mndStrExecutionContext(topHalf), continueExec, tstrerror(code));
return false;
}
mInfo("trans:%d, cannot execute redo action stage, topHalf(TransContext):%d, continueExec:%d, code:%s", pTrans->id,
topHalf, continueExec, tstrerror(code));
return continueExec;
}
terrno = code;
@ -1712,9 +1720,9 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool
return continueExec;
}
// in trans context
// execute in trans context
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
bool continueExec = true;
int32_t code = mndTransCommit(pMnode, pTrans);
@ -1768,7 +1776,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool
code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
}
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
terrno = code;
if (code == 0) {
@ -1789,7 +1797,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool
// in trans context
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
bool continueExec = true;
int32_t code = mndTransRollback(pMnode, pTrans);
@ -1806,8 +1814,9 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool to
return continueExec;
}
// excute in trans context
static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
if (mndCannotExecuteTrans(pMnode, topHalf)) return false;
bool continueExec = true;
int32_t code = mndTransPreFinish(pMnode, pTrans);
@ -1850,8 +1859,8 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true;
while (continueExec) {
mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " topHalf(TransContext):%d", pTrans->id,
mndTransStr(pTrans->stage), pTrans->createdTime, topHalf);
mInfo("trans:%d, continue to execute stage:%s in %s, createTime:%" PRId64 "", pTrans->id,
mndTransStr(pTrans->stage), mndStrExecutionContext(topHalf), pTrans->createdTime);
pTrans->lastExecTime = taosGetTimestampMs();
switch (pTrans->stage) {
case TRN_STAGE_PREPARE:

View File

@ -977,6 +977,11 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
extern int32_t vnodeAsyncRetention(SVnode *pVnode, int64_t now);
static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) {
if (!pVnode->restored) {
vInfo("vgId:%d, ignore trim req during restoring. ver:%" PRId64, TD_VID(pVnode), ver);
return 0;
}
int32_t code = 0;
SVTrimDbReq trimReq = {0};

View File

@ -1464,7 +1464,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
// if (endIndex != lastVer + 1) return TSDB_CODE_SYN_INTERNAL_ERROR;
pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE &&
(code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex)) < 0) {