From 536627edbf590b359b73d0aa81ea98104bf28e9d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 18 Feb 2022 14:12:29 +0800 Subject: [PATCH] parallel exec trans --- include/util/taoserror.h | 1 + source/dnode/mnode/impl/inc/mndDef.h | 12 ++-- source/dnode/mnode/impl/src/mndTrans.c | 82 ++++++++++++++++++++++++++ source/util/src/terror.c | 1 + 4 files changed, 91 insertions(+), 5 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 6f9bc35f2a..92028b0837 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -247,6 +247,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) #define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2) +#define TSDB_CODE_MND_TRANS_CANT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4) // mnode-mq #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a092882e1f..f30301c72e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -132,12 +132,14 @@ typedef enum { TRN_TYPE_CREATE_DB = 3001, TRN_TYPE_ALTER_DB = 3002, TRN_TYPE_DROP_DB = 3003, - TRN_TYPE_CREATE_STB = 3004, - TRN_TYPE_ALTER_STB = 3005, - TRN_TYPE_DROP_STB = 3006, - TRN_TYPE_SPLIT_VGROUP = 3007, - TRN_TYPE_MERGE_VGROUP = 3018, + TRN_TYPE_SPLIT_VGROUP = 3004, + TRN_TYPE_MERGE_VGROUP = 3015, TRN_TYPE_DB_SCOPE_END, + TRN_TYPE_STB_SCOPE = 4000, + TRN_TYPE_CREATE_STB = 4001, + TRN_TYPE_ALTER_STB = 4002, + TRN_TYPE_DROP_STB = 4003, + TRN_TYPE_STB_SCOPE_END, } ETrnType; typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5fcdc6d8f2..f7226d9df7 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -546,7 +546,89 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { return 0; } +static bool mndIsBasicTrans(STrans *pTrans) { + return pTrans->stage > TRN_TYPE_BASIC_SCOPE && pTrans->stage < TRN_TYPE_BASIC_SCOPE_END; +} + +static bool mndIsGlobalTrans(STrans *pTrans) { + return pTrans->stage > TRN_TYPE_GLOBAL_SCOPE && pTrans->stage < TRN_TYPE_GLOBAL_SCOPE_END; +} + +static bool mndIsDbTrans(STrans *pTrans) { + return pTrans->stage > TRN_TYPE_DB_SCOPE && pTrans->stage < TRN_TYPE_DB_SCOPE_END; +} + +static bool mndIsStbTrans(STrans *pTrans) { + return pTrans->stage > TRN_TYPE_STB_SCOPE && pTrans->stage < TRN_TYPE_STB_SCOPE_END; +} + +static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewTrans) { + if (mndIsBasicTrans(pNewTrans)) return 0; + + STrans *pTrans = NULL; + void *pIter = NULL; + int32_t code = 0; + + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); + if (pIter == NULL) break; + + if (mndIsGlobalTrans(pNewTrans)) { + if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { + mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); + code = -1; + break; + } + } + + if (mndIsDbTrans(pNewTrans)) { + if (mndIsBasicTrans(pTrans)) continue; + if (mndIsGlobalTrans(pTrans)) { + mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); + code = -1; + break; + } + if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { + if (pNewTrans->dbUid == pTrans->dbUid) { + mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); + code = -1; + break; + } + } + } + + if (mndIsStbTrans(pNewTrans)) { + if (mndIsBasicTrans(pTrans)) continue; + if (mndIsGlobalTrans(pTrans)) { + mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); + code = -1; + break; + } + if (mndIsDbTrans(pTrans)) { + if (pNewTrans->dbUid == pTrans->dbUid) { + mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); + code = -1; + break; + } + } + if (mndIsStbTrans(pTrans)) continue; + } + + sdbRelease(pMnode->pSdb, pTrans); + } + + sdbCancelFetch(pMnode->pSdb, pIter); + sdbRelease(pMnode->pSdb, pTrans); + return code; +} + int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { + if (mndCheckTransCanBeStartedInParallel(pMnode, pTrans) != 0) { + terrno = TSDB_CODE_MND_TRANS_CANT_PARALLEL; + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + return -1; + } + mDebug("trans:%d, prepare transaction", pTrans->id); if (mndTransSync(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f4ee13c067..2b53a769ff 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -257,6 +257,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill") // mnode-topic TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet")