update stb
This commit is contained in:
parent
ff77967299
commit
f24e04e819
|
@ -228,7 +228,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03A9)
|
||||
#define TSDB_CODE_MND_COLUMN_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AA)
|
||||
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AB)
|
||||
#define TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
|
||||
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
|
||||
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AD)
|
||||
|
||||
// mnode-func
|
||||
|
|
|
@ -608,7 +608,288 @@ static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pOld, SStbObj *pNew) { return 0; }
|
||||
static int32_t mndFindSuperTableTagIndex(const SStbObj *pStb, const char *tagName) {
|
||||
for (int32_t tag = 0; tag < pStb->numOfTags; tag++) {
|
||||
if (strcasecmp(pStb->pTags[tag].name, tagName) == 0) {
|
||||
return tag;
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndFindSuperTableColumnIndex(const SStbObj *pStb, const char *colName) {
|
||||
for (int32_t col = 0; col < pStb->numOfColumns; col++) {
|
||||
if (strcasecmp(pStb->pColumns[col].name, colName) == 0) {
|
||||
return col;
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
|
||||
pNew->pTags = calloc(pNew->numOfTags, sizeof(SSchema));
|
||||
pNew->pColumns = calloc(pNew->numOfColumns, sizeof(SSchema));
|
||||
if (pNew->pTags == NULL || pNew->pColumns == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns);
|
||||
memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchemas, int32_t ntags) {
|
||||
if (pOld->numOfTags + ntags > TSDB_MAX_TAGS) {
|
||||
terrno = TSDB_CODE_MND_TOO_MANY_TAGS;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pOld->numOfColumns + ntags + pOld->numOfTags > TSDB_MAX_COLUMNS) {
|
||||
terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ntags; i++) {
|
||||
if (mndFindSuperTableColumnIndex(pOld, pSchemas[i].name) > 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndFindSuperTableTagIndex(pOld, pSchemas[i].name) > 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
pNew->numOfTags = pNew->numOfTags + ntags;
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pNew->pTags + pOld->numOfTags, pSchemas, sizeof(SSchema) * ntags);
|
||||
|
||||
for (int32_t i = pOld->numOfTags; i < pNew->numOfTags; i++) {
|
||||
SSchema *pSchema = &pNew->pTags[i];
|
||||
pSchema->colId = pNew->nextColId;
|
||||
pNew->nextColId++;
|
||||
}
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to add tag %s", pNew->name, pSchemas[0].name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
memmove(pNew->pTags + tag, pNew->pTags + tag + 1, sizeof(SSchema) * (pNew->numOfTags - tag - 1));
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to drop tag %s", pNew->name, tagName);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndModifySuperTableTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName,
|
||||
const char *newTagName) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t len = (int32_t)strlen(newTagName);
|
||||
if (len >= TSDB_COL_NAME_LEN) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSchema *pSchema = (SSchema *)(pNew->pTags + tag);
|
||||
memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN);
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndChangeSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, pSchema->name);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!(pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR)) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSchema *pTag = pNew->pTags + tag;
|
||||
if (pSchema->bytes <= pTag->bytes) {
|
||||
terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTag->bytes = pSchema->bytes;
|
||||
pNew->version++;
|
||||
|
||||
mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pSchema->name, pSchema->bytes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchemas, int32_t ncols) {
|
||||
if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
|
||||
terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ncols; i++) {
|
||||
if (mndFindSuperTableColumnIndex(pOld, pSchemas[i].name) > 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndFindSuperTableTagIndex(pOld, pSchemas[i].name) > 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
pNew->numOfColumns = pNew->numOfColumns + ncols;
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pNew->pColumns + pOld->numOfColumns, pSchemas, sizeof(SSchema) * ncols);
|
||||
|
||||
for (int32_t i = pOld->numOfColumns; i < pNew->numOfColumns; i++) {
|
||||
SSchema *pSchema = &pNew->pColumns[i];
|
||||
pSchema->colId = pNew->nextColId;
|
||||
pNew->nextColId++;
|
||||
}
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to add column %s", pNew->name, pSchemas[0].name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const char *colName) {
|
||||
int32_t col = mndFindSuperTableColumnIndex(pOld, colName);
|
||||
if (col <= 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
memmove(pNew->pColumns + col, pNew->pColumns + col + 1, sizeof(SSchema) * (pNew->numOfColumns - col - 1));
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to drop col %s", pNew->name, colName);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndChangeSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||
int32_t col = mndFindSuperTableColumnIndex(pOld, pSchema->name);
|
||||
if (col < 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!(pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR)) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
uint32_t nLen = 0;
|
||||
for (int32_t i = 0; i < pOld->numOfColumns; ++i) {
|
||||
nLen += (pOld->pColumns[i].colId == col) ? pSchema->bytes : pOld->pColumns[i].bytes;
|
||||
}
|
||||
|
||||
if (nLen > TSDB_MAX_BYTES_PER_ROW) {
|
||||
terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSchema *pCol = pNew->pColumns + col;
|
||||
if (pSchema->bytes <= pCol->bytes) {
|
||||
terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pCol->bytes = pSchema->bytes;
|
||||
pNew->version++;
|
||||
|
||||
mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pSchema->name, pSchema->bytes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStb(const SMAlterStbReq *pAlter, const SStbObj *pOld, SStbObj *pNew) {
|
||||
int32_t code = 0;
|
||||
|
||||
switch (pAlter->alterType) {
|
||||
case TSDB_ALTER_TABLE_ADD_TAG_COLUMN:
|
||||
code = mndAddSuperTableTag(pOld, pNew, pAlter->pSchema, 1);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_TAG_COLUMN:
|
||||
code = mndDropSuperTableTag(pOld, pNew, pAlter->pSchema[0].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN:
|
||||
code = mndModifySuperTableTagName(pOld, pNew, pAlter->pSchema[0].name, pAlter->pSchema[1].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN:
|
||||
code = mndChangeSuperTableTag(pOld, pNew, &pAlter->pSchema[0]);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||
code = mndAddSuperTableColumn(pOld, pNew, pAlter->pSchema, 1);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||
code = mndDropSuperTableColumn(pOld, pNew, pAlter->pSchema[0].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_CHANGE_COLUMN:
|
||||
code = mndChangeSuperTableColumn(pOld, pNew, &pAlter->pSchema[0]);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
break;
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
tfree(pNew->pTags);
|
||||
tfree(pNew->pColumns);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
|
@ -629,9 +910,13 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
|
|||
}
|
||||
|
||||
SStbObj stbObj = {0};
|
||||
taosRLockLatch(&pStb->lock);
|
||||
memcpy(&stbObj, pStb, sizeof(SStbObj));
|
||||
stbObj.pColumns = NULL;
|
||||
stbObj.pTags = NULL;
|
||||
taosRUnLockLatch(&pStb->lock);
|
||||
|
||||
int32_t code = mndUpdateStb(pMnode, pReq, pStb, &stbObj);
|
||||
int32_t code = mndUpdateStb(pAlter, pStb, &stbObj);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
||||
if (code != 0) {
|
||||
|
|
|
@ -236,9 +236,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, "Too many tags")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_ALREAY_EXIST, "Tag already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREAY_EXIST, "Column already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES, "Exceed max row bytes")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREAY_EXIST, "Column already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
|
||||
|
||||
// mnode-func
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FUNC_ALREADY_EXIST, "Func already exists")
|
||||
|
|
Loading…
Reference in New Issue