create stb
This commit is contained in:
parent
20489c9a38
commit
7b942097c1
|
@ -249,14 +249,24 @@ typedef struct SSchema {
|
||||||
char name[TSDB_COL_NAME_LEN];
|
char name[TSDB_COL_NAME_LEN];
|
||||||
} SSchema;
|
} SSchema;
|
||||||
|
|
||||||
|
typedef struct SField {
|
||||||
|
char name[TSDB_COL_NAME_LEN];
|
||||||
|
uint8_t type;
|
||||||
|
int32_t bytes;
|
||||||
|
} SField;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
int32_t numOfTags;
|
|
||||||
int32_t numOfColumns;
|
int32_t numOfColumns;
|
||||||
SSchema pSchemas[];
|
int32_t numOfTags;
|
||||||
|
SArray* pColumns;
|
||||||
|
SArray* pTags;
|
||||||
} SMCreateStbReq;
|
} SMCreateStbReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSMCreateStbReq(void** buf, SMCreateStbReq* pReq);
|
||||||
|
void* tDeserializeSMCreateStbReq(void* buf, SMCreateStbReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t igNotExists;
|
int8_t igNotExists;
|
||||||
|
|
|
@ -37,12 +37,6 @@ typedef struct SQueryNode {
|
||||||
|
|
||||||
#define queryNodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type)
|
#define queryNodeType(nodeptr) (((const SQueryNode*)(nodeptr))->type)
|
||||||
|
|
||||||
typedef struct SField {
|
|
||||||
char name[TSDB_COL_NAME_LEN];
|
|
||||||
uint8_t type;
|
|
||||||
int32_t bytes;
|
|
||||||
} SField;
|
|
||||||
|
|
||||||
typedef struct SFieldInfo {
|
typedef struct SFieldInfo {
|
||||||
int16_t numOfOutput; // number of column in result
|
int16_t numOfOutput; // number of column in result
|
||||||
SField *final;
|
SField *final;
|
||||||
|
|
|
@ -336,3 +336,66 @@ void *tDeserializeSVDropTbReq(void *buf, SVDropTbReq *pReq) {
|
||||||
buf = taosDecodeFixedU8(buf, &pReq->type);
|
buf = taosDecodeFixedU8(buf, &pReq->type);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSMCreateStbReq(void **buf, SMCreateStbReq *pReq) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
|
||||||
|
tlen += taosEncodeString(buf, pReq->name);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pReq->igExists);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pReq->numOfColumns);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pReq->numOfTags);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
|
||||||
|
SField *pField = taosArrayGet(pReq->pColumns, i);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pField->type);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pField->bytes);
|
||||||
|
tlen += taosEncodeString(buf, pField->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
|
||||||
|
SField *pField = taosArrayGet(pReq->pTags, i);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pField->type);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pField->bytes);
|
||||||
|
tlen += taosEncodeString(buf, pField->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *tDeserializeSMCreateStbReq(void *buf, SMCreateStbReq *pReq) {
|
||||||
|
buf = taosDecodeStringTo(buf, pReq->name);
|
||||||
|
buf = taosDecodeFixedI8(buf, &pReq->igExists);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pReq->numOfColumns);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pReq->numOfTags);
|
||||||
|
|
||||||
|
pReq->pColumns = taosArrayInit(pReq->numOfColumns, sizeof(SField));
|
||||||
|
pReq->pTags = taosArrayInit(pReq->numOfTags, sizeof(SField));
|
||||||
|
if (pReq->pColumns == NULL || pReq->pTags == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pReq->numOfColumns; ++i) {
|
||||||
|
SField field = {0};
|
||||||
|
buf = taosDecodeFixedI8(buf, &field.type);
|
||||||
|
buf = taosDecodeFixedI32(buf, &field.bytes);
|
||||||
|
buf = taosDecodeStringTo(buf, field.name);
|
||||||
|
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
|
||||||
|
SField field = {0};
|
||||||
|
buf = taosDecodeFixedI8(buf, &field.type);
|
||||||
|
buf = taosDecodeFixedI32(buf, &field.bytes);
|
||||||
|
buf = taosDecodeStringTo(buf, field.name);
|
||||||
|
if (taosArrayPush(pReq->pTags, &field) == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
|
@ -323,14 +323,6 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
||||||
pCreate->numOfColumns = htonl(pCreate->numOfColumns);
|
|
||||||
pCreate->numOfTags = htonl(pCreate->numOfTags);
|
|
||||||
int32_t totalCols = pCreate->numOfColumns + pCreate->numOfTags;
|
|
||||||
for (int32_t i = 0; i < totalCols; ++i) {
|
|
||||||
SSchema *pSchema = &pCreate->pSchemas[i];
|
|
||||||
pSchema->bytes = htonl(pSchema->bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pCreate->igExists < 0 || pCreate->igExists > 1) {
|
if (pCreate->igExists < 0 || pCreate->igExists > 1) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -346,23 +338,39 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchema *pSchema = &pCreate->pSchemas[0];
|
SField *pField = taosArrayGet(pCreate->pColumns, 0) ;
|
||||||
if (pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < totalCols; ++i) {
|
for (int32_t i = 0; i < pCreate->numOfColumns; ++i) {
|
||||||
SSchema *pSchema = &pCreate->pSchemas[i];
|
SField *pField = taosArrayGet(pCreate->pColumns, i);
|
||||||
if (pSchema->type < 0) {
|
if (pField->type < 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pSchema->bytes <= 0) {
|
if (pField->bytes <= 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pSchema->name[0] == 0) {
|
if (pField->name[0] == 0) {
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pCreate->numOfTags; ++i) {
|
||||||
|
SField *pField = taosArrayGet(pCreate->pTags, i);
|
||||||
|
if (pField->type < 0) {
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pField->bytes <= 0) {
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (pField->name[0] == 0) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -490,16 +498,23 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(stbObj.pColumns, pCreate->pSchemas, stbObj.numOfColumns * sizeof(SSchema));
|
|
||||||
memcpy(stbObj.pTags, pCreate->pSchemas + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema));
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < stbObj.numOfColumns; ++i) {
|
for (int32_t i = 0; i < stbObj.numOfColumns; ++i) {
|
||||||
stbObj.pColumns[i].colId = stbObj.nextColId;
|
SField *pField = taosArrayGet(pCreate->pColumns, i);
|
||||||
|
SSchema *pSchema = &stbObj.pColumns[i];
|
||||||
|
pSchema->type = pField->type;
|
||||||
|
pSchema->bytes = pField->bytes;
|
||||||
|
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
||||||
|
pSchema->colId = stbObj.nextColId;
|
||||||
stbObj.nextColId++;
|
stbObj.nextColId++;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < stbObj.numOfTags; ++i) {
|
for (int32_t i = 0; i < stbObj.numOfTags; ++i) {
|
||||||
stbObj.pTags[i].colId = stbObj.nextColId;
|
SField *pField = taosArrayGet(pCreate->pTags, i);
|
||||||
|
SSchema *pSchema = &stbObj.pTags[i];
|
||||||
|
pSchema->type = pField->type;
|
||||||
|
pSchema->bytes = pField->bytes;
|
||||||
|
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
||||||
|
pSchema->colId = stbObj.nextColId;
|
||||||
stbObj.nextColId++;
|
stbObj.nextColId++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -524,57 +539,60 @@ CREATE_STB_OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
|
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
SMCreateStbReq *pCreate = pReq->rpcMsg.pCont;
|
int32_t code = -1;
|
||||||
|
SStbObj *pTopicStb = NULL;
|
||||||
|
SStbObj *pStb = NULL;
|
||||||
|
SDbObj *pDb = NULL;
|
||||||
|
SMCreateStbReq createReq = {0};
|
||||||
|
|
||||||
mDebug("stb:%s, start to create", pCreate->name);
|
if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, &createReq) == NULL) goto CREATE_STB_OVER;
|
||||||
|
|
||||||
if (mndCheckCreateStbReq(pCreate) != 0) {
|
mDebug("stb:%s, start to create", createReq.name);
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
if (mndCheckCreateStbReq(&createReq) != 0) goto CREATE_STB_OVER;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name);
|
pStb = mndAcquireStb(pMnode, createReq.name);
|
||||||
if (pStb != NULL) {
|
if (pStb != NULL) {
|
||||||
mndReleaseStb(pMnode, pStb);
|
if (createReq.igExists) {
|
||||||
if (pCreate->igExists) {
|
mDebug("stb:%s, already exist, ignore exist is set", createReq.name);
|
||||||
mDebug("stb:%s, already exist, ignore exist is set", pCreate->name);
|
code = 0;
|
||||||
return 0;
|
goto CREATE_STB_OVER;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_MND_STB_ALREADY_EXIST;
|
terrno = TSDB_CODE_MND_STB_ALREADY_EXIST;
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
goto CREATE_STB_OVER;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
goto CREATE_STB_OVER;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// topic should have different name with stb
|
pTopicStb = mndAcquireStb(pMnode, createReq.name);
|
||||||
SStbObj *pTopicStb = mndAcquireStb(pMnode, pCreate->name);
|
|
||||||
if (pTopicStb != NULL) {
|
if (pTopicStb != NULL) {
|
||||||
mndReleaseStb(pMnode, pTopicStb);
|
|
||||||
terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC;
|
terrno = TSDB_CODE_MND_NAME_CONFLICT_WITH_TOPIC;
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
goto CREATE_STB_OVER;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, pCreate->name);
|
pDb = mndAcquireDbByStb(pMnode, createReq.name);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
goto CREATE_STB_OVER;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = mndCreateStb(pMnode, pReq, pCreate, pDb);
|
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
|
|
||||||
|
CREATE_STB_OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("stb:%s, failed to create since %s", createReq.name, terrstr());
|
||||||
return -1;
|
} else {
|
||||||
|
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
mndReleaseStb(pMnode, pTopicStb);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
taosArrayClear(createReq.pColumns);
|
||||||
|
taosArrayClear(createReq.pTags);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
|
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
|
||||||
|
|
|
@ -22,19 +22,19 @@ class MndTestStb : public ::testing::Test {
|
||||||
void SetUp() override {}
|
void SetUp() override {}
|
||||||
void TearDown() override {}
|
void TearDown() override {}
|
||||||
|
|
||||||
SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen);
|
SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen);
|
||||||
SDropDbReq* BuildDropDbReq(const char* dbname, int32_t* pContLen);
|
SDropDbReq* BuildDropDbReq(const char* dbname, int32_t* pContLen);
|
||||||
SMCreateStbReq* BuildCreateStbReq(const char* stbname, int32_t* pContLen);
|
void* BuildCreateStbReq(const char* stbname, int32_t* pContLen);
|
||||||
SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen);
|
SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen);
|
||||||
SMAltertbReq* BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen);
|
SMAltertbReq* BuildAlterStbDropTagReq(const char* stbname, const char* tagname, int32_t* pContLen);
|
||||||
SMAltertbReq* BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname,
|
SMAltertbReq* BuildAlterStbUpdateTagNameReq(const char* stbname, const char* tagname, const char* newtagname,
|
||||||
int32_t* pContLen);
|
int32_t* pContLen);
|
||||||
SMAltertbReq* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes,
|
SMAltertbReq* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes,
|
||||||
int32_t* pContLen);
|
int32_t* pContLen);
|
||||||
SMAltertbReq* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
SMAltertbReq* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
||||||
SMAltertbReq* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
SMAltertbReq* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
||||||
SMAltertbReq* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
|
SMAltertbReq* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
|
||||||
int32_t* pContLen);
|
int32_t* pContLen);
|
||||||
};
|
};
|
||||||
|
|
||||||
Testbase MndTestStb::test;
|
Testbase MndTestStb::test;
|
||||||
|
@ -78,53 +78,62 @@ SDropDbReq* MndTestStb::BuildDropDbReq(const char* dbname, int32_t* pContLen) {
|
||||||
return pReq;
|
return pReq;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMCreateStbReq* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
|
void* MndTestStb::BuildCreateStbReq(const char* stbname, int32_t* pContLen) {
|
||||||
int32_t cols = 2;
|
SMCreateStbReq createReq = {0};
|
||||||
int32_t tags = 3;
|
createReq.numOfColumns = 2;
|
||||||
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq);
|
createReq.numOfTags = 3;
|
||||||
|
createReq.igExists = 0;
|
||||||
SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen);
|
createReq.pColumns = taosArrayInit(createReq.numOfColumns, sizeof(SField));
|
||||||
strcpy(pReq->name, stbname);
|
createReq.pTags = taosArrayInit(createReq.numOfTags, sizeof(SField));
|
||||||
pReq->numOfTags = htonl(tags);
|
strcpy(createReq.name, stbname);
|
||||||
pReq->numOfColumns = htonl(cols);
|
|
||||||
|
|
||||||
{
|
{
|
||||||
SSchema* pSchema = &pReq->pSchemas[0];
|
SField field = {0};
|
||||||
pSchema->bytes = htonl(8);
|
field.bytes = 8;
|
||||||
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
|
field.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
strcpy(pSchema->name, "ts");
|
strcpy(field.name, "ts");
|
||||||
|
taosArrayPush(createReq.pColumns, &field);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SSchema* pSchema = &pReq->pSchemas[1];
|
SField field = {0};
|
||||||
pSchema->bytes = htonl(12);
|
field.bytes = 12;
|
||||||
pSchema->type = TSDB_DATA_TYPE_BINARY;
|
field.type = TSDB_DATA_TYPE_BINARY;
|
||||||
strcpy(pSchema->name, "col1");
|
strcpy(field.name, "col1");
|
||||||
|
taosArrayPush(createReq.pColumns, &field);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SSchema* pSchema = &pReq->pSchemas[2];
|
SField field = {0};
|
||||||
pSchema->bytes = htonl(2);
|
field.bytes = 2;
|
||||||
pSchema->type = TSDB_DATA_TYPE_TINYINT;
|
field.type = TSDB_DATA_TYPE_TINYINT;
|
||||||
strcpy(pSchema->name, "tag1");
|
strcpy(field.name, "tag1");
|
||||||
|
taosArrayPush(createReq.pTags, &field);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SSchema* pSchema = &pReq->pSchemas[3];
|
SField field = {0};
|
||||||
pSchema->bytes = htonl(8);
|
field.bytes = 8;
|
||||||
pSchema->type = TSDB_DATA_TYPE_BIGINT;
|
field.type = TSDB_DATA_TYPE_BIGINT;
|
||||||
strcpy(pSchema->name, "tag2");
|
strcpy(field.name, "tag2");
|
||||||
|
taosArrayPush(createReq.pTags, &field);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SSchema* pSchema = &pReq->pSchemas[4];
|
SField field = {0};
|
||||||
pSchema->bytes = htonl(16);
|
field.bytes = 16;
|
||||||
pSchema->type = TSDB_DATA_TYPE_BINARY;
|
field.type = TSDB_DATA_TYPE_BINARY;
|
||||||
strcpy(pSchema->name, "tag3");
|
strcpy(field.name, "tag3");
|
||||||
|
taosArrayPush(createReq.pTags, &field);
|
||||||
}
|
}
|
||||||
|
|
||||||
*pContLen = contLen;
|
int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq);
|
||||||
return pReq;
|
void* pHead = rpcMallocCont(tlen);
|
||||||
|
|
||||||
|
void* pBuf = pHead;
|
||||||
|
tSerializeSMCreateStbReq(&pBuf, &createReq);
|
||||||
|
*pContLen = tlen;
|
||||||
|
return pHead;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMAltertbReq* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
|
SMAltertbReq* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, const char* tagname, int32_t* pContLen) {
|
||||||
|
@ -260,9 +269,9 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
void* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
ASSERT_NE(pRsp, nullptr);
|
ASSERT_NE(pRsp, nullptr);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
|
@ -418,8 +427,8 @@ TEST_F(MndTestStb, 02_Alter_Stb_AddTag) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
void* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
ASSERT_NE(pRsp, nullptr);
|
ASSERT_NE(pRsp, nullptr);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
|
@ -483,8 +492,8 @@ TEST_F(MndTestStb, 03_Alter_Stb_DropTag) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
void* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -529,8 +538,8 @@ TEST_F(MndTestStb, 04_Alter_Stb_AlterTagName) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
void* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -598,8 +607,8 @@ TEST_F(MndTestStb, 05_Alter_Stb_AlterTagBytes) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
void* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -656,8 +665,8 @@ TEST_F(MndTestStb, 06_Alter_Stb_AddColumn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
void* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
ASSERT_NE(pRsp, nullptr);
|
ASSERT_NE(pRsp, nullptr);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
|
@ -721,8 +730,8 @@ TEST_F(MndTestStb, 07_Alter_Stb_DropColumn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
void* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -786,8 +795,8 @@ TEST_F(MndTestStb, 08_Alter_Stb_AlterTagBytes) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
void* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -249,92 +249,35 @@ SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx
|
||||||
return pCreateMsg;
|
return pCreateMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
|
SMCreateStbReq* buildCreateStbMsg(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx,
|
||||||
SSchema* pSchema;
|
SMsgBuf* pMsgBuf) {
|
||||||
|
SMCreateStbReq createReq = {0};
|
||||||
|
createReq.igExists = pCreateTableSql->existCheck ? 1 : 0;
|
||||||
|
createReq.pColumns = pCreateTableSql->colInfo.pColumns;
|
||||||
|
createReq.pTags = pCreateTableSql->colInfo.pTagColumns;
|
||||||
|
createReq.numOfColumns = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pColumns);
|
||||||
|
createReq.numOfTags = (int32_t)taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
|
||||||
|
|
||||||
int32_t numOfTags = 0;
|
SName n = {0};
|
||||||
int32_t numOfCols = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pColumns);
|
if (createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf) != 0) {
|
||||||
if (pCreateTableSql->colInfo.pTagColumns != NULL) {
|
|
||||||
numOfTags = (int32_t) taosArrayGetSize(pCreateTableSql->colInfo.pTagColumns);
|
|
||||||
}
|
|
||||||
|
|
||||||
SMCreateStbReq* pCreateStbMsg = (SMCreateStbReq*)calloc(1, sizeof(SMCreateStbReq) + (numOfCols + numOfTags) * sizeof(SSchema));
|
|
||||||
if (pCreateStbMsg == NULL) {
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* pMsg = NULL;
|
if (tNameExtractFullName(&n, createReq.name) != 0) {
|
||||||
#if 0
|
buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified");
|
||||||
int32_t tableType = pCreateTableSql->type;
|
return NULL;
|
||||||
if (tableType != TSQL_CREATE_TABLE && tableType != TSQL_CREATE_STABLE) { // create by using super table, tags value
|
}
|
||||||
SArray* list = pInfo->pCreateTableInfo->childTableInfo;
|
|
||||||
|
|
||||||
int32_t numOfTables = (int32_t)taosArrayGetSize(list);
|
int32_t tlen = tSerializeSMCreateStbReq(NULL, &createReq);
|
||||||
pCreateStbMsg->numOfTables = htonl(numOfTables);
|
void* req = malloc(tlen);
|
||||||
|
if (req == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pMsg = (char*)pCreateMsg;
|
void *buf = req;
|
||||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
tSerializeSMCreateStbReq(&buf, &createReq);
|
||||||
SCreateTableMsg* pCreate = (SCreateTableMsg*)pMsg;
|
return req;
|
||||||
|
|
||||||
pCreate->numOfColumns = htons(pCmd->numOfCols);
|
|
||||||
pCreate->numOfTags = htons(pCmd->count);
|
|
||||||
pMsg += sizeof(SCreateTableMsg);
|
|
||||||
|
|
||||||
SCreatedTableInfo* p = taosArrayGet(list, i);
|
|
||||||
strcpy(pCreate->tableName, p->fullname);
|
|
||||||
pCreate->igExists = (p->igExist) ? 1 : 0;
|
|
||||||
|
|
||||||
// use dbinfo from table id without modifying current db info
|
|
||||||
pMsg = serializeTagData(&p->tagdata, pMsg);
|
|
||||||
|
|
||||||
int32_t len = (int32_t)(pMsg - (char*)pCreate);
|
|
||||||
pCreate->len = htonl(len);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
#endif
|
|
||||||
// create (super) table
|
|
||||||
SName n = {0};
|
|
||||||
int32_t code = createSName(&n, &pCreateTableSql->name, pParseCtx, pMsgBuf);
|
|
||||||
if (code != 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tNameExtractFullName(&n, pCreateStbMsg->name);
|
|
||||||
if (code != 0) {
|
|
||||||
buildInvalidOperationMsg(pMsgBuf, "invalid table name or database not specified");
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCreateStbMsg->igExists = pCreateTableSql->existCheck ? 1 : 0;
|
|
||||||
pCreateStbMsg->numOfColumns = htonl(numOfCols);
|
|
||||||
pCreateStbMsg->numOfTags = htonl(numOfTags);
|
|
||||||
|
|
||||||
pSchema = (SSchema*)pCreateStbMsg->pSchemas;
|
|
||||||
for (int i = 0; i < numOfCols; ++i) {
|
|
||||||
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pColumns, i);
|
|
||||||
pSchema->type = pField->type;
|
|
||||||
pSchema->bytes = htonl(pField->bytes);
|
|
||||||
strcpy(pSchema->name, pField->name);
|
|
||||||
|
|
||||||
pSchema++;
|
|
||||||
}
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfTags; ++i) {
|
|
||||||
SField* pField = taosArrayGet(pCreateTableSql->colInfo.pTagColumns, i);
|
|
||||||
pSchema->type = pField->type;
|
|
||||||
pSchema->bytes = htonl(pField->bytes);
|
|
||||||
strcpy(pSchema->name, pField->name);
|
|
||||||
|
|
||||||
pSchema++;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMsg = (char*)pSchema;
|
|
||||||
|
|
||||||
int32_t msgLen = (int32_t)(pMsg - (char*)pCreateStbMsg);
|
|
||||||
*len = msgLen;
|
|
||||||
|
|
||||||
return pCreateStbMsg;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
|
SMDropStbReq* buildDropStableMsg(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
|
||||||
|
|
|
@ -204,7 +204,12 @@ void* taosArrayGetLast(const SArray* pArray) {
|
||||||
return TARRAY_GET_ELEM(pArray, pArray->size - 1);
|
return TARRAY_GET_ELEM(pArray, pArray->size - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; }
|
size_t taosArrayGetSize(const SArray* pArray) {
|
||||||
|
if (pArray == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return pArray->size;
|
||||||
|
}
|
||||||
|
|
||||||
void taosArraySetSize(SArray* pArray, size_t size) {
|
void taosArraySetSize(SArray* pArray, size_t size) {
|
||||||
assert(size <= pArray->capacity);
|
assert(size <= pArray->capacity);
|
||||||
|
@ -296,7 +301,7 @@ SArray* taosArrayDup(const SArray* pSrc) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArrayClear(SArray* pArray) {
|
void taosArrayClear(SArray* pArray) {
|
||||||
assert( pArray != NULL );
|
if (pArray == NULL) return;
|
||||||
pArray->size = 0;
|
pArray->size = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue