Merge pull request #15645 from taosdata/feature/TD-11274-3.0
fix: check modifiable when drop tsma columns
This commit is contained in:
commit
5b34e4ef2c
|
@ -258,6 +258,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03C5)
|
||||
#define TSDB_CODE_MND_INVALID_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x03C6)
|
||||
#define TSDB_CODE_MND_STABLE_UID_NOT_MATCH TAOS_DEF_ERROR_CODE(0, 0x03C7)
|
||||
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA TAOS_DEF_ERROR_CODE(0, 0x03C8)
|
||||
|
||||
// mnode-trans
|
||||
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
|
||||
|
|
|
@ -37,8 +37,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
|||
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -45,7 +45,9 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
|
|||
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void* alterOriData, int32_t alterOriDataLen);
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
|
||||
void *alterOriData, int32_t alterOriDataLen);
|
||||
static int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId);
|
||||
|
||||
int32_t mndInitStb(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -409,7 +411,8 @@ static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void* alterOriData, int32_t alterOriDataLen) {
|
||||
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen,
|
||||
void *alterOriData, int32_t alterOriDataLen) {
|
||||
SEncoder encoder = {0};
|
||||
int32_t contLen;
|
||||
SName name = {0};
|
||||
|
@ -709,7 +712,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
|||
memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
pDst->createdTime = taosGetTimestampMs();
|
||||
pDst->updateTime = pDst->createdTime;
|
||||
pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
pDst->uid =
|
||||
(pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
pDst->dbUid = pDb->uid;
|
||||
pDst->tagVer = 1;
|
||||
pDst->colVer = 1;
|
||||
|
@ -895,9 +899,9 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
|
|||
pSchema->flags = pField->flags;
|
||||
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
||||
int32_t cIndex = mndFindSuperTableColumnIndex(pStb, pField->name);
|
||||
if (cIndex >= 0){
|
||||
if (cIndex >= 0) {
|
||||
pSchema->colId = pStb->pColumns[cIndex].colId;
|
||||
}else{
|
||||
} else {
|
||||
pSchema->colId = pDst->nextColId++;
|
||||
}
|
||||
}
|
||||
|
@ -909,12 +913,11 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
|
|||
pSchema->bytes = pField->bytes;
|
||||
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
||||
int32_t cIndex = mndFindSuperTableTagIndex(pStb, pField->name);
|
||||
if (cIndex >= 0){
|
||||
if (cIndex >= 0) {
|
||||
pSchema->colId = pStb->pTags[cIndex].colId;
|
||||
}else{
|
||||
} else {
|
||||
pSchema->colId = pDst->nextColId++;
|
||||
}
|
||||
|
||||
}
|
||||
pDst->tagVer = createReq->tagVer;
|
||||
pDst->colVer = createReq->colVer;
|
||||
|
@ -982,7 +985,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||
goto _OVER;
|
||||
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)){
|
||||
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) {
|
||||
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
|
@ -1137,6 +1140,99 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
|
||||
pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql);
|
||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
continue;
|
||||
}
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SNodeList *pNodeList = NULL;
|
||||
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||
SNode *pNode = NULL;
|
||||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId,
|
||||
pCol->tableId, pTopic->ctbStbUid);
|
||||
|
||||
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
|
||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
goto NEXT;
|
||||
}
|
||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
|
||||
return -1;
|
||||
}
|
||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
}
|
||||
|
||||
NEXT:
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
SSmaObj *pSma = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbname,
|
||||
suid, colId, pSma->sql);
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pSma->ast, &pAst) != 0) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
|
||||
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
|
||||
pSma->name, stbname, suid, colId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SNodeList *pNodeList = NULL;
|
||||
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||
SNode *pNode = NULL;
|
||||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
mDebug("tsma:%s, check colId:%d tableId:%" PRId64, pSma->name, pCol->colId, pCol->tableId);
|
||||
|
||||
if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
|
||||
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
||||
goto NEXT2;
|
||||
}
|
||||
if ((pCol->colId) > 0 && (pCol->colId == colId)) {
|
||||
sdbRelease(pSdb, pSma);
|
||||
nodesDestroyNode(pAst);
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
|
||||
mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
|
||||
return -1;
|
||||
}
|
||||
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
||||
}
|
||||
|
||||
NEXT2:
|
||||
sdbRelease(pSdb, pSma);
|
||||
nodesDestroyNode(pAst);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
|
||||
if (tag < 0) {
|
||||
|
@ -1380,7 +1476,8 @@ static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void* alterOriData, int32_t alterOriDataLen) {
|
||||
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void *alterOriData,
|
||||
int32_t alterOriDataLen) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
|
@ -1607,7 +1704,8 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void* alterOriData, int32_t alterOriDataLen) {
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
|
||||
void *alterOriData, int32_t alterOriDataLen) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
|
|
@ -72,56 +72,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
|
|||
return strchr(topic, '.') + 1;
|
||||
}
|
||||
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
|
||||
pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql);
|
||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
continue;
|
||||
}
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SNodeList *pNodeList = NULL;
|
||||
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||
SNode *pNode = NULL;
|
||||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId, pTopic->ctbStbUid);
|
||||
|
||||
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
|
||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
goto NEXT;
|
||||
}
|
||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
|
||||
return -1;
|
||||
}
|
||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
}
|
||||
|
||||
NEXT:
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
|
|
|
@ -261,6 +261,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SCHEMA_VER, "Invalid schema version while alter stb")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STABLE_UID_NOT_MATCH, "Invalid stable uid while alter stb")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA, "Field used by tsma")
|
||||
|
||||
// mnode-trans
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
|
||||
|
|
Loading…
Reference in New Issue