From 9f1c8cc7b88e6baed8fe9e407c8df254e128857b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 18 Aug 2023 18:21:06 +0800 Subject: [PATCH] enh: unify validation of prepare actions with cb validateFp --- source/dnode/mnode/impl/src/mndDb.c | 16 ++++++ source/dnode/mnode/impl/src/mndSync.c | 68 +++++++------------------ source/dnode/mnode/impl/src/mndVgroup.c | 13 +++++ source/dnode/mnode/sdb/inc/sdb.h | 3 ++ source/dnode/mnode/sdb/src/sdb.c | 1 + 5 files changed, 51 insertions(+), 50 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6517ab826b..73a4e42abf 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew); +static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); + static int32_t mndProcessCreateDbReq(SRpcMsg *pReq); static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessDropDbReq(SRpcMsg *pReq); @@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) { .insertFp = (SdbInsertFp)mndDbActionInsert, .updateFp = (SdbUpdateFp)mndDbActionUpdate, .deleteFp = (SdbDeleteFp)mndDbActionDelete, + .validateFp = (SdbValidateFp)mndNewDbActionValidate, }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq); @@ -247,6 +250,19 @@ _OVER: return pRow; } +static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { + SDbObj *pNewDb = pObj; + + SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); + if (pOldDb != NULL) { + mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); + sdbRelease(pMnode->pSdb, pOldDb); + return -1; + } + + return 0; +} + static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) { mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb); return 0; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index ad91a634ae..3d8fd6220f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -16,9 +16,7 @@ #define _DEFAULT_SOURCE #include "mndSync.h" #include "mndCluster.h" -#include "mndDb.h" #include "mndTrans.h" -#include "mndVgroup.h" static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { @@ -75,58 +73,28 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -static int32_t mndValidateNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - int code = -1; - SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_VGROUP])(pAction->pRaw); - if (pRow == NULL) goto _OUT; - SVgObj *pVgroup = sdbGetRowObj(pRow); - if (pVgroup == NULL) goto _OUT; - - int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); - if (maxVgId > pVgroup->vgId) { - mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); - goto _OUT; - } - - code = 0; -_OUT: - taosMemoryFreeClear(pRow); - return code; -} - -static int32_t mndValidateCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - int code = -1; - SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_DB])(pAction->pRaw); - if (pRow == NULL) goto _OUT; - SDbObj *pNewDb = sdbGetRowObj(pRow); - if (pNewDb == NULL) goto _OUT; - - SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); - if (pOldDb != NULL) { - mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); - sdbRelease(pMnode->pSdb, pOldDb); - goto _OUT; - } - - code = 0; -_OUT: - taosMemoryFreeClear(pRow); - return code; -} - static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - int32_t code = 0; + SSdbRaw *pRaw = pAction->pRaw; + SSdb *pSdb = pMnode->pSdb; + SSdbRow *pRow = NULL; + void *pObj = NULL; + int code = -1; - switch (pAction->pRaw->type) { - case SDB_VGROUP: - code = mndValidateNewVgPrepareAction(pMnode, pTrans, pAction); - break; - case SDB_DB: - code = mndValidateCreateDbPrepareAction(pMnode, pTrans, pAction); - break; - default: + if (pRaw->status != SDB_STATUS_CREATING) goto _OUT; + + pRow = (pSdb->decodeFps[pRaw->type])(pRaw); + if (pRow == NULL) goto _OUT; + pObj = sdbGetRowObj(pRow); + if (pObj == NULL) goto _OUT; + + SdbValidateFp validateFp = pSdb->validateFps[pRaw->type]; + code = 0; + if (validateFp) { + code = validateFp(pMnode, pTrans, pObj); } +_OUT: + taosMemoryFreeClear(pRow); return code; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 99adaf8f67..0bf524508a 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -33,6 +33,7 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); +static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); @@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { .insertFp = (SdbInsertFp)mndVgroupActionInsert, .updateFp = (SdbUpdateFp)mndVgroupActionUpdate, .deleteFp = (SdbDeleteFp)mndVgroupActionDelete, + .validateFp = (SdbValidateFp)mndNewVgActionValidate, }; mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp); @@ -171,6 +173,17 @@ _OVER: return pRow; } +static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { + SVgObj *pVgroup = pObj; + + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + if (maxVgId > pVgroup->vgId) { + mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); + return -1; + } + return 0; +} + static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) { mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup); return 0; diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 3c96d8a2fd..695373d220 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc); typedef int32_t (*SdbDeployFp)(SMnode *pMnode); +typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3); @@ -189,6 +190,7 @@ typedef struct SSdb { SdbDeployFp deployFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX]; + SdbValidateFp validateFps[SDB_MAX]; TdThreadMutex filelock; } SSdb; @@ -207,6 +209,7 @@ typedef struct { SdbInsertFp insertFp; SdbUpdateFp updateFp; SdbDeleteFp deleteFp; + SdbValidateFp validateFp; } SSdbTable; typedef struct SSdbOpt { diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 9797dd8337..c4b32fe87c 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { pSdb->deployFps[sdbType] = table.deployFp; pSdb->encodeFps[sdbType] = table.encodeFp; pSdb->decodeFps[sdbType] = table.decodeFp; + pSdb->validateFps[sdbType] = table.validateFp; int32_t hashType = 0; if (keyType == SDB_KEY_INT32) {