From 300a3a2c0221f362839730358bfdbc8d92028be5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 15:46:16 +0800 Subject: [PATCH 1/3] enh: drop sma while drop stb and db --- source/dnode/mnode/impl/inc/mndSma.h | 2 + source/dnode/mnode/impl/src/mndDb.c | 1 + source/dnode/mnode/impl/src/mndSma.c | 139 ++++++++++++++++++++++----- source/dnode/mnode/impl/src/mndStb.c | 33 ++++--- 4 files changed, 137 insertions(+), 38 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndSma.h b/source/dnode/mnode/impl/inc/mndSma.h index 4a80f619d3..5d13f8b74d 100644 --- a/source/dnode/mnode/impl/inc/mndSma.h +++ b/source/dnode/mnode/impl/inc/mndSma.h @@ -26,6 +26,8 @@ int32_t mndInitSma(SMnode *pMnode); void mndCleanupSma(SMnode *pMnode); SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName); void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma); +int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb); +int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index cfe363fdf0..43263af573 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -935,6 +935,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER; + if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER; SUserObj *pUser = mndAcquireUser(pMnode, pDb->createUser); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index c19b558f19..cc57076c0a 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -38,10 +38,10 @@ static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma); static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb); static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew); static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups); -static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq); -static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq); +static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq); +static int32_t mndProcessDropSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); -static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); +static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextSma(SMnode *pMnode, void *pIter); @@ -56,8 +56,8 @@ int32_t mndInitSma(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSmaActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessMCreateSmaReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessCreateSmaReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessDropSmaReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq); @@ -79,7 +79,6 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { if (pRaw == NULL) goto _OVER; int32_t dataPos = 0; - SDB_SET_BINARY(pRaw, dataPos, pSma->name, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSma->stb, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pSma->db, TSDB_DB_FNAME_LEN, _OVER) @@ -100,6 +99,7 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { SDB_SET_INT32(pRaw, dataPos, pSma->tagsFilterLen, _OVER) SDB_SET_INT32(pRaw, dataPos, pSma->sqlLen, _OVER) SDB_SET_INT32(pRaw, dataPos, pSma->astLen, _OVER) + if (pSma->exprLen > 0) { SDB_SET_BINARY(pRaw, dataPos, pSma->expr, pSma->exprLen, _OVER) } @@ -115,6 +115,7 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { SDB_SET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) + terrno = 0; _OVER: @@ -193,6 +194,7 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) { } SDB_GET_RESERVE(pRaw, dataPos, TSDB_SMA_RESERVE_SIZE, _OVER) + terrno = 0; _OVER: @@ -383,6 +385,25 @@ static int32_t mndSetCreateSmaVgroupCommitLogs(SMnode *pMnode, STrans *pTrans, S return 0; } +static int32_t mndSetUpdateSmaStbCommitLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) { + SStbObj stbObj = {0}; + taosRLockLatch(&pStb->lock); + memcpy(&stbObj, pStb, sizeof(SStbObj)); + taosRUnLockLatch(&pStb->lock); + stbObj.pColumns = NULL; + stbObj.pTags = NULL; + stbObj.updateTime = taosGetTimestampMs(); + stbObj.lock = 0; + stbObj.smaVer++; + + SSdbRaw *pCommitRaw = mndStbActionEncode(&stbObj); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; + + return 0; +} + static int32_t mndSetCreateSmaRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SSmaObj *pSma) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; @@ -457,7 +478,6 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, pSma->schemaTag.pSchema[0].flags = 0; snprintf(pSma->schemaTag.pSchema[0].name, TSDB_COL_NAME_LEN, "groupId"); - int32_t smaContLen = 0; void *pSmaReq = mndBuildVCreateSmaReq(pMnode, pVgroup, pSma, &smaContLen); if (pSmaReq == NULL) return -1; @@ -559,6 +579,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; + if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; @@ -599,7 +620,7 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) { return 0; } -static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq) { +static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SStbObj *pStb = NULL; @@ -781,13 +802,17 @@ static int32_t mndSetDropSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans, SD } static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *pSma) { - int32_t code = -1; - SVgObj *pVgroup = NULL; - STrans *pTrans = NULL; + int32_t code = -1; + SVgObj *pVgroup = NULL; + SStbObj *pStb = NULL; + STrans *pTrans = NULL; pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); if (pVgroup == NULL) goto _OVER; + pStb = mndAcquireStb(pMnode, pSma->stb); + if (pStb == NULL) goto _OVER; + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq); if (pTrans == NULL) goto _OVER; @@ -798,6 +823,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; + if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; @@ -807,10 +833,78 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p _OVER: mndTransDrop(pTrans); mndReleaseVgroup(pMnode, pVgroup); + mndReleaseStb(pMnode, pStb); return code; } -static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq) { +int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = NULL; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = -1; + + while (1) { + pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); + if (pIter == NULL) break; + + if (pSma->stbUid == pStb->uid) { + pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); + if (pVgroup == NULL) goto _OVER; + if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + if (mndSetDropSmaRedoActions(pMnode, pTrans, pDb, pSma) != 0) goto _OVER; + mndReleaseVgroup(pMnode, pVgroup); + pVgroup = NULL; + } + + sdbRelease(pSdb, pSma); + } + + code = 0; + +_OVER: + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pSma); + mndReleaseVgroup(pMnode, pVgroup); + return code; +} + +int32_t mndDropSmasByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = NULL; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = -1; + + while (1) { + pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); + if (pIter == NULL) break; + + if (pSma->dbUid == pDb->uid) { + pVgroup = mndAcquireVgroup(pMnode, pSma->dstVgId); + if (pVgroup == NULL) goto _OVER; + if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; + if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + mndReleaseVgroup(pMnode, pVgroup); + pVgroup = NULL; + } + + sdbRelease(pSdb, pSma); + } + + code = 0; + +_OVER: + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pSma); + mndReleaseVgroup(pMnode, pVgroup); + return code; +} + +static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SUserObj *pUser = NULL; @@ -900,10 +994,10 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp } static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIndexRsp *rsp, bool *exist) { - int32_t code = 0; - SSmaObj *pSma = NULL; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; + int32_t code = 0; + SSmaObj *pSma = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; STableIndexInfo info; while (1) { @@ -922,14 +1016,14 @@ static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIn info.dstTbUid = pSma->dstTbUid; info.dstVgId = pSma->dstVgId; - SVgObj* pVg = mndAcquireVgroup(pMnode, pSma->dstVgId); + SVgObj *pVg = mndAcquireVgroup(pMnode, pSma->dstVgId); if (pVg == NULL) { code = -1; sdbRelease(pSdb, pSma); return code; } info.epSet = mndGetVgroupEpset(pMnode, pVg); - + info.expr = taosMemoryMalloc(pSma->exprLen + 1); if (info.expr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -953,7 +1047,7 @@ static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIn sdbRelease(pSdb, pSma); } - + return code; } @@ -1005,10 +1099,10 @@ _OVER: static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) { STableIndexReq indexReq = {0}; - SMnode *pMnode = pReq->info.node; - int32_t code = -1; + SMnode *pMnode = pReq->info.node; + int32_t code = -1; STableIndexRsp rsp = {0}; - bool exist = false; + bool exist = false; if (tDeserializeSTableIndexReq(pReq->pCont, pReq->contLen, &indexReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -1055,7 +1149,6 @@ _OVER: return code; } - static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ad0c913fb8..cb0a893436 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -23,6 +23,7 @@ #include "mndPerfSchema.h" #include "mndScheduler.h" #include "mndShow.h" +#include "mndSma.h" #include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" @@ -36,9 +37,9 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw); static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb); static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew); -static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq); -static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq); -static int32_t mndProcessMDropStbReq(SRpcMsg *pReq); +static int32_t mndProcessCreateStbReq(SRpcMsg *pReq); +static int32_t mndProcessAlterStbReq(SRpcMsg *pReq); +static int32_t mndProcessDropStbReq(SRpcMsg *pReq); static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); @@ -54,9 +55,9 @@ int32_t mndInitStb(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndStbActionDelete, }; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq); - mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessCreateStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessAlterStbReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessDropStbReq); mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp); @@ -318,6 +319,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { pOld->updateTime = pNew->updateTime; pOld->tagVer = pNew->tagVer; pOld->colVer = pNew->colVer; + pOld->smaVer = pNew->smaVer; pOld->nextColId = pNew->nextColId; pOld->ttl = pNew->ttl; pOld->numOfColumns = pNew->numOfColumns; @@ -361,7 +363,7 @@ SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName) { return mndAcquireDb(pMnode, db); } -static FORCE_INLINE int schemaExColIdCompare(const void *colId, const void *pSchema) { +static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *pSchema) { if (*(col_id_t *)colId < ((SSchema *)pSchema)->colId) { return -1; } else if (*(col_id_t *)colId > ((SSchema *)pSchema)->colId) { @@ -395,14 +397,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.pRSmaParam.xFilesFactor = pStb->xFilesFactor; req.pRSmaParam.delay = pStb->delay; if (pStb->ast1Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, - req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, pStb->pAst1, pStb->uid, + STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } if (pStb->ast2Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid, STREAM_TRIGGER_AT_ONCE, 0, - req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { + if (mndConvertRsmaTask(&req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, pStb->pAst2, pStb->uid, + STREAM_TRIGGER_AT_ONCE, 0, req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) { return NULL; } } @@ -761,7 +763,7 @@ int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p return 0; } -static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq) { +static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SStbObj *pStb = NULL; @@ -1296,7 +1298,7 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) { - int ret; + int32_t ret; SEncoder ec = {0}; uint32_t contLen = 0; SMAlterStbRsp alterRsp = {0}; @@ -1415,7 +1417,7 @@ _OVER: return code; } -static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) { +static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SDbObj *pDb = NULL; @@ -1545,6 +1547,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; if (mndSetDropStbRedoActions(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; + if (mndDropSmasByStb(pMnode, pTrans, pDb, pStb) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -1554,7 +1557,7 @@ _OVER: return code; } -static int32_t mndProcessMDropStbReq(SRpcMsg *pReq) { +static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; SUserObj *pUser = NULL; From c9509acf222bc3777a4bca3fea29d1c1cf1ffa0a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 16:01:32 +0800 Subject: [PATCH 2/3] refactor: mnode pre process msg --- include/dnode/mnode/mnode.h | 20 +++++++++++++++++--- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 3 ++- source/dnode/mnode/impl/src/mndQuery.c | 7 +++---- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 99b704826d..21d8405f58 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -59,7 +59,13 @@ void mndClose(SMnode *pMnode); * @param pMnode The mnode object. */ int32_t mndStart(SMnode *pMnode); -void mndStop(SMnode *pMnode); + +/** + * @brief Stop mnode + * + * @param pMnode The mnode object. + */ +void mndStop(SMnode *pMnode); /** * @brief Get mnode monitor info. @@ -71,17 +77,25 @@ void mndStop(SMnode *pMnode); * @return int32_t 0 for success, -1 for failure. */ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pCluster, SMonVgroupInfo *pVgroup, SMonGrantInfo *pGrant); + +/** + * @brief Get mnode loads for status msg. + * + * @param pMnode The mnode object. + * @param pLoad + * @return int32_t 0 for success, -1 for failure. + */ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); /** - * @brief Process the read, write, sync request. + * @brief Process the rpc, sync request. * * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); -int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg); +int32_t mndPreProcessMsg(SRpcMsg *pMsg); /** * @brief Generate machine code diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index ee2df5f089..7cd7da1aa9 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -114,7 +114,8 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - if (mndPreprocessQueryMsg(pMgmt->pMnode, pMsg) != 0) { + pMsg->info.node = pMgmt->pMnode; + if (mndPreProcessMsg(pMsg) != 0) { dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); return -1; } diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 12b39e5b78..5374f48e47 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -18,11 +18,10 @@ #include "mndMnode.h" #include "qworker.h" -int32_t mndPreprocessQueryMsg(SMnode * pMnode, SRpcMsg * pMsg) { - if (TDMT_VND_QUERY != pMsg->msgType) { - return 0; - } +int32_t mndPreProcessMsg(SRpcMsg *pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) return 0; + SMnode *pMnode = pMsg->info.node; return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); } From 72bc32af1e16ef1af3496e7ac4db30aa0747519c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 14 Jun 2022 16:17:37 +0800 Subject: [PATCH 3/3] ehn: remove sync ref --- source/dnode/mnode/impl/src/mndMain.c | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 3b2eafced8..bd82701ae4 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -393,11 +393,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { return TAOS_SYNC_OTHER_ERROR; } - if (mndAcquireSyncRef(pMnode) != 0) { - mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr()); - return TAOS_SYNC_OTHER_ERROR; - } - char logBuf[512] = {0}; char *syncNodeStr = sync2SimpleStr(pMgmt->sync); snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); @@ -501,7 +496,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } } - mndReleaseSyncRef(pMnode); return code; } @@ -754,24 +748,3 @@ void mndSetStop(SMnode *pMnode) { } bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; } - -int32_t mndAcquireSyncRef(SMnode *pMnode) { - int32_t code = 0; - taosThreadRwlockRdlock(&pMnode->lock); - if (pMnode->stopped) { - terrno = TSDB_CODE_APP_NOT_READY; - code = -1; - } else { - int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1); - // mTrace("mnode sync is acquired, ref:%d", ref); - } - taosThreadRwlockUnlock(&pMnode->lock); - return code; -} - -void mndReleaseSyncRef(SMnode *pMnode) { - taosThreadRwlockRdlock(&pMnode->lock); - int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1); - // mTrace("mnode sync is released, ref:%d", ref); - taosThreadRwlockUnlock(&pMnode->lock); -}