refactor: fix: reset confict trans types
This commit is contained in:
parent
7eede87869
commit
5b2b4da11b
|
@ -264,7 +264,8 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
|
||||
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
|
||||
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
|
||||
#define TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4)
|
||||
#define TSDB_CODE_MND_TRANS_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x03D3)
|
||||
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03D4)
|
||||
|
||||
// mnode-mq
|
||||
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
|
||||
|
|
|
@ -710,12 +710,12 @@ static bool mndIsStbTrans(STrans *pTrans) {
|
|||
return pTrans->type > TRN_TYPE_STB_SCOPE && pTrans->type < TRN_TYPE_STB_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
|
||||
static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
|
||||
STrans *pTrans = NULL;
|
||||
void *pIter = NULL;
|
||||
bool canParallel = true;
|
||||
bool conflict = false;
|
||||
|
||||
if (mndIsBasicTrans(pNewTrans)) return canParallel;
|
||||
if (mndIsBasicTrans(pNewTrans)) return conflict;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
|
||||
|
@ -724,7 +724,7 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
|
|||
if (mndIsGlobalTrans(pNewTrans)) {
|
||||
if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
|
||||
canParallel = false;
|
||||
conflict = true;
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
@ -732,11 +732,11 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
|
|||
else if (mndIsDbTrans(pNewTrans)) {
|
||||
if (mndIsGlobalTrans(pTrans)) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
|
||||
canParallel = false;
|
||||
conflict = true;
|
||||
} else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
|
||||
if (pNewTrans->dbUid == pTrans->dbUid) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
|
||||
canParallel = false;
|
||||
conflict = true;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
@ -745,11 +745,11 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
|
|||
else if (mndIsStbTrans(pNewTrans)) {
|
||||
if (mndIsGlobalTrans(pTrans)) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
|
||||
canParallel = false;
|
||||
conflict = true;
|
||||
} else if (mndIsDbTrans(pTrans)) {
|
||||
if (pNewTrans->dbUid == pTrans->dbUid) {
|
||||
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
|
||||
canParallel = false;
|
||||
conflict = true;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
@ -760,12 +760,12 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
|
|||
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
sdbRelease(pMnode->pSdb, pTrans);
|
||||
return canParallel;
|
||||
return conflict;
|
||||
}
|
||||
|
||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||
if (!mndCheckTransCanParallel(pMnode, pTrans)) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL;
|
||||
if (mndCheckTransConflict(pMnode, pTrans)) {
|
||||
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
@ -819,7 +819,8 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
|
||||
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
||||
bool sendRsp = false;
|
||||
bool sendRsp = false;
|
||||
int32_t code = pTrans->code;
|
||||
|
||||
if (pTrans->stage == TRN_STAGE_FINISHED) {
|
||||
sendRsp = true;
|
||||
|
@ -829,12 +830,14 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
|||
if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION ||
|
||||
pTrans->stage == TRN_STAGE_ROLLBACK) {
|
||||
sendRsp = true;
|
||||
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
if (pTrans->policy == TRN_POLICY_RETRY) {
|
||||
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) {
|
||||
sendRsp = true;
|
||||
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -845,13 +848,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
taosMemoryFree(pTrans->rpcRsp);
|
||||
|
||||
mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
|
||||
mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage,
|
||||
pTrans->rpcAHandle);
|
||||
SRpcMsg rspMsg = {
|
||||
.handle = pTrans->rpcHandle,
|
||||
.ahandle = pTrans->rpcAHandle,
|
||||
.refId = pTrans->rpcRefId,
|
||||
.code = pTrans->code,
|
||||
.code = code,
|
||||
.pCont = rpcCont,
|
||||
.contLen = pTrans->rpcRspLen,
|
||||
};
|
||||
|
|
|
@ -271,7 +271,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL, "Conflicting transaction not completed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
|
||||
|
||||
// mnode-mq
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
|
||||
|
|
Loading…
Reference in New Issue