From 7eede87869b466e804645d565ea1f42fa27891d3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 4 May 2022 16:16:35 +0800 Subject: [PATCH] fix: reset confict trans types --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndTrans.c | 61 +++++++++++-------- source/dnode/mnode/impl/test/trans/trans2.cpp | 2 +- 3 files changed, 36 insertions(+), 28 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a3c143df41..d516b0bf26 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -72,6 +72,7 @@ typedef enum { TRN_TYPE_DROP_USER = 1003, TRN_TYPE_CREATE_FUNC = 1004, TRN_TYPE_DROP_FUNC = 1005, + TRN_TYPE_CREATE_SNODE = 1006, TRN_TYPE_DROP_SNODE = 1007, TRN_TYPE_CREATE_QNODE = 1008, diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index f3d7bf8c66..ce2a0efd2f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -63,13 +63,15 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter); int32_t mndInitTrans(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_TRANS, - .keyType = SDB_KEY_INT32, - .encodeFp = (SdbEncodeFp)mndTransActionEncode, - .decodeFp = (SdbDecodeFp)mndTransActionDecode, - .insertFp = (SdbInsertFp)mndTransActionInsert, - .updateFp = (SdbUpdateFp)mndTransActionUpdate, - .deleteFp = (SdbDeleteFp)mndTransActionDelete}; + SSdbTable table = { + .sdbType = SDB_TRANS, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndTransActionEncode, + .decodeFp = (SdbDecodeFp)mndTransActionDecode, + .insertFp = (SdbInsertFp)mndTransActionInsert, + .updateFp = (SdbUpdateFp)mndTransActionUpdate, + .deleteFp = (SdbDeleteFp)mndTransActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq); @@ -411,6 +413,16 @@ static const char *mndTransType(ETrnType type) { return "subscribe"; case TRN_TYPE_REBALANCE: return "rebalance"; + case TRN_TYPE_COMMIT_OFFSET: + return "commit-offset"; + case TRN_TYPE_CREATE_STREAM: + return "create-stream"; + case TRN_TYPE_DROP_STREAM: + return "drop-stream"; + case TRN_TYPE_CONSUMER_LOST: + return "consumer-lost"; + case TRN_TYPE_CONSUMER_RECOVER: + return "consumer-recover"; case TRN_TYPE_CREATE_DNODE: return "create-qnode"; case TRN_TYPE_DROP_DNODE: @@ -713,41 +725,34 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); canParallel = false; - break; + } else { } } - if (mndIsDbTrans(pNewTrans)) { - if (mndIsBasicTrans(pTrans)) continue; + else if (mndIsDbTrans(pNewTrans)) { if (mndIsGlobalTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); canParallel = false; - break; - } - if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { + } else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { if (pNewTrans->dbUid == pTrans->dbUid) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); canParallel = false; - break; } + } else { } } - if (mndIsStbTrans(pNewTrans)) { - if (mndIsBasicTrans(pTrans)) continue; + else if (mndIsStbTrans(pNewTrans)) { if (mndIsGlobalTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); canParallel = false; - break; - } - if (mndIsDbTrans(pTrans)) { + } else if (mndIsDbTrans(pTrans)) { if (pNewTrans->dbUid == pTrans->dbUid) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); canParallel = false; - break; } + } else { } - if (mndIsStbTrans(pTrans)) continue; } sdbRelease(pMnode->pSdb, pTrans); @@ -842,12 +847,14 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, pTrans->rpcAHandle); - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, - .code = pTrans->code, - .ahandle = pTrans->rpcAHandle, - .refId = pTrans->rpcRefId, - .pCont = rpcCont, - .contLen = pTrans->rpcRspLen}; + SRpcMsg rspMsg = { + .handle = pTrans->rpcHandle, + .ahandle = pTrans->rpcAHandle, + .refId = pTrans->rpcRefId, + .code = pTrans->code, + .pCont = rpcCont, + .contLen = pTrans->rpcRspLen, + }; tmsgSendRsp(&rspMsg); pTrans->rpcHandle = NULL; pTrans->rpcRsp = NULL; diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index abab24723f..252cd105a0 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -22,7 +22,7 @@ class MndTestTrans2 : public ::testing::Test { static void SetUpTestSuite() { dDebugFlag = 143; vDebugFlag = 0; - mDebugFlag = 143; + mDebugFlag = 207; cDebugFlag = 0; jniDebugFlag = 0; tmrDebugFlag = 135;