fix: reset confict trans types

This commit is contained in:
Shengliang Guan 2022-05-04 16:16:35 +08:00
parent 1dd08f30e6
commit 7eede87869
3 changed files with 36 additions and 28 deletions

View File

@ -72,6 +72,7 @@ typedef enum {
TRN_TYPE_DROP_USER = 1003, TRN_TYPE_DROP_USER = 1003,
TRN_TYPE_CREATE_FUNC = 1004, TRN_TYPE_CREATE_FUNC = 1004,
TRN_TYPE_DROP_FUNC = 1005, TRN_TYPE_DROP_FUNC = 1005,
TRN_TYPE_CREATE_SNODE = 1006, TRN_TYPE_CREATE_SNODE = 1006,
TRN_TYPE_DROP_SNODE = 1007, TRN_TYPE_DROP_SNODE = 1007,
TRN_TYPE_CREATE_QNODE = 1008, TRN_TYPE_CREATE_QNODE = 1008,

View File

@ -63,13 +63,15 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter); static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter);
int32_t mndInitTrans(SMnode *pMnode) { int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TRANS, SSdbTable table = {
.sdbType = SDB_TRANS,
.keyType = SDB_KEY_INT32, .keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndTransActionEncode, .encodeFp = (SdbEncodeFp)mndTransActionEncode,
.decodeFp = (SdbDecodeFp)mndTransActionDecode, .decodeFp = (SdbDecodeFp)mndTransActionDecode,
.insertFp = (SdbInsertFp)mndTransActionInsert, .insertFp = (SdbInsertFp)mndTransActionInsert,
.updateFp = (SdbUpdateFp)mndTransActionUpdate, .updateFp = (SdbUpdateFp)mndTransActionUpdate,
.deleteFp = (SdbDeleteFp)mndTransActionDelete}; .deleteFp = (SdbDeleteFp)mndTransActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq); mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
@ -411,6 +413,16 @@ static const char *mndTransType(ETrnType type) {
return "subscribe"; return "subscribe";
case TRN_TYPE_REBALANCE: case TRN_TYPE_REBALANCE:
return "rebalance"; return "rebalance";
case TRN_TYPE_COMMIT_OFFSET:
return "commit-offset";
case TRN_TYPE_CREATE_STREAM:
return "create-stream";
case TRN_TYPE_DROP_STREAM:
return "drop-stream";
case TRN_TYPE_CONSUMER_LOST:
return "consumer-lost";
case TRN_TYPE_CONSUMER_RECOVER:
return "consumer-recover";
case TRN_TYPE_CREATE_DNODE: case TRN_TYPE_CREATE_DNODE:
return "create-qnode"; return "create-qnode";
case TRN_TYPE_DROP_DNODE: case TRN_TYPE_DROP_DNODE:
@ -713,41 +725,34 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
canParallel = false; canParallel = false;
break; } else {
} }
} }
if (mndIsDbTrans(pNewTrans)) { else if (mndIsDbTrans(pNewTrans)) {
if (mndIsBasicTrans(pTrans)) continue;
if (mndIsGlobalTrans(pTrans)) { if (mndIsGlobalTrans(pTrans)) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
canParallel = false; canParallel = false;
break; } else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
}
if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) { if (pNewTrans->dbUid == pTrans->dbUid) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
canParallel = false; canParallel = false;
break;
} }
} else {
} }
} }
if (mndIsStbTrans(pNewTrans)) { else if (mndIsStbTrans(pNewTrans)) {
if (mndIsBasicTrans(pTrans)) continue;
if (mndIsGlobalTrans(pTrans)) { if (mndIsGlobalTrans(pTrans)) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
canParallel = false; canParallel = false;
break; } else if (mndIsDbTrans(pTrans)) {
}
if (mndIsDbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) { if (pNewTrans->dbUid == pTrans->dbUid) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
canParallel = false; canParallel = false;
break;
} }
} else {
} }
if (mndIsStbTrans(pTrans)) continue;
} }
sdbRelease(pMnode->pSdb, pTrans); sdbRelease(pMnode->pSdb, pTrans);
@ -842,12 +847,14 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
pTrans->rpcAHandle); pTrans->rpcAHandle);
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, SRpcMsg rspMsg = {
.code = pTrans->code, .handle = pTrans->rpcHandle,
.ahandle = pTrans->rpcAHandle, .ahandle = pTrans->rpcAHandle,
.refId = pTrans->rpcRefId, .refId = pTrans->rpcRefId,
.code = pTrans->code,
.pCont = rpcCont, .pCont = rpcCont,
.contLen = pTrans->rpcRspLen}; .contLen = pTrans->rpcRspLen,
};
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
pTrans->rpcHandle = NULL; pTrans->rpcHandle = NULL;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;

View File

@ -22,7 +22,7 @@ class MndTestTrans2 : public ::testing::Test {
static void SetUpTestSuite() { static void SetUpTestSuite() {
dDebugFlag = 143; dDebugFlag = 143;
vDebugFlag = 0; vDebugFlag = 0;
mDebugFlag = 143; mDebugFlag = 207;
cDebugFlag = 0; cDebugFlag = 0;
jniDebugFlag = 0; jniDebugFlag = 0;
tmrDebugFlag = 135; tmrDebugFlag = 135;