Merge pull request #26777 from taosdata/fix/TD-30989-mnode11-2

fix/TD-30989
This commit is contained in:
Hongze Cheng 2024-07-25 14:37:22 +08:00 committed by GitHub
commit 10531c0b19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 78 additions and 70 deletions

View File

@ -548,8 +548,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
pOld->id, pOld, mndTransStr(pOld->stage), pOld->createdTime, pNew, mndTransStr(pNew->stage),
pNew->createdTime);
// only occured while sync timeout
terrno = TSDB_CODE_MND_TRANS_SYNC_TIMEOUT;
return -1;
TAOS_RETURN(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT);
}
mndTransUpdateActions(pOld->prepareActions, pNew->prepareActions);
@ -667,8 +666,7 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
void *ptr = taosArrayPush(pArray, pAction);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
return 0;
@ -779,26 +777,29 @@ void mndTransSetChangeless(STrans *pTrans) { pTrans->changeless = true; }
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
SSdbRaw *pRaw = mndTransEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr());
return -1;
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mInfo("trans:%d, sync to other mnodes, stage:%s createTime:%" PRId64, pTrans->id, mndTransStr(pTrans->stage),
pTrans->createdTime);
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
code = mndSyncPropose(pMnode, pRaw, pTrans->id);
if (code != 0) {
mError("trans:%d, failed to sync, errno:%s code:0x%x createTime:%" PRId64 " saved trans:%d", pTrans->id, terrstr(),
code, pTrans->createdTime, pMnode->syncMgmt.transId);
mError("trans:%d, failed to sync, errno:%s code:0x%x createTime:%" PRId64 " saved trans:%d", pTrans->id,
tstrerror(code), code, pTrans->createdTime, pMnode->syncMgmt.transId);
sdbFreeRaw(pRaw);
return -1;
TAOS_RETURN(code);
}
sdbFreeRaw(pRaw);
mInfo("trans:%d, sync finished, createTime:%" PRId64, pTrans->id, pTrans->createdTime);
return 0;
TAOS_RETURN(code);
}
static bool mndCheckDbConflict(const char *conflict, STrans *pTrans) {
@ -890,24 +891,26 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
}
int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
code = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare conflict db not set", pTrans->id);
return -1;
TAOS_RETURN(code);
}
}
if (mndCheckTransConflict(pMnode, pTrans)) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return terrno;
code = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
return 0;
TAOS_RETURN(code);
}
int32_t mndTransCheckConflictWithCompact(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
void *pIter = NULL;
bool conflict = false;
SCompactObj *pCompact = NULL;
@ -934,12 +937,12 @@ int32_t mndTransCheckConflictWithCompact(SMnode *pMnode, STrans *pTrans) {
}
if (conflict) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT_COMPACT;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return terrno;
code = TSDB_CODE_MND_TRANS_CONFLICT_COMPACT;
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
return 0;
TAOS_RETURN(code);
}
static bool mndTransActionsOfSameType(SArray *pActions) {
@ -960,66 +963,65 @@ static bool mndTransActionsOfSameType(SArray *pActions) {
}
static int32_t mndTransCheckParallelActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (pTrans->exec == TRN_EXEC_PARALLEL) {
if (mndTransActionsOfSameType(pTrans->redoActions) == false) {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
code = TSDB_CODE_MND_TRANS_INVALID_STAGE;
mError("trans:%d, types of parallel redo actions are not the same", pTrans->id);
return -1;
TAOS_RETURN(code);
}
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (mndTransActionsOfSameType(pTrans->undoActions) == false) {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
code = TSDB_CODE_MND_TRANS_INVALID_STAGE;
mError("trans:%d, types of parallel undo actions are not the same", pTrans->id);
return -1;
TAOS_RETURN(code);
}
}
}
return 0;
TAOS_RETURN(code);
}
static int32_t mndTransCheckCommitActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
code = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, commit actions of non-changeless trans are empty", pTrans->id);
return -1;
TAOS_RETURN(code);
}
if (mndTransActionsOfSameType(pTrans->commitActions) == false) {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
code = TSDB_CODE_MND_TRANS_INVALID_STAGE;
mError("trans:%d, types of commit actions are not the same", pTrans->id);
return -1;
TAOS_RETURN(code);
}
return 0;
TAOS_RETURN(code);
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (pTrans == NULL) return -1;
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
return -1;
}
TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans));
if (mndTransCheckParallelActions(pMnode, pTrans) != 0) {
return -1;
}
TAOS_CHECK_RETURN(mndTransCheckParallelActions(pMnode, pTrans));
if (mndTransCheckCommitActions(pMnode, pTrans) != 0) {
return -1;
}
TAOS_CHECK_RETURN(mndTransCheckCommitActions(pMnode, pTrans));
mInfo("trans:%d, prepare transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
if ((code = mndTransSync(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
mInfo("trans:%d, prepare finished", pTrans->id);
STrans *pNew = mndAcquireTrans(pMnode, pTrans->id);
if (pNew == NULL) {
mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr());
return -1;
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mError("trans:%d, failed to read from sdb since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
pNew->pRpcArray = pTrans->pRpcArray;
@ -1032,37 +1034,41 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
mndTransExecute(pMnode, pNew);
mndReleaseTrans(pMnode, pNew);
// TDOD change to TAOS_RETURN(code);
return 0;
}
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
mInfo("trans:%d, commit transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
return -1;
if ((code = mndTransSync(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to commit since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
mInfo("trans:%d, commit finished", pTrans->id);
return 0;
TAOS_RETURN(code);
}
static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
mInfo("trans:%d, rollback transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr());
return -1;
if ((code = mndTransSync(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to rollback since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
mInfo("trans:%d, rollback finished", pTrans->id);
return 0;
TAOS_RETURN(code);
}
static int32_t mndTransPreFinish(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
mInfo("trans:%d, pre-finish transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to pre-finish since %s", pTrans->id, terrstr());
return -1;
if ((code = mndTransSync(pMnode, pTrans)) != 0) {
mError("trans:%d, failed to pre-finish since %s", pTrans->id, tstrerror(code));
TAOS_RETURN(code);
}
mInfo("trans:%d, pre-finish finished", pTrans->id);
return 0;
TAOS_RETURN(code);
}
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
@ -1168,6 +1174,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
int32_t code = 0;
SMnode *pMnode = pRsp->info.node;
int64_t signature = (int64_t)(pRsp->info.ahandle);
int32_t transId = (int32_t)(signature >> 32);
@ -1175,7 +1182,9 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans == NULL) {
mError("trans:%d, failed to get transId from vnode rsp since %s", transId, terrstr());
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
mError("trans:%d, failed to get transId from vnode rsp since %s", transId, tstrerror(code));
goto _OVER;
}
@ -1216,7 +1225,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
_OVER:
mndReleaseTrans(pMnode, pTrans);
return 0;
TAOS_RETURN(code);
}
static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
@ -1252,8 +1261,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->rawWritten) return 0;
if (topHalf) {
terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH;
return TSDB_CODE_MND_TRANS_CTX_SWITCH;
TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
}
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
@ -1272,15 +1280,14 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
mndSetTransLastAction(pTrans, pAction);
}
return code;
TAOS_RETURN(code);
}
// execute at top half
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->msgSent) return 0;
if (mndCannotExecuteTransAction(pMnode, topHalf)) {
terrno = TSDB_CODE_MND_TRANS_CTX_SWITCH;
return TSDB_CODE_MND_TRANS_CTX_SWITCH;
TAOS_RETURN(TSDB_CODE_MND_TRANS_CTX_SWITCH);
}
int64_t signature = pTrans->id;
@ -1324,7 +1331,7 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
mndSetTransLastAction(pTrans, pAction);
}
return code;
TAOS_RETURN(code);
}
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
@ -1822,8 +1829,7 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
} else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
pArray = pTrans->undoActions;
} else {
terrno = TSDB_CODE_MND_TRANS_INVALID_STAGE;
return -1;
TAOS_RETURN(TSDB_CODE_MND_TRANS_INVALID_STAGE);
}
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
@ -1846,17 +1852,19 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq) {
STrans *pTrans = NULL;
if (tDeserializeSKillTransReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
code = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
mInfo("trans:%d, start to kill", killReq.transId);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS) != 0) {
if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS)) != 0) {
goto _OVER;
}
pTrans = mndAcquireTrans(pMnode, killReq.transId);
if (pTrans == NULL) {
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
@ -1868,7 +1876,7 @@ _OVER:
}
mndReleaseTrans(pMnode, pTrans);
return code;
TAOS_RETURN(code);
}
static int32_t mndCompareTransId(int32_t *pTransId1, int32_t *pTransId2) { return *pTransId1 >= *pTransId2 ? 1 : 0; }