diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b8baf99552..abc752955d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -264,7 +264,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) #define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2) -#define TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4) +#define TSDB_CODE_MND_TRANS_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x03D3) +#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03D4) // mnode-mq #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index ce2a0efd2f..1c238d4082 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -710,12 +710,12 @@ static bool mndIsStbTrans(STrans *pTrans) { return pTrans->type > TRN_TYPE_STB_SCOPE && pTrans->type < TRN_TYPE_STB_SCOPE_END; } -static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { +static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) { STrans *pTrans = NULL; void *pIter = NULL; - bool canParallel = true; + bool conflict = false; - if (mndIsBasicTrans(pNewTrans)) return canParallel; + if (mndIsBasicTrans(pNewTrans)) return conflict; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); @@ -724,7 +724,7 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { if (mndIsGlobalTrans(pNewTrans)) { if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); - canParallel = false; + conflict = true; } else { } } @@ -732,11 +732,11 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { else if (mndIsDbTrans(pNewTrans)) { if (mndIsGlobalTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); - canParallel = false; + conflict = true; } else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { if (pNewTrans->dbUid == pTrans->dbUid) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); - canParallel = false; + conflict = true; } } else { } @@ -745,11 +745,11 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { else if (mndIsStbTrans(pNewTrans)) { if (mndIsGlobalTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); - canParallel = false; + conflict = true; } else if (mndIsDbTrans(pTrans)) { if (pNewTrans->dbUid == pTrans->dbUid) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); - canParallel = false; + conflict = true; } } else { } @@ -760,12 +760,12 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { sdbCancelFetch(pMnode->pSdb, pIter); sdbRelease(pMnode->pSdb, pTrans); - return canParallel; + return conflict; } int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { - if (!mndCheckTransCanParallel(pMnode, pTrans)) { - terrno = TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL; + if (mndCheckTransConflict(pMnode, pTrans)) { + terrno = TSDB_CODE_MND_TRANS_CONFLICT; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); return -1; } @@ -819,7 +819,8 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { } static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { - bool sendRsp = false; + bool sendRsp = false; + int32_t code = pTrans->code; if (pTrans->stage == TRN_STAGE_FINISHED) { sendRsp = true; @@ -829,12 +830,14 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { sendRsp = true; + if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; } } if (pTrans->policy == TRN_POLICY_RETRY) { if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) { sendRsp = true; + if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; } } @@ -845,13 +848,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } taosMemoryFree(pTrans->rpcRsp); - mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, + mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage, pTrans->rpcAHandle); SRpcMsg rspMsg = { .handle = pTrans->rpcHandle, .ahandle = pTrans->rpcAHandle, .refId = pTrans->rpcRefId, - .code = pTrans->code, + .code = code, .pCont = rpcCont, .contLen = pTrans->rpcRspLen, }; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index bd787d50b0..00fe8bd0e9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -271,7 +271,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL, "Conflicting transaction not completed") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error") // mnode-mq TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")