Merge pull request #13489 from taosdata/fix/mnode
enh: should not alter stb if field used by topic
This commit is contained in:
commit
9c21f48405
|
@ -177,8 +177,8 @@ typedef struct {
|
|||
typedef struct SField {
|
||||
char name[TSDB_COL_NAME_LEN];
|
||||
uint8_t type;
|
||||
int32_t bytes;
|
||||
int8_t flags;
|
||||
int32_t bytes;
|
||||
} SField;
|
||||
|
||||
typedef struct SRetention {
|
||||
|
|
|
@ -235,6 +235,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03AC)
|
||||
#define TSDB_CODE_MND_COLUMN_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AD)
|
||||
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AE)
|
||||
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AF)
|
||||
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03B0)
|
||||
|
||||
// mnode-infoSchema
|
||||
|
|
|
@ -37,7 +37,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
|||
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
|
||||
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colIds);
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "mndPerfSchema.h"
|
||||
#include "mndScheduler.h"
|
||||
#include "mndShow.h"
|
||||
#include "mndTopic.h"
|
||||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
|
@ -394,14 +395,14 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
|||
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
|
||||
req.pRSmaParam.delay = pStb->delay;
|
||||
if (pStb->ast1Len > 0) {
|
||||
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, req.pRSmaParam.xFilesFactor) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len,
|
||||
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
if (pStb->ast2Len > 0) {
|
||||
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, req.pRSmaParam.xFilesFactor) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len,
|
||||
req.pRSmaParam.xFilesFactor) != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -949,13 +950,18 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
|
||||
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[tag].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -968,7 +974,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
|
||||
static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
|
||||
if ((int32_t)taosArrayGetSize(pFields) != 2) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
|
@ -986,6 +992,11 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
|
|||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[tag].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_ALREADY_EXIST;
|
||||
return -1;
|
||||
|
@ -1008,13 +1019,18 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
|
||||
static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, pField->name);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[tag].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1075,7 +1091,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const char *colName) {
|
||||
static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *colName) {
|
||||
int32_t col = mndFindSuperTableColumnIndex(pOld, colName);
|
||||
if (col < 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||
|
@ -1092,6 +1108,11 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
|
|||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[col].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1104,7 +1125,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
|
||||
static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
|
||||
int32_t col = mndFindSuperTableColumnIndex(pOld, pField->name);
|
||||
if (col < 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||
|
@ -1121,6 +1142,11 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
|
|||
return -1;
|
||||
}
|
||||
|
||||
col_id_t colId = pOld->pTags[col].colId;
|
||||
if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -1199,7 +1225,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
|
||||
taosRLockLatch(&pStb->lock);
|
||||
|
||||
|
@ -1269,13 +1294,13 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
|
|||
return code;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont, int32_t *pLen) {
|
||||
static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, SStbObj *pObj, void **pCont,
|
||||
int32_t *pLen) {
|
||||
int ret;
|
||||
SEncoder ec = {0};
|
||||
uint32_t contLen = 0;
|
||||
uint32_t contLen = 0;
|
||||
SMAlterStbRsp alterRsp = {0};
|
||||
SName name = {0};
|
||||
SName name = {0};
|
||||
tNameFromString(&name, pAlter->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
|
||||
alterRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
||||
|
@ -1283,20 +1308,20 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
ret = mndBuildStbSchemaImp(pDb, pObj, name.tname, alterRsp.pMeta);
|
||||
if (ret) {
|
||||
tFreeSMAlterStbRsp(&alterRsp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
tEncodeSize(tEncodeSMAlterStbRsp, &alterRsp, contLen, ret);
|
||||
if (ret) {
|
||||
tFreeSMAlterStbRsp(&alterRsp);
|
||||
return ret;
|
||||
}
|
||||
|
||||
void* cont = taosMemoryMalloc(contLen);
|
||||
void *cont = taosMemoryMalloc(contLen);
|
||||
tEncoderInit(&ec, cont, contLen);
|
||||
tEncodeSMAlterStbRsp(&ec, &alterRsp);
|
||||
tEncoderClear(&ec);
|
||||
|
@ -1305,24 +1330,24 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, const SMAlterStbReq *pAlter, S
|
|||
|
||||
*pCont = cont;
|
||||
*pLen = contLen;
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||
bool needRsp = true;
|
||||
bool needRsp = true;
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = NULL;
|
||||
SField *pField0 = NULL;
|
||||
|
||||
SStbObj stbObj = {0};
|
||||
taosRLockLatch(&pOld->lock);
|
||||
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
||||
taosRUnLockLatch(&pOld->lock);
|
||||
stbObj.pColumns = NULL;
|
||||
stbObj.pTags = NULL;
|
||||
stbObj.updateTime = taosGetTimestampMs();
|
||||
stbObj.lock = 0;
|
||||
taosRUnLockLatch(&pOld->lock);
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = NULL;
|
||||
SField *pField0 = NULL;
|
||||
|
||||
switch (pAlter->alterType) {
|
||||
case TSDB_ALTER_TABLE_ADD_TAG:
|
||||
|
@ -1330,25 +1355,25 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
|||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_TAG:
|
||||
pField0 = taosArrayGet(pAlter->pFields, 0);
|
||||
code = mndDropSuperTableTag(pOld, &stbObj, pField0->name);
|
||||
code = mndDropSuperTableTag(pMnode, pOld, &stbObj, pField0->name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
|
||||
code = mndAlterStbTagName(pOld, &stbObj, pAlter->pFields);
|
||||
code = mndAlterStbTagName(pMnode, pOld, &stbObj, pAlter->pFields);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
|
||||
pField0 = taosArrayGet(pAlter->pFields, 0);
|
||||
code = mndAlterStbTagBytes(pOld, &stbObj, pField0);
|
||||
code = mndAlterStbTagBytes(pMnode, pOld, &stbObj, pField0);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||
pField0 = taosArrayGet(pAlter->pFields, 0);
|
||||
code = mndDropSuperTableColumn(pOld, &stbObj, pField0->name);
|
||||
code = mndDropSuperTableColumn(pMnode, pOld, &stbObj, pField0->name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||
pField0 = taosArrayGet(pAlter->pFields, 0);
|
||||
code = mndAlterStbColumnBytes(pOld, &stbObj, pField0);
|
||||
code = mndAlterStbColumnBytes(pMnode, pOld, &stbObj, pField0);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_OPTIONS:
|
||||
needRsp = false;
|
||||
|
@ -1370,12 +1395,12 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
|
|||
mndTransSetDbName(pTrans, pDb->name);
|
||||
|
||||
if (needRsp) {
|
||||
void* pCont = NULL;
|
||||
void *pCont = NULL;
|
||||
int32_t contLen = 0;
|
||||
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen)) goto _OVER;
|
||||
if (mndBuildSMAlterStbRsp(pDb, pAlter, &stbObj, &pCont, &contLen) != 0) goto _OVER;
|
||||
mndTransSetRpcRsp(pTrans, pCont, contLen);
|
||||
}
|
||||
|
||||
|
||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
|
||||
|
|
|
@ -71,7 +71,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
|
|||
return strchr(topic, '.') + 1;
|
||||
}
|
||||
|
||||
bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *colAndTagIds) {
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
bool found = false;
|
||||
|
@ -105,20 +105,21 @@ bool mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, const SArray *col
|
|||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(colAndTagIds); i++) {
|
||||
int16_t *pColId = taosArrayGet(colAndTagIds, i);
|
||||
if (taosHashGet(pColHash, pColId, sizeof(int16_t)) != NULL) {
|
||||
found = true;
|
||||
goto NEXT;
|
||||
}
|
||||
if (taosHashGet(pColHash, &colId, sizeof(int16_t)) != NULL) {
|
||||
found = true;
|
||||
goto NEXT;
|
||||
}
|
||||
|
||||
NEXT:
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
if (found) return false;
|
||||
if (found) {
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||
|
|
|
@ -225,14 +225,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_IN_DNODE, "Vgroup not in dnode")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_NOT_EXIST, "VGroup does not exist")
|
||||
|
||||
// mnode-stable
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "Stable already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "Stable not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "Stable confilct with topic")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_ALREADY_EXIST, "STable already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_NOT_EXIST, "STable not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC, "STable confilct with topic")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STBS, "Too many stables")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB, "Invalid stable name")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_ALTER_OPTION, "Invalid stable alter options")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "Stable option unchanged")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STB_OPTION_UNCHNAGED, "STable option unchanged")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREADY_EXIST, "Tag already exists")
|
||||
|
@ -240,6 +240,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode")
|
||||
|
||||
// mnode-infoSchema
|
||||
|
|
Loading…
Reference in New Issue