Merge pull request #16183 from taosdata/feature/tq
refactor(mnode): check drop and alter stb for stream and topic
This commit is contained in:
commit
2f8f2e0976
|
@ -1145,7 +1145,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId) {
|
static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1154,7 +1154,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
|
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
|
||||||
pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql);
|
pTopic->name, stbFullName, suid, colId, pTopic->subType, pTopic->sql);
|
||||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
continue;
|
continue;
|
||||||
|
@ -1192,20 +1192,66 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
SNode *pAst = NULL;
|
||||||
|
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeList *pNodeList = NULL;
|
||||||
|
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||||
|
SNode *pNode = NULL;
|
||||||
|
FOREACH(pNode, pNodeList) {
|
||||||
|
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||||
|
|
||||||
|
if (pCol->tableId != suid) {
|
||||||
|
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
|
||||||
|
goto NEXT;
|
||||||
|
}
|
||||||
|
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
|
||||||
|
mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
|
||||||
|
}
|
||||||
|
|
||||||
|
NEXT:
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
SSmaObj *pSma = NULL;
|
SSmaObj *pSma = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbname,
|
mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name,
|
||||||
suid, colId, pSma->sql);
|
stbFullName, suid, colId, pSma->sql);
|
||||||
|
|
||||||
SNode *pAst = NULL;
|
SNode *pAst = NULL;
|
||||||
if (nodesStringToNode(pSma->ast, &pAst) != 0) {
|
if (nodesStringToNode(pSma->ast, &pAst) != 0) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
|
||||||
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
|
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
|
||||||
pSma->name, stbname, suid, colId);
|
pSma->name, stbFullName, suid, colId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1218,7 +1264,7 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t
|
||||||
|
|
||||||
if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
|
if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
|
||||||
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
||||||
goto NEXT2;
|
goto NEXT;
|
||||||
}
|
}
|
||||||
if ((pCol->colId) > 0 && (pCol->colId == colId)) {
|
if ((pCol->colId) > 0 && (pCol->colId == colId)) {
|
||||||
sdbRelease(pSdb, pSma);
|
sdbRelease(pSdb, pSma);
|
||||||
|
@ -1230,11 +1276,24 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t
|
||||||
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
||||||
}
|
}
|
||||||
|
|
||||||
NEXT2:
|
NEXT:
|
||||||
sdbRelease(pSdb, pSma);
|
sdbRelease(pSdb, pSma);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbFullName, int64_t suid, col_id_t colId) {
|
||||||
|
if (mndCheckAlterColForTopic(pMnode, stbFullName, suid, colId) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (mndCheckAlterColForStream(pMnode, stbFullName, suid, colId) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndCheckAlterColForTSma(pMnode, stbFullName, suid, colId) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1930,6 +1989,90 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName, int64_t suid) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
SMqTopicObj *pTopic = NULL;
|
||||||
|
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
if (pTopic->stbUid == suid) {
|
||||||
|
sdbRelease(pSdb, pTopic);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
sdbRelease(pSdb, pTopic);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode *pAst = NULL;
|
||||||
|
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeList *pNodeList = NULL;
|
||||||
|
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||||
|
SNode *pNode = NULL;
|
||||||
|
FOREACH(pNode, pNodeList) {
|
||||||
|
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||||
|
|
||||||
|
if (pCol->tableId != suid) {
|
||||||
|
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||||
|
sdbRelease(pSdb, pTopic);
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
goto NEXT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
NEXT:
|
||||||
|
sdbRelease(pSdb, pTopic);
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName, int64_t suid) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
SNode *pAst = NULL;
|
||||||
|
if (nodesStringToNode(pStream->ast, &pAst) != 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SNodeList *pNodeList = NULL;
|
||||||
|
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||||
|
SNode *pNode = NULL;
|
||||||
|
FOREACH(pNode, pNodeList) {
|
||||||
|
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||||
|
|
||||||
|
if (pCol->tableId != suid) {
|
||||||
|
mDebug("stream:%s, check colId:%d passed", pStream->name, pCol->colId);
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
goto NEXT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
NEXT:
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -1971,6 +2114,16 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (mndCheckDropStbForTopic(pMnode, dropReq.name, pStb->uid) < 0) {
|
||||||
|
terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndCheckDropStbForStream(pMnode, dropReq.name, pStb->uid) < 0) {
|
||||||
|
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
code = mndDropStb(pMnode, pReq, pDb, pStb);
|
code = mndDropStb(pMnode, pReq, pDb, pStb);
|
||||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue