commit
ebb9051532
|
@ -113,13 +113,13 @@ typedef enum _mgmt_table {
|
|||
|
||||
#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1
|
||||
#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2
|
||||
#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3
|
||||
#define TSDB_ALTER_TABLE_UPDATE_TAG_NAME 3
|
||||
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4
|
||||
|
||||
#define TSDB_ALTER_TABLE_ADD_COLUMN 5
|
||||
#define TSDB_ALTER_TABLE_DROP_COLUMN 6
|
||||
#define TSDB_ALTER_TABLE_CHANGE_COLUMN 7
|
||||
#define TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN 8
|
||||
#define TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES 7
|
||||
#define TSDB_ALTER_TABLE_UPDATE_TAG_BYTES 8
|
||||
|
||||
#define TSDB_FILL_NONE 0
|
||||
#define TSDB_FILL_NULL 1
|
||||
|
@ -254,7 +254,7 @@ typedef struct {
|
|||
int8_t igExists;
|
||||
int32_t numOfTags;
|
||||
int32_t numOfColumns;
|
||||
SSchema pSchema[];
|
||||
SSchema pSchemas[];
|
||||
} SMCreateStbReq;
|
||||
|
||||
typedef struct {
|
||||
|
@ -264,10 +264,10 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t alterType;
|
||||
int32_t numOfColumns;
|
||||
SSchema pSchema[];
|
||||
} SMAlterStbReq;
|
||||
int8_t updateType;
|
||||
int32_t numOfSchemas;
|
||||
SSchema pSchemas[];
|
||||
} SMUpdateStbReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t pid;
|
||||
|
|
|
@ -228,7 +228,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_TOO_MANY_COLUMNS TAOS_DEF_ERROR_CODE(0, 0x03A9)
|
||||
#define TSDB_CODE_MND_COLUMN_ALREAY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AA)
|
||||
#define TSDB_CODE_MND_COLUMN_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03AB)
|
||||
#define TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
|
||||
#define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x03AC)
|
||||
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03AD)
|
||||
|
||||
// mnode-func
|
||||
|
|
|
@ -33,10 +33,10 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
|
|||
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
|
||||
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
|
||||
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessMUpdateStbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp);
|
||||
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp);
|
||||
static int32_t mndProcessVUpdateStbRsp(SMnodeMsg *pRsp);
|
||||
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp);
|
||||
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq);
|
||||
static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||
|
@ -53,10 +53,10 @@ int32_t mndInitStb(SMnode *pMnode) {
|
|||
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMUpdateStbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVUpdateStbRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaReq);
|
||||
|
||||
|
@ -325,7 +325,7 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
|||
pCreate->numOfTags = htonl(pCreate->numOfTags);
|
||||
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
|
||||
for (int32_t i = 0; i < totalCols; ++i) {
|
||||
SSchema *pSchema = &pCreate->pSchema[i];
|
||||
SSchema *pSchema = &pCreate->pSchemas[i];
|
||||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
}
|
||||
|
||||
|
@ -346,7 +346,7 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
|||
|
||||
int32_t maxColId = (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS);
|
||||
for (int32_t i = 0; i < totalCols; ++i) {
|
||||
SSchema *pSchema = &pCreate->pSchema[i];
|
||||
SSchema *pSchema = &pCreate->pSchemas[i];
|
||||
if (pSchema->type < 0) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
|
@ -465,8 +465,8 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
|
||||
static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCreate, SDbObj *pDb) {
|
||||
SStbObj stbObj = {0};
|
||||
tstrncpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
tstrncpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
memcpy(stbObj.name, pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
memcpy(stbObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
stbObj.createdTime = taosGetTimestampMs();
|
||||
stbObj.updateTime = stbObj.createdTime;
|
||||
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
|
@ -477,18 +477,14 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
|
|||
stbObj.numOfTags = pCreate->numOfTags;
|
||||
|
||||
stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema));
|
||||
if (stbObj.pColumns == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy(stbObj.pColumns, pCreate->pSchema, stbObj.numOfColumns * sizeof(SSchema));
|
||||
|
||||
stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema));
|
||||
if (stbObj.pTags == NULL) {
|
||||
if (stbObj.pColumns == NULL || stbObj.pTags == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy(stbObj.pTags, pCreate->pSchema + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema));
|
||||
|
||||
memcpy(stbObj.pColumns, pCreate->pSchemas, stbObj.numOfColumns * sizeof(SSchema));
|
||||
memcpy(stbObj.pTags, pCreate->pSchemas + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema));
|
||||
|
||||
for (int32_t i = 0; i < stbObj.numOfColumns; ++i) {
|
||||
stbObj.pColumns[i].colId = stbObj.nextColId;
|
||||
|
@ -579,11 +575,11 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
|
||||
pAlter->numOfColumns = htonl(pAlter->numOfColumns);
|
||||
static int32_t mndCheckUpdateStbReq(SMUpdateStbReq *pUpdate) {
|
||||
pUpdate->numOfSchemas = htonl(pUpdate->numOfSchemas);
|
||||
|
||||
for (int32_t i = 0; i < pAlter->numOfColumns; ++i) {
|
||||
SSchema *pSchema = &pAlter->pSchema[i];
|
||||
for (int32_t i = 0; i < pUpdate->numOfSchemas; ++i) {
|
||||
SSchema *pSchema = &pUpdate->pSchemas[i];
|
||||
pSchema->colId = htonl(pSchema->colId);
|
||||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
|
||||
|
@ -608,41 +604,405 @@ static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, SStbObj *pOld, SStbObj *pNew) { return 0; }
|
||||
static int32_t mndFindSuperTableTagIndex(const SStbObj *pStb, const char *tagName) {
|
||||
for (int32_t tag = 0; tag < pStb->numOfTags; tag++) {
|
||||
if (strcasecmp(pStb->pTags[tag].name, tagName) == 0) {
|
||||
return tag;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SMAlterStbReq *pAlter = pReq->rpcMsg.pCont;
|
||||
return -1;
|
||||
}
|
||||
|
||||
mDebug("stb:%s, start to alter", pAlter->name);
|
||||
static int32_t mndFindSuperTableColumnIndex(const SStbObj *pStb, const char *colName) {
|
||||
for (int32_t col = 0; col < pStb->numOfColumns; col++) {
|
||||
if (strcasecmp(pStb->pColumns[col].name, colName) == 0) {
|
||||
return col;
|
||||
}
|
||||
}
|
||||
|
||||
if (mndCheckAlterStbReq(pAlter) != 0) {
|
||||
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
|
||||
pNew->pTags = calloc(pNew->numOfTags, sizeof(SSchema));
|
||||
pNew->pColumns = calloc(pNew->numOfColumns, sizeof(SSchema));
|
||||
if (pNew->pTags == NULL || pNew->pColumns == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, pAlter->name);
|
||||
memcpy(pNew->pColumns, pOld->pColumns, sizeof(SSchema) * pOld->numOfColumns);
|
||||
memcpy(pNew->pTags, pOld->pTags, sizeof(SSchema) * pOld->numOfTags);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchemas, int32_t ntags) {
|
||||
if (pOld->numOfTags + ntags > TSDB_MAX_TAGS) {
|
||||
terrno = TSDB_CODE_MND_TOO_MANY_TAGS;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pOld->numOfColumns + ntags + pOld->numOfTags > TSDB_MAX_COLUMNS) {
|
||||
terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ntags; i++) {
|
||||
if (mndFindSuperTableColumnIndex(pOld, pSchemas[i].name) > 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndFindSuperTableTagIndex(pOld, pSchemas[i].name) > 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
pNew->numOfTags = pNew->numOfTags + ntags;
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pNew->pTags + pOld->numOfTags, pSchemas, sizeof(SSchema) * ntags);
|
||||
|
||||
for (int32_t i = pOld->numOfTags; i < pNew->numOfTags; i++) {
|
||||
SSchema *pSchema = &pNew->pTags[i];
|
||||
pSchema->colId = pNew->nextColId;
|
||||
pNew->nextColId++;
|
||||
}
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to add tag %s", pNew->name, pSchemas[0].name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
memmove(pNew->pTags + tag, pNew->pTags + tag + 1, sizeof(SSchema) * (pNew->numOfTags - tag - 1));
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to drop tag %s", pNew->name, tagName);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStbTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName,
|
||||
const char *newTagName) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndFindSuperTableTagIndex(pOld, newTagName) >= 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t len = (int32_t)strlen(newTagName);
|
||||
if (len >= TSDB_COL_NAME_LEN) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSchema *pSchema = (SSchema *)(pNew->pTags + tag);
|
||||
memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN);
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, pSchema->name);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!(pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR)) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSchema *pTag = pNew->pTags + tag;
|
||||
if (pSchema->bytes <= pTag->bytes) {
|
||||
terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTag->bytes = pSchema->bytes;
|
||||
pNew->version++;
|
||||
|
||||
mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pSchema->name, pSchema->bytes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchemas, int32_t ncols) {
|
||||
if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
|
||||
terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < ncols; i++) {
|
||||
if (mndFindSuperTableColumnIndex(pOld, pSchemas[i].name) > 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndFindSuperTableTagIndex(pOld, pSchemas[i].name) > 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_ALREAY_EXIST;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
pNew->numOfColumns = pNew->numOfColumns + ncols;
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy(pNew->pColumns + pOld->numOfColumns, pSchemas, sizeof(SSchema) * ncols);
|
||||
|
||||
for (int32_t i = pOld->numOfColumns; i < pNew->numOfColumns; i++) {
|
||||
SSchema *pSchema = &pNew->pColumns[i];
|
||||
pSchema->colId = pNew->nextColId;
|
||||
pNew->nextColId++;
|
||||
}
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to add column %s", pNew->name, pSchemas[0].name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const char *colName) {
|
||||
int32_t col = mndFindSuperTableColumnIndex(pOld, colName);
|
||||
if (col <= 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
memmove(pNew->pColumns + col, pNew->pColumns + col + 1, sizeof(SSchema) * (pNew->numOfColumns - col - 1));
|
||||
|
||||
pNew->version++;
|
||||
mDebug("stb:%s, start to drop col %s", pNew->name, colName);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||
int32_t col = mndFindSuperTableColumnIndex(pOld, pSchema->name);
|
||||
if (col < 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!(pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR)) {
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
uint32_t nLen = 0;
|
||||
for (int32_t i = 0; i < pOld->numOfColumns; ++i) {
|
||||
nLen += (pOld->pColumns[i].colId == col) ? pSchema->bytes : pOld->pColumns[i].bytes;
|
||||
}
|
||||
|
||||
if (nLen > TSDB_MAX_BYTES_PER_ROW) {
|
||||
terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSchema *pCol = pNew->pColumns + col;
|
||||
if (pSchema->bytes <= pCol->bytes) {
|
||||
terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pCol->bytes = pSchema->bytes;
|
||||
pNew->version++;
|
||||
|
||||
mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pSchema->name, pSchema->bytes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndSetUpdateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetUpdateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
int32_t contLen;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (pVgroup->dbUid != pDb->uid) continue;
|
||||
|
||||
void *pReq = mndBuildCreateStbReq(pMnode, pVgroup, pStb, &contLen);
|
||||
if (pReq == NULL) {
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_CREATE_STB;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return -1;
|
||||
}
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, const SMUpdateStbReq *pUpdate, SDbObj *pDb, SStbObj *pOld) {
|
||||
SStbObj stbObj = {0};
|
||||
taosRLockLatch(&pOld->lock);
|
||||
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
||||
stbObj.pColumns = NULL;
|
||||
stbObj.pTags = NULL;
|
||||
stbObj.updateTime = taosGetTimestampMs();
|
||||
taosRUnLockLatch(&pOld->lock);
|
||||
|
||||
int32_t code = -1;
|
||||
|
||||
switch (pUpdate->updateType) {
|
||||
case TSDB_ALTER_TABLE_ADD_TAG_COLUMN:
|
||||
code = mndAddSuperTableTag(pOld, &stbObj, pUpdate->pSchemas, 1);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_TAG_COLUMN:
|
||||
code = mndDropSuperTableTag(pOld, &stbObj, pUpdate->pSchemas[0].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
|
||||
code = mndUpdateStbTagName(pOld, &stbObj, pUpdate->pSchemas[0].name, pUpdate->pSchemas[1].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
|
||||
code = mndUpdateStbTagBytes(pOld, &stbObj, &pUpdate->pSchemas[0]);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||
code = mndAddSuperTableColumn(pOld, &stbObj, pUpdate->pSchemas, 1);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||
code = mndDropSuperTableColumn(pOld, &stbObj, pUpdate->pSchemas[0].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||
code = mndUpdateStbColumnBytes(pOld, &stbObj, &pUpdate->pSchemas[0]);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
break;
|
||||
}
|
||||
|
||||
if (code != 0) goto UPDATE_STB_OVER;
|
||||
|
||||
code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto UPDATE_STB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to update stb:%s", pTrans->id, pUpdate->name);
|
||||
|
||||
if (mndSetUpdateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndSetUpdateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndSetUpdateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_STB_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
UPDATE_STB_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
tfree(stbObj.pTags);
|
||||
tfree(stbObj.pColumns);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessMUpdateStbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SMUpdateStbReq *pUpdate = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("stb:%s, start to update", pUpdate->name);
|
||||
|
||||
if (mndCheckUpdateStbReq(pUpdate) != 0) {
|
||||
mError("stb:%s, failed to update since %s", pUpdate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, pUpdate->name);
|
||||
if (pStb == NULL) {
|
||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||
mError("stb:%s, failed to update since %s", pUpdate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SStbObj stbObj = {0};
|
||||
memcpy(&stbObj, pStb, sizeof(SStbObj));
|
||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, pUpdate->name);
|
||||
if (pDb == NULL) {
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
mError("stb:%s, failed to update since %s", pUpdate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndUpdateStb(pMnode, pReq, pStb, &stbObj);
|
||||
int32_t code = mndUpdateStb(pMnode, pReq, pUpdate, pDb, pStb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
||||
if (code != 0) {
|
||||
mError("stb:%s, failed to alter since %s", pAlter->name, tstrerror(code));
|
||||
mError("stb:%s, failed to update since %s", pUpdate->name, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) {
|
||||
static int32_t mndProcessVUpdateStbRsp(SMnodeMsg *pRsp) {
|
||||
mndTransProcessRsp(pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -67,35 +67,35 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
|||
pReq->numOfColumns = htonl(cols);
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchema[0];
|
||||
SSchema* pSchema = &pReq->pSchemas[0];
|
||||
pSchema->bytes = htonl(8);
|
||||
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema->name, "ts");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchema[1];
|
||||
SSchema* pSchema = &pReq->pSchemas[1];
|
||||
pSchema->bytes = htonl(4);
|
||||
pSchema->type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema->name, "col1");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchema[2];
|
||||
SSchema* pSchema = &pReq->pSchemas[2];
|
||||
pSchema->bytes = htonl(2);
|
||||
pSchema->type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(pSchema->name, "tag1");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchema[3];
|
||||
SSchema* pSchema = &pReq->pSchemas[3];
|
||||
pSchema->bytes = htonl(8);
|
||||
pSchema->type = TSDB_DATA_TYPE_BIGINT;
|
||||
strcpy(pSchema->name, "tag2");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchema[4];
|
||||
SSchema* pSchema = &pReq->pSchemas[4];
|
||||
pSchema->bytes = htonl(16);
|
||||
pSchema->type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema->name, "tag3");
|
||||
|
|
|
@ -813,7 +813,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
|
|||
|
||||
cmd ::= ALTER TABLE ids(X) cpxName(F) MODIFY COLUMN columnlist(A). {
|
||||
X.n += F.n;
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, -1);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, -1);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
|
||||
|
@ -842,7 +842,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
|
|||
toTSDBType(Z.type);
|
||||
A = tListItemAppendToken(A, &Z, -1);
|
||||
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, -1);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, -1);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
|
||||
|
@ -859,7 +859,7 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
|
|||
|
||||
cmd ::= ALTER TABLE ids(X) cpxName(F) MODIFY TAG columnlist(A). {
|
||||
X.n += F.n;
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, -1);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, -1);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
|
||||
|
@ -882,7 +882,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
|
|||
|
||||
cmd ::= ALTER STABLE ids(X) cpxName(F) MODIFY COLUMN columnlist(A). {
|
||||
X.n += F.n;
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, TSDB_SUPER_TABLE);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, TSDB_SUPER_TABLE);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
|
||||
|
@ -911,7 +911,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
|
|||
toTSDBType(Z.type);
|
||||
A = tListItemAppendToken(A, &Z, -1);
|
||||
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, TSDB_SUPER_TABLE);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TSDB_SUPER_TABLE);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
|
||||
|
@ -928,7 +928,7 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
|
|||
|
||||
cmd ::= ALTER STABLE ids(X) cpxName(F) MODIFY TAG columnlist(A). {
|
||||
X.n += F.n;
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, TSDB_SUPER_TABLE);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, TSDB_SUPER_TABLE);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
|
||||
|
|
|
@ -610,7 +610,7 @@ SAlterTableInfo *tSetAlterTableInfo(SToken *pTableName, SArray *pCols, SArray *p
|
|||
pAlterTable->type = type;
|
||||
pAlterTable->tableType = tableType;
|
||||
|
||||
if (type == TSDB_ALTER_TABLE_ADD_COLUMN || type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || type == TSDB_ALTER_TABLE_CHANGE_COLUMN || type == TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN) {
|
||||
if (type == TSDB_ALTER_TABLE_ADD_COLUMN || type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN || type == TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES || type == TSDB_ALTER_TABLE_UPDATE_TAG_BYTES) {
|
||||
pAlterTable->pAddColumns = pCols;
|
||||
assert(pVals == NULL);
|
||||
} else {
|
||||
|
|
|
@ -310,7 +310,7 @@ SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len
|
|||
pCreateStbMsg->numOfColumns = htonl(numOfCols);
|
||||
pCreateStbMsg->numOfTags = htonl(numOfTags);
|
||||
|
||||
pSchema = (SSchema*)pCreateStbMsg->pSchema;
|
||||
pSchema = (SSchema*)pCreateStbMsg->pSchemas;
|
||||
for (int i = 0; i < numOfCols; ++i) {
|
||||
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i);
|
||||
pSchema->type = pField->type;
|
||||
|
|
|
@ -3199,7 +3199,7 @@ static void yy_reduce(
|
|||
case 285: /* cmd ::= ALTER TABLE ids cpxName MODIFY COLUMN columnlist */
|
||||
{
|
||||
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, -1);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, -1);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
break;
|
||||
|
@ -3231,7 +3231,7 @@ static void yy_reduce(
|
|||
toTSDBType(yymsp[0].minor.yy0.type);
|
||||
A = tListItemAppendToken(A, &yymsp[0].minor.yy0, -1);
|
||||
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, -1);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, -1);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
break;
|
||||
|
@ -3250,7 +3250,7 @@ static void yy_reduce(
|
|||
case 290: /* cmd ::= ALTER TABLE ids cpxName MODIFY TAG columnlist */
|
||||
{
|
||||
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, -1);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, -1);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
break;
|
||||
|
@ -3275,7 +3275,7 @@ static void yy_reduce(
|
|||
case 293: /* cmd ::= ALTER STABLE ids cpxName MODIFY COLUMN columnlist */
|
||||
{
|
||||
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, TSDB_SUPER_TABLE);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, TSDB_SUPER_TABLE);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
break;
|
||||
|
@ -3307,7 +3307,7 @@ static void yy_reduce(
|
|||
toTSDBType(yymsp[0].minor.yy0.type);
|
||||
A = tListItemAppendToken(A, &yymsp[0].minor.yy0, -1);
|
||||
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN, TSDB_SUPER_TABLE);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-5].minor.yy0, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, TSDB_SUPER_TABLE);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
break;
|
||||
|
@ -3326,7 +3326,7 @@ static void yy_reduce(
|
|||
case 298: /* cmd ::= ALTER STABLE ids cpxName MODIFY TAG columnlist */
|
||||
{
|
||||
yymsp[-4].minor.yy0.n += yymsp[-3].minor.yy0.n;
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, TSDB_SUPER_TABLE);
|
||||
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&yymsp[-4].minor.yy0, yymsp[0].minor.yy165, NULL, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, TSDB_SUPER_TABLE);
|
||||
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -238,7 +238,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TAG_NOT_EXIST, "Tag does not exist")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_COLUMNS, "Too many columns")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREAY_EXIST, "Column already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_EXCEED_MAX_ROW_BYTES, "Exceed max row bytes")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes")
|
||||
|
||||
// mnode-func
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FUNC_ALREADY_EXIST, "Func already exists")
|
||||
|
|
Loading…
Reference in New Issue