update stb
This commit is contained in:
parent
b5167ceaf1
commit
604ff6c430
|
@ -264,10 +264,10 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char name[TSDB_TABLE_FNAME_LEN];
|
||||
int8_t updateType;
|
||||
int8_t alterType;
|
||||
int32_t numOfSchemas;
|
||||
SSchema pSchemas[];
|
||||
} SMUpdateStbReq;
|
||||
} SMAltertbReq;
|
||||
|
||||
typedef struct {
|
||||
int32_t pid;
|
||||
|
|
|
@ -33,10 +33,10 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
|
|||
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
|
||||
static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
|
||||
static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessMUpdateStbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq);
|
||||
static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp);
|
||||
static int32_t mndProcessVUpdateStbRsp(SMnodeMsg *pRsp);
|
||||
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp);
|
||||
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp);
|
||||
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq);
|
||||
static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||
|
@ -53,10 +53,10 @@ int32_t mndInitStb(SMnode *pMnode) {
|
|||
.deleteFp = (SdbDeleteFp)mndStbActionDelete};
|
||||
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMUpdateStbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVUpdateStbRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STB_META, mndProcessStbMetaReq);
|
||||
|
||||
|
@ -193,6 +193,8 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
|
|||
|
||||
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
||||
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
|
||||
tfree(pStb->pColumns);
|
||||
tfree(pStb->pTags);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -202,10 +204,10 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
|||
taosWLockLatch(&pOld->lock);
|
||||
|
||||
if (pOld->numOfColumns < pNew->numOfColumns) {
|
||||
void *pSchema = malloc(pOld->numOfColumns * sizeof(SSchema));
|
||||
if (pSchema != NULL) {
|
||||
void *pColumns = malloc(pNew->numOfColumns * sizeof(SSchema));
|
||||
if (pColumns != NULL) {
|
||||
free(pOld->pColumns);
|
||||
pOld->pColumns = pSchema;
|
||||
pOld->pColumns = pColumns;
|
||||
} else {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
||||
|
@ -214,10 +216,10 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
|||
}
|
||||
|
||||
if (pOld->numOfTags < pNew->numOfTags) {
|
||||
void *pSchema = malloc(pOld->numOfTags * sizeof(SSchema));
|
||||
if (pSchema != NULL) {
|
||||
void *pTags = malloc(pNew->numOfTags * sizeof(SSchema));
|
||||
if (pTags != NULL) {
|
||||
free(pOld->pTags);
|
||||
pOld->pTags = pSchema;
|
||||
pOld->pTags = pTags;
|
||||
} else {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
|
||||
|
@ -575,11 +577,11 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCheckUpdateStbReq(SMUpdateStbReq *pUpdate) {
|
||||
pUpdate->numOfSchemas = htonl(pUpdate->numOfSchemas);
|
||||
static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) {
|
||||
pAlter->numOfSchemas = htonl(pAlter->numOfSchemas);
|
||||
|
||||
for (int32_t i = 0; i < pUpdate->numOfSchemas; ++i) {
|
||||
SSchema *pSchema = &pUpdate->pSchemas[i];
|
||||
for (int32_t i = 0; i < pAlter->numOfSchemas; ++i) {
|
||||
SSchema *pSchema = &pAlter->pSchemas[i];
|
||||
pSchema->colId = htonl(pSchema->colId);
|
||||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
|
||||
|
@ -696,8 +698,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStbTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName,
|
||||
const char *newTagName) {
|
||||
static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName, const char *newTagName) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
|
@ -727,7 +728,7 @@ static int32_t mndUpdateStbTagName(const SStbObj *pOld, SStbObj *pNew, const cha
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||
static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, pSchema->name);
|
||||
if (tag < 0) {
|
||||
terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
|
||||
|
@ -810,7 +811,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||
static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) {
|
||||
int32_t col = mndFindSuperTableColumnIndex(pOld, pSchema->name);
|
||||
if (col < 0) {
|
||||
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
|
||||
|
@ -849,17 +850,16 @@ static int32_t mndUpdateStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndSetUpdateStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
static int32_t mndSetAlterStbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetUpdateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdbRaw *pCommitRaw = mndStbActionEncode(pStb);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
|
@ -868,8 +868,7 @@ static int32_t mndSetUpdateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
|
@ -892,7 +891,7 @@ static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_VND_CREATE_STB;
|
||||
action.msgType = TDMT_VND_ALTER_STB;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
free(pReq);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
|
@ -905,7 +904,7 @@ static int32_t mndSetUpdateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, const SMUpdateStbReq *pUpdate, SDbObj *pDb, SStbObj *pOld) {
|
||||
static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *pAlter, SDbObj *pDb, SStbObj *pOld) {
|
||||
SStbObj stbObj = {0};
|
||||
taosRLockLatch(&pOld->lock);
|
||||
memcpy(&stbObj, pOld, sizeof(SStbObj));
|
||||
|
@ -916,93 +915,93 @@ static int32_t mndUpdateStb(SMnode *pMnode, SMnodeMsg *pReq, const SMUpdateStbRe
|
|||
|
||||
int32_t code = -1;
|
||||
|
||||
switch (pUpdate->updateType) {
|
||||
switch (pAlter->alterType) {
|
||||
case TSDB_ALTER_TABLE_ADD_TAG_COLUMN:
|
||||
code = mndAddSuperTableTag(pOld, &stbObj, pUpdate->pSchemas, 1);
|
||||
code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pSchemas, 1);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_TAG_COLUMN:
|
||||
code = mndDropSuperTableTag(pOld, &stbObj, pUpdate->pSchemas[0].name);
|
||||
code = mndDropSuperTableTag(pOld, &stbObj, pAlter->pSchemas[0].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
|
||||
code = mndUpdateStbTagName(pOld, &stbObj, pUpdate->pSchemas[0].name, pUpdate->pSchemas[1].name);
|
||||
code = mndAlterStbTagName(pOld, &stbObj, pAlter->pSchemas[0].name, pAlter->pSchemas[1].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
|
||||
code = mndUpdateStbTagBytes(pOld, &stbObj, &pUpdate->pSchemas[0]);
|
||||
code = mndAlterStbTagBytes(pOld, &stbObj, &pAlter->pSchemas[0]);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN:
|
||||
code = mndAddSuperTableColumn(pOld, &stbObj, pUpdate->pSchemas, 1);
|
||||
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pSchemas, 1);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:
|
||||
code = mndDropSuperTableColumn(pOld, &stbObj, pUpdate->pSchemas[0].name);
|
||||
code = mndDropSuperTableColumn(pOld, &stbObj, pAlter->pSchemas[0].name);
|
||||
break;
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
|
||||
code = mndUpdateStbColumnBytes(pOld, &stbObj, &pUpdate->pSchemas[0]);
|
||||
code = mndAlterStbColumnBytes(pOld, &stbObj, &pAlter->pSchemas[0]);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
|
||||
break;
|
||||
}
|
||||
|
||||
if (code != 0) goto UPDATE_STB_OVER;
|
||||
if (code != 0) goto ALTER_STB_OVER;
|
||||
|
||||
code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) goto UPDATE_STB_OVER;
|
||||
if (pTrans == NULL) goto ALTER_STB_OVER;
|
||||
|
||||
mDebug("trans:%d, used to update stb:%s", pTrans->id, pUpdate->name);
|
||||
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
|
||||
|
||||
if (mndSetUpdateStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndSetUpdateStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndSetUpdateStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto UPDATE_STB_OVER;
|
||||
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
||||
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
||||
if (mndSetAlterStbRedoActions(pMnode, pTrans, pDb, &stbObj) != 0) goto ALTER_STB_OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto ALTER_STB_OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
UPDATE_STB_OVER:
|
||||
ALTER_STB_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
tfree(stbObj.pTags);
|
||||
tfree(stbObj.pColumns);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessMUpdateStbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SMUpdateStbReq *pUpdate = pReq->rpcMsg.pCont;
|
||||
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
SMAltertbReq *pAlter = pReq->rpcMsg.pCont;
|
||||
|
||||
mDebug("stb:%s, start to update", pUpdate->name);
|
||||
mDebug("stb:%s, start to alter", pAlter->name);
|
||||
|
||||
if (mndCheckUpdateStbReq(pUpdate) != 0) {
|
||||
mError("stb:%s, failed to update since %s", pUpdate->name, terrstr());
|
||||
if (mndCheckAlterStbReq(pAlter) != 0) {
|
||||
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, pUpdate->name);
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, pAlter->name);
|
||||
if (pStb == NULL) {
|
||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||
mError("stb:%s, failed to update since %s", pUpdate->name, terrstr());
|
||||
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, pUpdate->name);
|
||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, pAlter->name);
|
||||
if (pDb == NULL) {
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
mError("stb:%s, failed to update since %s", pUpdate->name, terrstr());
|
||||
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndUpdateStb(pMnode, pReq, pUpdate, pDb, pStb);
|
||||
int32_t code = mndAlterStb(pMnode, pReq, pAlter, pDb, pStb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
||||
if (code != 0) {
|
||||
mError("stb:%s, failed to update since %s", pUpdate->name, tstrerror(code));
|
||||
mError("stb:%s, failed to alter since %s", pAlter->name, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessVUpdateStbRsp(SMnodeMsg *pRsp) {
|
||||
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) {
|
||||
mndTransProcessRsp(pRsp);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -21,213 +21,289 @@ class MndTestStb : public ::testing::Test {
|
|||
public:
|
||||
void SetUp() override {}
|
||||
void TearDown() override {}
|
||||
|
||||
SCreateDbReq* BuildCreateDbReq(const char* dbname, int32_t* pContLen);
|
||||
SMCreateStbReq* BuildCreateStbReq(const char* stbname, int32_t* pContLen);
|
||||
SMAltertbReq* BuildAlterStbAddTagReq(const char* stbname, int32_t* pContLen);
|
||||
};
|
||||
|
||||
Testbase MndTestStb::test;
|
||||
|
||||
TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||
{
|
||||
int32_t contLen = sizeof(SCreateDbReq);
|
||||
SCreateDbReq* MndTestStb::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
|
||||
int32_t contLen = sizeof(SCreateDbReq);
|
||||
|
||||
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->db, "1.d1");
|
||||
pReq->numOfVgroups = htonl(2);
|
||||
pReq->cacheBlockSize = htonl(16);
|
||||
pReq->totalBlocks = htonl(10);
|
||||
pReq->daysPerFile = htonl(10);
|
||||
pReq->daysToKeep0 = htonl(3650);
|
||||
pReq->daysToKeep1 = htonl(3650);
|
||||
pReq->daysToKeep2 = htonl(3650);
|
||||
pReq->minRows = htonl(100);
|
||||
pReq->maxRows = htonl(4096);
|
||||
pReq->commitTime = htonl(3600);
|
||||
pReq->fsyncPeriod = htonl(3000);
|
||||
pReq->walLevel = 1;
|
||||
pReq->precision = 0;
|
||||
pReq->compression = 2;
|
||||
pReq->replications = 1;
|
||||
pReq->quorum = 1;
|
||||
pReq->update = 0;
|
||||
pReq->cacheLastRow = 0;
|
||||
pReq->ignoreExist = 1;
|
||||
SCreateDbReq* pReq = (SCreateDbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->db, dbname);
|
||||
pReq->numOfVgroups = htonl(2);
|
||||
pReq->cacheBlockSize = htonl(16);
|
||||
pReq->totalBlocks = htonl(10);
|
||||
pReq->daysPerFile = htonl(10);
|
||||
pReq->daysToKeep0 = htonl(3650);
|
||||
pReq->daysToKeep1 = htonl(3650);
|
||||
pReq->daysToKeep2 = htonl(3650);
|
||||
pReq->minRows = htonl(100);
|
||||
pReq->maxRows = htonl(4096);
|
||||
pReq->commitTime = htonl(3600);
|
||||
pReq->fsyncPeriod = htonl(3000);
|
||||
pReq->walLevel = 1;
|
||||
pReq->precision = 0;
|
||||
pReq->compression = 2;
|
||||
pReq->replications = 1;
|
||||
pReq->quorum = 1;
|
||||
pReq->update = 0;
|
||||
pReq->cacheLastRow = 0;
|
||||
pReq->ignoreExist = 1;
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
{
|
||||
int32_t cols = 2;
|
||||
int32_t tags = 3;
|
||||
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq);
|
||||
|
||||
SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->name, "1.d1.stb");
|
||||
pReq->numOfTags = htonl(tags);
|
||||
pReq->numOfColumns = htonl(cols);
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[0];
|
||||
pSchema->bytes = htonl(8);
|
||||
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema->name, "ts");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[1];
|
||||
pSchema->bytes = htonl(4);
|
||||
pSchema->type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema->name, "col1");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[2];
|
||||
pSchema->bytes = htonl(2);
|
||||
pSchema->type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(pSchema->name, "tag1");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[3];
|
||||
pSchema->bytes = htonl(8);
|
||||
pSchema->type = TSDB_DATA_TYPE_BIGINT;
|
||||
strcpy(pSchema->name, "tag2");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[4];
|
||||
pSchema->bytes = htonl(16);
|
||||
pSchema->type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema->name, "tag3");
|
||||
}
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1");
|
||||
CHECK_META("show stables", 4);
|
||||
|
||||
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
|
||||
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||
CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "columns");
|
||||
CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "tags");
|
||||
|
||||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
||||
CheckTimestamp();
|
||||
CheckInt32(2);
|
||||
CheckInt32(3);
|
||||
|
||||
// ----- meta ------
|
||||
{
|
||||
int32_t contLen = sizeof(STableInfoReq);
|
||||
|
||||
STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->dbFName, "1.d1");
|
||||
strcpy(pReq->tbName, "stb");
|
||||
|
||||
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
|
||||
ASSERT_NE(pMsg, nullptr);
|
||||
ASSERT_EQ(pMsg->code, 0);
|
||||
|
||||
STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont;
|
||||
pRsp->numOfTags = htonl(pRsp->numOfTags);
|
||||
pRsp->numOfColumns = htonl(pRsp->numOfColumns);
|
||||
pRsp->sversion = htonl(pRsp->sversion);
|
||||
pRsp->tversion = htonl(pRsp->tversion);
|
||||
pRsp->suid = be64toh(pRsp->suid);
|
||||
pRsp->tuid = be64toh(pRsp->tuid);
|
||||
pRsp->vgId = be64toh(pRsp->vgId);
|
||||
for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
|
||||
SSchema* pSchema = &pRsp->pSchema[i];
|
||||
pSchema->colId = htonl(pSchema->colId);
|
||||
pSchema->bytes = htonl(pSchema->bytes);
|
||||
}
|
||||
|
||||
EXPECT_STREQ(pRsp->dbFName, "1.d1");
|
||||
EXPECT_STREQ(pRsp->tbName, "stb");
|
||||
EXPECT_STREQ(pRsp->stbName, "stb");
|
||||
EXPECT_EQ(pRsp->numOfColumns, 2);
|
||||
EXPECT_EQ(pRsp->numOfTags, 3);
|
||||
EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE);
|
||||
EXPECT_EQ(pRsp->update, 0);
|
||||
EXPECT_EQ(pRsp->sversion, 1);
|
||||
EXPECT_EQ(pRsp->tversion, 0);
|
||||
EXPECT_GT(pRsp->suid, 0);
|
||||
EXPECT_GT(pRsp->tuid, 0);
|
||||
EXPECT_EQ(pRsp->vgId, 0);
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pRsp->pSchema[0];
|
||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
|
||||
EXPECT_EQ(pSchema->colId, 1);
|
||||
EXPECT_EQ(pSchema->bytes, 8);
|
||||
EXPECT_STREQ(pSchema->name, "ts");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pRsp->pSchema[1];
|
||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
|
||||
EXPECT_EQ(pSchema->colId, 2);
|
||||
EXPECT_EQ(pSchema->bytes, 4);
|
||||
EXPECT_STREQ(pSchema->name, "col1");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pRsp->pSchema[2];
|
||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TINYINT);
|
||||
EXPECT_EQ(pSchema->colId, 3);
|
||||
EXPECT_EQ(pSchema->bytes, 2);
|
||||
EXPECT_STREQ(pSchema->name, "tag1");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pRsp->pSchema[3];
|
||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BIGINT);
|
||||
EXPECT_EQ(pSchema->colId, 4);
|
||||
EXPECT_EQ(pSchema->bytes, 8);
|
||||
EXPECT_STREQ(pSchema->name, "tag2");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pRsp->pSchema[4];
|
||||
EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
|
||||
EXPECT_EQ(pSchema->colId, 5);
|
||||
EXPECT_EQ(pSchema->bytes, 16);
|
||||
EXPECT_STREQ(pSchema->name, "tag3");
|
||||
}
|
||||
}
|
||||
|
||||
// restart
|
||||
test.Restart();
|
||||
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1");
|
||||
CHECK_META("show stables", 4);
|
||||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
|
||||
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
||||
CheckTimestamp();
|
||||
CheckInt32(2);
|
||||
CheckInt32(3);
|
||||
|
||||
{
|
||||
int32_t contLen = sizeof(SMDropStbReq);
|
||||
|
||||
SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->name, "1.d1.stb");
|
||||
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, "1.d1");
|
||||
CHECK_META("show stables", 4);
|
||||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 0);
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
}
|
||||
|
||||
SMCreateStbReq* MndTestStb::BuildCreateStbReq(const char *stbname, int32_t* pContLen) {
|
||||
int32_t cols = 2;
|
||||
int32_t tags = 3;
|
||||
int32_t contLen = (tags + cols) * sizeof(SSchema) + sizeof(SMCreateStbReq);
|
||||
|
||||
SMCreateStbReq* pReq = (SMCreateStbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->name, stbname);
|
||||
pReq->numOfTags = htonl(tags);
|
||||
pReq->numOfColumns = htonl(cols);
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[0];
|
||||
pSchema->bytes = htonl(8);
|
||||
pSchema->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
strcpy(pSchema->name, "ts");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[1];
|
||||
pSchema->bytes = htonl(4);
|
||||
pSchema->type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema->name, "col1");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[2];
|
||||
pSchema->bytes = htonl(2);
|
||||
pSchema->type = TSDB_DATA_TYPE_TINYINT;
|
||||
strcpy(pSchema->name, "tag1");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[3];
|
||||
pSchema->bytes = htonl(8);
|
||||
pSchema->type = TSDB_DATA_TYPE_BIGINT;
|
||||
strcpy(pSchema->name, "tag2");
|
||||
}
|
||||
|
||||
{
|
||||
SSchema* pSchema = &pReq->pSchemas[4];
|
||||
pSchema->bytes = htonl(16);
|
||||
pSchema->type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema->name, "tag3");
|
||||
}
|
||||
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
}
|
||||
|
||||
// TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||
// const char *dbname = "1.d1";
|
||||
// const char *stbname = "1.d1.stb";
|
||||
|
||||
// {
|
||||
// int32_t contLen = 0;
|
||||
// SCreateDbReq* pReq = BuildCreateDbReq(dbname, &contLen);
|
||||
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
||||
// ASSERT_NE(pRsp, nullptr);
|
||||
// ASSERT_EQ(pRsp->code, 0);
|
||||
// }
|
||||
|
||||
// {
|
||||
// int32_t contLen = 0;
|
||||
// SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||
// ASSERT_NE(pRsp, nullptr);
|
||||
// ASSERT_EQ(pRsp->code, 0);
|
||||
// }
|
||||
|
||||
// {
|
||||
// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||
// CHECK_META("show stables", 4);
|
||||
// CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
|
||||
// CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||
// CHECK_SCHEMA(2, TSDB_DATA_TYPE_INT, 4, "columns");
|
||||
// CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "tags");
|
||||
|
||||
// test.SendShowRetrieveReq();
|
||||
// EXPECT_EQ(test.GetShowRows(), 1);
|
||||
// CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
||||
// CheckTimestamp();
|
||||
// CheckInt32(2);
|
||||
// CheckInt32(3);
|
||||
// }
|
||||
|
||||
// // ----- meta ------
|
||||
// {
|
||||
// int32_t contLen = sizeof(STableInfoReq);
|
||||
// STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
|
||||
// strcpy(pReq->dbFName, dbname);
|
||||
// strcpy(pReq->tbName, "stb");
|
||||
|
||||
// SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
|
||||
// ASSERT_NE(pMsg, nullptr);
|
||||
// ASSERT_EQ(pMsg->code, 0);
|
||||
|
||||
// STableMetaRsp* pRsp = (STableMetaRsp*)pMsg->pCont;
|
||||
// pRsp->numOfTags = htonl(pRsp->numOfTags);
|
||||
// pRsp->numOfColumns = htonl(pRsp->numOfColumns);
|
||||
// pRsp->sversion = htonl(pRsp->sversion);
|
||||
// pRsp->tversion = htonl(pRsp->tversion);
|
||||
// pRsp->suid = be64toh(pRsp->suid);
|
||||
// pRsp->tuid = be64toh(pRsp->tuid);
|
||||
// pRsp->vgId = be64toh(pRsp->vgId);
|
||||
// for (int32_t i = 0; i < pRsp->numOfTags + pRsp->numOfColumns; ++i) {
|
||||
// SSchema* pSchema = &pRsp->pSchema[i];
|
||||
// pSchema->colId = htonl(pSchema->colId);
|
||||
// pSchema->bytes = htonl(pSchema->bytes);
|
||||
// }
|
||||
|
||||
// EXPECT_STREQ(pRsp->dbFName, dbname);
|
||||
// EXPECT_STREQ(pRsp->tbName, "stb");
|
||||
// EXPECT_STREQ(pRsp->stbName, "stb");
|
||||
// EXPECT_EQ(pRsp->numOfColumns, 2);
|
||||
// EXPECT_EQ(pRsp->numOfTags, 3);
|
||||
// EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI);
|
||||
// EXPECT_EQ(pRsp->tableType, TSDB_SUPER_TABLE);
|
||||
// EXPECT_EQ(pRsp->update, 0);
|
||||
// EXPECT_EQ(pRsp->sversion, 1);
|
||||
// EXPECT_EQ(pRsp->tversion, 0);
|
||||
// EXPECT_GT(pRsp->suid, 0);
|
||||
// EXPECT_GT(pRsp->tuid, 0);
|
||||
// EXPECT_EQ(pRsp->vgId, 0);
|
||||
|
||||
// {
|
||||
// SSchema* pSchema = &pRsp->pSchema[0];
|
||||
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP);
|
||||
// EXPECT_EQ(pSchema->colId, 1);
|
||||
// EXPECT_EQ(pSchema->bytes, 8);
|
||||
// EXPECT_STREQ(pSchema->name, "ts");
|
||||
// }
|
||||
|
||||
// {
|
||||
// SSchema* pSchema = &pRsp->pSchema[1];
|
||||
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT);
|
||||
// EXPECT_EQ(pSchema->colId, 2);
|
||||
// EXPECT_EQ(pSchema->bytes, 4);
|
||||
// EXPECT_STREQ(pSchema->name, "col1");
|
||||
// }
|
||||
|
||||
// {
|
||||
// SSchema* pSchema = &pRsp->pSchema[2];
|
||||
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TINYINT);
|
||||
// EXPECT_EQ(pSchema->colId, 3);
|
||||
// EXPECT_EQ(pSchema->bytes, 2);
|
||||
// EXPECT_STREQ(pSchema->name, "tag1");
|
||||
// }
|
||||
|
||||
// {
|
||||
// SSchema* pSchema = &pRsp->pSchema[3];
|
||||
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BIGINT);
|
||||
// EXPECT_EQ(pSchema->colId, 4);
|
||||
// EXPECT_EQ(pSchema->bytes, 8);
|
||||
// EXPECT_STREQ(pSchema->name, "tag2");
|
||||
// }
|
||||
|
||||
// {
|
||||
// SSchema* pSchema = &pRsp->pSchema[4];
|
||||
// EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY);
|
||||
// EXPECT_EQ(pSchema->colId, 5);
|
||||
// EXPECT_EQ(pSchema->bytes, 16);
|
||||
// EXPECT_STREQ(pSchema->name, "tag3");
|
||||
// }
|
||||
// }
|
||||
|
||||
// // restart
|
||||
// test.Restart();
|
||||
|
||||
// {
|
||||
// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||
// CHECK_META("show stables", 4);
|
||||
// test.SendShowRetrieveReq();
|
||||
// EXPECT_EQ(test.GetShowRows(), 1);
|
||||
|
||||
// CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
||||
// CheckTimestamp();
|
||||
// CheckInt32(2);
|
||||
// CheckInt32(3);
|
||||
// }
|
||||
|
||||
// {
|
||||
// int32_t contLen = sizeof(SMDropStbReq);
|
||||
|
||||
// SMDropStbReq* pReq = (SMDropStbReq*)rpcMallocCont(contLen);
|
||||
// strcpy(pReq->name, stbname);
|
||||
|
||||
// SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_STB, pReq, contLen);
|
||||
// ASSERT_NE(pRsp, nullptr);
|
||||
// ASSERT_EQ(pRsp->code, 0);
|
||||
// }
|
||||
|
||||
// test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||
// CHECK_META("show stables", 4);
|
||||
// test.SendShowRetrieveReq();
|
||||
// EXPECT_EQ(test.GetShowRows(), 0);
|
||||
// }
|
||||
|
||||
SMAltertbReq* MndTestStb::BuildAlterStbAddTagReq(const char* stbname, int32_t* pContLen) {
|
||||
int32_t contLen = sizeof(SMAltertbReq) + sizeof(SSchema);
|
||||
SMAltertbReq* pReq = (SMAltertbReq*)rpcMallocCont(contLen);
|
||||
strcpy(pReq->name, stbname);
|
||||
pReq->numOfSchemas = htonl(1);
|
||||
pReq->alterType = TSDB_ALTER_TABLE_ADD_TAG_COLUMN;
|
||||
|
||||
SSchema* pSchema = &pReq->pSchemas[0];
|
||||
pSchema->bytes = htonl(4);
|
||||
pSchema->type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema->name, "tag4");
|
||||
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
}
|
||||
|
||||
TEST_F(MndTestStb, 01_Alter_Stb) {
|
||||
const char *dbname = "1.d2";
|
||||
const char *stbname = "1.d2.stb";
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
SCreateDbReq* pReq = BuildCreateDbReq(dbname, &contLen);
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
SMCreateStbReq* pReq = BuildCreateStbReq(stbname, &contLen);
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_STB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
SMAltertbReq* pReq = BuildAlterStbAddTagReq(stbname, &contLen);
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_STB, dbname);
|
||||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
CheckBinary("stb", TSDB_TABLE_NAME_LEN);
|
||||
CheckTimestamp();
|
||||
CheckInt32(2);
|
||||
CheckInt32(4);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,7 +106,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
break;
|
||||
|
||||
case TDMT_VND_ALTER_STB:
|
||||
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
||||
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
||||
tDeserializeSVCreateTbReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateTbReq);
|
||||
free(vCreateTbReq.stbCfg.pSchema);
|
||||
free(vCreateTbReq.stbCfg.pTagSchema);
|
||||
free(vCreateTbReq.name);
|
||||
break;
|
||||
case TDMT_VND_DROP_STB:
|
||||
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
||||
|
|
Loading…
Reference in New Issue