From e7377e410caa4a55a37ccc7efd985cfc43c8be16 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 17 Aug 2022 15:10:15 +0800 Subject: [PATCH] refactor(mnode): check drop and alter stb for stream and topic --- source/dnode/mnode/impl/src/mndStb.c | 167 +++++++++++++++++++++++++-- 1 file changed, 160 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 3e747b66c8..dd2b595c29 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1145,7 +1145,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p return 0; } -int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId) { +static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; while (1) { @@ -1154,7 +1154,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t if (pIter == NULL) break; mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s", - pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql); + pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql); if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { sdbRelease(pSdb, pTopic); continue; @@ -1192,20 +1192,66 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t sdbRelease(pSdb, pTopic); nodesDestroyNode(pAst); } + return 0; +} +static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SStreamObj *pStream = NULL; + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + SNode *pAst = NULL; + if (nodesStringToNode(pStream->ast, &pAst) != 0) { + ASSERT(0); + return -1; + } + + SNodeList *pNodeList = NULL; + nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); + SNode *pNode = NULL; + FOREACH(pNode, pNodeList) { + SColumnNode *pCol = (SColumnNode *)pNode; + + if (pCol->tableId != suid) { + mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId); + goto NEXT; + } + if (pCol->colId > 0 && pCol->colId == colId) { + sdbRelease(pSdb, pStream); + nodesDestroyNode(pAst); + terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; + mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId); + return -1; + } + mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId); + } + + NEXT: + sdbRelease(pSdb, pStream); + nodesDestroyNode(pAst); + } + return 0; +} + +static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; while (1) { SSmaObj *pSma = NULL; pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); if (pIter == NULL) break; - mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbname, - suid, colId, pSma->sql); + mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, + stbFullName, suid, colId, pSma->sql); SNode *pAst = NULL; if (nodesStringToNode(pSma->ast, &pAst) != 0) { terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT; mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err", - pSma->name, stbname, suid, colId); + pSma->name, stbFullName, suid, colId); return -1; } @@ -1218,7 +1264,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t if ((pCol->tableId != suid) && (pSma->stbUid != suid)) { mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); - goto NEXT2; + goto NEXT; } if ((pCol->colId) > 0 && (pCol->colId == colId)) { sdbRelease(pSdb, pSma); @@ -1230,11 +1276,24 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); } - NEXT2: + NEXT: sdbRelease(pSdb, pSma); nodesDestroyNode(pAst); } + return 0; +} +int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) { + if (mndCheckAlterColForTopic(pMnode, stbFullName, suid, colId) < 0) { + return -1; + } + if (mndCheckAlterColForStream(pMnode, stbFullName, suid, colId) < 0) { + return -1; + } + + if (mndCheckAlterColForTSma(pMnode, stbFullName, suid, colId) < 0) { + return -1; + } return 0; } @@ -1930,6 +1989,90 @@ _OVER: return code; } +static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SMqTopicObj *pTopic = NULL; + pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); + if (pIter == NULL) break; + + if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) { + if (pTopic->stbUid == suid) { + sdbRelease(pSdb, pTopic); + return -1; + } + } + + if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { + sdbRelease(pSdb, pTopic); + continue; + } + + SNode *pAst = NULL; + if (nodesStringToNode(pTopic->ast, &pAst) != 0) { + ASSERT(0); + return -1; + } + + SNodeList *pNodeList = NULL; + nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); + SNode *pNode = NULL; + FOREACH(pNode, pNodeList) { + SColumnNode *pCol = (SColumnNode *)pNode; + + if (pCol->tableId != suid) { + mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); + sdbRelease(pSdb, pTopic); + nodesDestroyNode(pAst); + return -1; + } else { + goto NEXT; + } + } + NEXT: + sdbRelease(pSdb, pTopic); + nodesDestroyNode(pAst); + } + return 0; +} + +static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SStreamObj *pStream = NULL; + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) break; + + SNode *pAst = NULL; + if (nodesStringToNode(pStream->ast, &pAst) != 0) { + ASSERT(0); + return -1; + } + + SNodeList *pNodeList = NULL; + nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); + SNode *pNode = NULL; + FOREACH(pNode, pNodeList) { + SColumnNode *pCol = (SColumnNode *)pNode; + + if (pCol->tableId != suid) { + mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId); + sdbRelease(pSdb, pStream); + nodesDestroyNode(pAst); + return -1; + } else { + goto NEXT; + } + } + NEXT: + sdbRelease(pSdb, pStream); + nodesDestroyNode(pAst); + } + return 0; +} + static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; @@ -1971,6 +2114,16 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) { goto _OVER; } + if (mndCheckDropStbForTopic(pMnode, dropReq.name, pStb->uid) < 0) { + terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED; + goto _OVER; + } + + if (mndCheckDropStbForStream(pMnode, dropReq.name, pStb->uid) < 0) { + terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; + goto _OVER; + } + code = mndDropStb(pMnode, pReq, pDb, pStb); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;