From f01a26f68a22036b986daa4754c223e85de2b17d Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 18 Aug 2023 14:26:58 +0800 Subject: [PATCH 1/5] enh: refactor func names of new Vg prepare actions --- source/dnode/mnode/impl/inc/mndVgroup.h | 2 +- source/dnode/mnode/impl/src/mndDb.c | 6 +++--- source/dnode/mnode/impl/src/mndSma.c | 2 +- source/dnode/mnode/impl/src/mndVgroup.c | 6 +++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 7c2f8b5b65..4dbd2fe7f8 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -37,7 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId); int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups); -int32_t mndAddPrepareNewVgAction(SMnode *, STrans *pTrans, SVgObj *pVg); +int32_t mndAddNewVgPrepareAction(SMnode *, STrans *pTrans, SVgObj *pVg); int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid); int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 4f7e80c0a3..e9f04dac52 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -448,9 +448,9 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE; } -static int32_t mndSetPrepareNewVgActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { +static int32_t mndSetNewVgPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { - if (mndAddPrepareNewVgAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1; + if (mndAddNewVgPrepareAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1; } return 0; } @@ -633,7 +633,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; mndTransSetOper(pTrans, MND_OPER_CREATE_DB); - if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; + if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index e186a8742f..d666f80fd3 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -631,7 +631,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea mndTransSetSerial(pTrans); mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name); - if (mndAddPrepareNewVgAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; + if (mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 406392271c..99adaf8f67 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1259,7 +1259,7 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb return 0; } -int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { +int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { SSdbRaw *pRaw = mndVgroupActionEncode(pVg); if (pRaw == NULL) goto _err; @@ -2380,13 +2380,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); int32_t srcVgId = newVg1.vgId; newVg1.vgId = maxVgId; - if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg1) != 0) goto _OVER; + if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1) != 0) goto _OVER; if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER; maxVgId++; srcVgId = newVg2.vgId; newVg2.vgId = maxVgId; - if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg2) != 0) goto _OVER; + if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2) != 0) goto _OVER; if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; From e7411183d9b28ab2143679442d1a455f9b220231 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 18 Aug 2023 17:08:24 +0800 Subject: [PATCH 2/5] enh: create SDB_DB entry in prepareAction --- source/dnode/mnode/impl/src/mndDb.c | 13 +++++- source/dnode/mnode/impl/src/mndSync.c | 63 ++++++++++++++++++++------- source/dnode/mnode/sdb/src/sdbHash.c | 2 + 3 files changed, 61 insertions(+), 17 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index e9f04dac52..6517ab826b 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -448,6 +448,16 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE; } +static int32_t mndSetCreateDbPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdbRaw *pDbRaw = mndDbActionEncode(pDb); + if (pDbRaw == NULL) return -1; + + STransAction action = {.pRaw = pDbRaw, .msgType = TDMT_MND_CREATE_DB}; + if (mndTransAppendPrepareAction(pTrans, &action) != 0) return -1; + if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; + return 0; +} + static int32_t mndSetNewVgPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { if (mndAddNewVgPrepareAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1; @@ -459,7 +469,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD SSdbRaw *pDbRaw = mndDbActionEncode(pDb); if (pDbRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1; - if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; + if (sdbSetRawStatus(pDbRaw, SDB_STATUS_UPDATE) != 0) return -1; for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); @@ -633,6 +643,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; mndTransSetOper(pTrans, MND_OPER_CREATE_DB); + if (mndSetCreateDbPrepareActions(pMnode, pTrans, &dbObj) != 0) goto _OVER; if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 68bfe09b5e..ad91a634ae 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mndSync.h" #include "mndCluster.h" +#include "mndDb.h" #include "mndTrans.h" #include "mndVgroup.h" @@ -74,23 +75,17 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - SSdbRow *pRow = NULL; - int32_t code = -1; +static int32_t mndValidateNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + int code = -1; + SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_VGROUP])(pAction->pRaw); + if (pRow == NULL) goto _OUT; + SVgObj *pVgroup = sdbGetRowObj(pRow); + if (pVgroup == NULL) goto _OUT; - if (pAction->msgType == TDMT_MND_CREATE_VG) { - pRow = mndVgroupActionDecode(pAction->pRaw); - if (pRow == NULL) goto _OUT; - - SVgObj *pVgroup = sdbGetRowObj(pRow); - if (pVgroup == NULL) goto _OUT; - - int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); - if (maxVgId > pVgroup->vgId) { - mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId, - maxVgId); - goto _OUT; - } + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + if (maxVgId > pVgroup->vgId) { + mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); + goto _OUT; } code = 0; @@ -99,6 +94,42 @@ _OUT: return code; } +static int32_t mndValidateCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + int code = -1; + SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_DB])(pAction->pRaw); + if (pRow == NULL) goto _OUT; + SDbObj *pNewDb = sdbGetRowObj(pRow); + if (pNewDb == NULL) goto _OUT; + + SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); + if (pOldDb != NULL) { + mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); + sdbRelease(pMnode->pSdb, pOldDb); + goto _OUT; + } + + code = 0; +_OUT: + taosMemoryFreeClear(pRow); + return code; +} + +static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + int32_t code = 0; + + switch (pAction->pRaw->type) { + case SDB_VGROUP: + code = mndValidateNewVgPrepareAction(pMnode, pTrans, pAction); + break; + case SDB_DB: + code = mndValidateCreateDbPrepareAction(pMnode, pTrans, pAction); + break; + default: + } + + return code; +} + static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) { int32_t code = -1; int32_t action = 0; diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 258b22d8ee..09743d549a 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -79,6 +79,8 @@ const char *sdbStatusName(ESdbStatus status) { return "dropped"; case SDB_STATUS_INIT: return "init"; + case SDB_STATUS_UPDATE: + return "update"; default: return "undefine"; } From 9f1c8cc7b88e6baed8fe9e407c8df254e128857b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 18 Aug 2023 18:21:06 +0800 Subject: [PATCH 3/5] enh: unify validation of prepare actions with cb validateFp --- source/dnode/mnode/impl/src/mndDb.c | 16 ++++++ source/dnode/mnode/impl/src/mndSync.c | 68 +++++++------------------ source/dnode/mnode/impl/src/mndVgroup.c | 13 +++++ source/dnode/mnode/sdb/inc/sdb.h | 3 ++ source/dnode/mnode/sdb/src/sdb.c | 1 + 5 files changed, 51 insertions(+), 50 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6517ab826b..73a4e42abf 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew); +static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); + static int32_t mndProcessCreateDbReq(SRpcMsg *pReq); static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessDropDbReq(SRpcMsg *pReq); @@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) { .insertFp = (SdbInsertFp)mndDbActionInsert, .updateFp = (SdbUpdateFp)mndDbActionUpdate, .deleteFp = (SdbDeleteFp)mndDbActionDelete, + .validateFp = (SdbValidateFp)mndNewDbActionValidate, }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq); @@ -247,6 +250,19 @@ _OVER: return pRow; } +static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { + SDbObj *pNewDb = pObj; + + SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); + if (pOldDb != NULL) { + mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); + sdbRelease(pMnode->pSdb, pOldDb); + return -1; + } + + return 0; +} + static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) { mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb); return 0; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index ad91a634ae..3d8fd6220f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -16,9 +16,7 @@ #define _DEFAULT_SOURCE #include "mndSync.h" #include "mndCluster.h" -#include "mndDb.h" #include "mndTrans.h" -#include "mndVgroup.h" static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { @@ -75,58 +73,28 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -static int32_t mndValidateNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - int code = -1; - SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_VGROUP])(pAction->pRaw); - if (pRow == NULL) goto _OUT; - SVgObj *pVgroup = sdbGetRowObj(pRow); - if (pVgroup == NULL) goto _OUT; - - int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); - if (maxVgId > pVgroup->vgId) { - mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); - goto _OUT; - } - - code = 0; -_OUT: - taosMemoryFreeClear(pRow); - return code; -} - -static int32_t mndValidateCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - int code = -1; - SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_DB])(pAction->pRaw); - if (pRow == NULL) goto _OUT; - SDbObj *pNewDb = sdbGetRowObj(pRow); - if (pNewDb == NULL) goto _OUT; - - SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); - if (pOldDb != NULL) { - mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); - sdbRelease(pMnode->pSdb, pOldDb); - goto _OUT; - } - - code = 0; -_OUT: - taosMemoryFreeClear(pRow); - return code; -} - static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - int32_t code = 0; + SSdbRaw *pRaw = pAction->pRaw; + SSdb *pSdb = pMnode->pSdb; + SSdbRow *pRow = NULL; + void *pObj = NULL; + int code = -1; - switch (pAction->pRaw->type) { - case SDB_VGROUP: - code = mndValidateNewVgPrepareAction(pMnode, pTrans, pAction); - break; - case SDB_DB: - code = mndValidateCreateDbPrepareAction(pMnode, pTrans, pAction); - break; - default: + if (pRaw->status != SDB_STATUS_CREATING) goto _OUT; + + pRow = (pSdb->decodeFps[pRaw->type])(pRaw); + if (pRow == NULL) goto _OUT; + pObj = sdbGetRowObj(pRow); + if (pObj == NULL) goto _OUT; + + SdbValidateFp validateFp = pSdb->validateFps[pRaw->type]; + code = 0; + if (validateFp) { + code = validateFp(pMnode, pTrans, pObj); } +_OUT: + taosMemoryFreeClear(pRow); return code; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 99adaf8f67..0bf524508a 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -33,6 +33,7 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); +static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); @@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { .insertFp = (SdbInsertFp)mndVgroupActionInsert, .updateFp = (SdbUpdateFp)mndVgroupActionUpdate, .deleteFp = (SdbDeleteFp)mndVgroupActionDelete, + .validateFp = (SdbValidateFp)mndNewVgActionValidate, }; mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp); @@ -171,6 +173,17 @@ _OVER: return pRow; } +static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { + SVgObj *pVgroup = pObj; + + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + if (maxVgId > pVgroup->vgId) { + mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); + return -1; + } + return 0; +} + static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) { mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup); return 0; diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 3c96d8a2fd..695373d220 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc); typedef int32_t (*SdbDeployFp)(SMnode *pMnode); +typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3); @@ -189,6 +190,7 @@ typedef struct SSdb { SdbDeployFp deployFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX]; + SdbValidateFp validateFps[SDB_MAX]; TdThreadMutex filelock; } SSdb; @@ -207,6 +209,7 @@ typedef struct { SdbInsertFp insertFp; SdbUpdateFp updateFp; SdbDeleteFp deleteFp; + SdbValidateFp validateFp; } SSdbTable; typedef struct SSdbOpt { diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 9797dd8337..c4b32fe87c 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { pSdb->deployFps[sdbType] = table.deployFp; pSdb->encodeFps[sdbType] = table.encodeFp; pSdb->decodeFps[sdbType] = table.decodeFp; + pSdb->validateFps[sdbType] = table.validateFp; int32_t hashType = 0; if (keyType == SDB_KEY_INT32) { From f30d1ebacdcc1d70706ead27a4d19010d03afebf Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 18 Aug 2023 18:35:44 +0800 Subject: [PATCH 4/5] enh: refactor func name as mndTransAppendPrepareLog --- source/dnode/mnode/impl/inc/mndTrans.h | 4 +++- source/dnode/mnode/impl/src/mndDb.c | 7 +++---- source/dnode/mnode/impl/src/mndTrans.c | 9 ++++----- source/dnode/mnode/impl/src/mndVgroup.c | 3 +-- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 625546aa55..04544da80e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -66,11 +66,13 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans); STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq, const char *opername); void mndTransDrop(STrans *pTrans); + +int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendNullLog(STrans *pTrans); -int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction); + int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 73a4e42abf..56cea10b32 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -464,12 +464,11 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE; } -static int32_t mndSetCreateDbPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { +static int32_t mndSetCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdbRaw *pDbRaw = mndDbActionEncode(pDb); if (pDbRaw == NULL) return -1; - STransAction action = {.pRaw = pDbRaw, .msgType = TDMT_MND_CREATE_DB}; - if (mndTransAppendPrepareAction(pTrans, &action) != 0) return -1; + if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1; if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; return 0; } @@ -659,7 +658,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; mndTransSetOper(pTrans, MND_OPER_CREATE_DB); - if (mndSetCreateDbPrepareActions(pMnode, pTrans, &dbObj) != 0) goto _OVER; + if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER; if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7ebaf6dda5..02d9368595 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -655,11 +655,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendAction(pTrans->commitActions, &action); } -int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction) { - pAction->stage = TRN_STAGE_PREPARE; - pAction->actionType = TRANS_ACTION_RAW; - pAction->mTraceId = pTrans->mTraceId; - return mndTransAppendAction(pTrans->prepareActions, pAction); +int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw) { + STransAction action = { + .pRaw = pRaw, .stage = TRN_STAGE_PREPARE, .actionType = TRANS_ACTION_RAW, .mTraceId = pTrans->mTraceId}; + return mndTransAppendAction(pTrans->prepareActions, &action); } int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 0bf524508a..e0156db67c 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1276,8 +1276,7 @@ int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { SSdbRaw *pRaw = mndVgroupActionEncode(pVg); if (pRaw == NULL) goto _err; - STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG}; - if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err; + if (mndTransAppendPrepareLog(pTrans, pRaw) != 0) goto _err; (void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING); pRaw = NULL; return 0; From d3ce0c1f80472bba65f1705ae2845a0aee9dcf4b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 21 Aug 2023 11:42:57 +0800 Subject: [PATCH 5/5] fix: ensure creating status of create-db before completion --- source/dnode/mnode/impl/src/mndDb.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 56cea10b32..20b342f9e3 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -660,7 +660,6 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, mndTransSetOper(pTrans, MND_OPER_CREATE_DB); if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER; if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; - if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER; if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;