Merge pull request #10289 from taosdata/feature/privilege
test for create topic
This commit is contained in:
commit
f53ea71579
|
@ -1083,15 +1083,16 @@ typedef struct {
|
|||
} STaskDropRsp;
|
||||
|
||||
typedef struct {
|
||||
int8_t igExists;
|
||||
char* name;
|
||||
char* sql;
|
||||
char* physicalPlan;
|
||||
char* logicalPlan;
|
||||
char name[TSDB_TOPIC_FNAME_LEN];
|
||||
int8_t igExists;
|
||||
char* sql;
|
||||
char* physicalPlan;
|
||||
char* logicalPlan;
|
||||
} SMCreateTopicReq;
|
||||
|
||||
int32_t tSerializeMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq);
|
||||
void* tDeserializeSMCreateTopicReq(void* buf, SMCreateTopicReq* pReq);
|
||||
int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq);
|
||||
int32_t tDeserializeSMCreateTopicReq(void* buf, int32_t bufLen, SMCreateTopicReq* pReq);
|
||||
void tFreeSMCreateTopicReq(SMCreateTopicReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int64_t topicId;
|
||||
|
@ -1246,7 +1247,7 @@ typedef struct {
|
|||
int8_t igNotExists;
|
||||
} SMDropTopicReq;
|
||||
|
||||
int32_t tSerializeSMDropTopicReqq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -388,21 +388,20 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
|||
tNameExtractFullName(&name, topicFname);
|
||||
|
||||
SMCreateTopicReq req = {
|
||||
.name = (char*)topicFname,
|
||||
.igExists = 1,
|
||||
.physicalPlan = (char*)pStr,
|
||||
.sql = (char*)sql,
|
||||
.logicalPlan = (char*)"no logic plan",
|
||||
};
|
||||
memcpy(req.name, topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
|
||||
int tlen = tSerializeMCreateTopicReq(NULL, &req);
|
||||
int tlen = tSerializeMCreateTopicReq(NULL, 0, &req);
|
||||
void* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
goto _return;
|
||||
}
|
||||
|
||||
void* abuf = buf;
|
||||
tSerializeMCreateTopicReq(&abuf, &req);
|
||||
tSerializeMCreateTopicReq(buf, tlen, &req);
|
||||
/*printf("formatted: %s\n", dagStr);*/
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
||||
|
|
|
@ -1859,7 +1859,7 @@ int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMDropTopicReqq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) {
|
||||
int32_t tSerializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
|
@ -1886,23 +1886,67 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeMCreateTopicReq(void **buf, const SMCreateTopicReq *pReq) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI8(buf, pReq->igExists);
|
||||
tlen += taosEncodeString(buf, pReq->name);
|
||||
tlen += taosEncodeString(buf, pReq->sql);
|
||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
||||
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
||||
int32_t tSerializeMCreateTopicReq(void *buf, int32_t bufLen, const SMCreateTopicReq *pReq) {
|
||||
int32_t sqlLen = 0;
|
||||
int32_t physicalPlanLen = 0;
|
||||
int32_t logicalPlanLen = 0;
|
||||
if (pReq->sql != NULL) sqlLen = (int32_t)strlen(pReq->sql);
|
||||
if (pReq->physicalPlan != NULL) physicalPlanLen = (int32_t)strlen(pReq->physicalPlan);
|
||||
if (pReq->logicalPlan != NULL) logicalPlanLen = (int32_t)strlen(pReq->logicalPlan);
|
||||
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, physicalPlanLen) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, logicalPlanLen) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tDeserializeSMCreateTopicReq(void *buf, SMCreateTopicReq *pReq) {
|
||||
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
|
||||
buf = taosDecodeString(buf, &(pReq->name));
|
||||
buf = taosDecodeString(buf, &(pReq->sql));
|
||||
buf = taosDecodeString(buf, &(pReq->physicalPlan));
|
||||
buf = taosDecodeString(buf, &(pReq->logicalPlan));
|
||||
return buf;
|
||||
int32_t tDeserializeSMCreateTopicReq(void *buf, int32_t bufLen, SMCreateTopicReq *pReq) {
|
||||
int32_t sqlLen = 0;
|
||||
int32_t physicalPlanLen = 0;
|
||||
int32_t logicalPlanLen = 0;
|
||||
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &physicalPlanLen) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &logicalPlanLen) < 0) return -1;
|
||||
|
||||
pReq->sql = calloc(1, sqlLen + 1);
|
||||
pReq->physicalPlan = calloc(1, physicalPlanLen + 1);
|
||||
pReq->logicalPlan = calloc(1, logicalPlanLen + 1);
|
||||
if (pReq->sql == NULL || pReq->physicalPlan == NULL || pReq->logicalPlan == NULL) return -1;
|
||||
|
||||
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->physicalPlan) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->logicalPlan) < 0) return -1;
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSMCreateTopicReq(SMCreateTopicReq *pReq) {
|
||||
tfree(pReq->sql);
|
||||
tfree(pReq->physicalPlan);
|
||||
tfree(pReq->logicalPlan);
|
||||
}
|
||||
|
||||
int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) {
|
||||
|
|
|
@ -99,6 +99,8 @@ void Testbase::SendShowMetaReq(int8_t showType, const char* db) {
|
|||
SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW, pReq, contLen);
|
||||
ASSERT(pRsp->pCont != nullptr);
|
||||
|
||||
if (pRsp->contLen == 0) return;
|
||||
|
||||
SShowRsp showRsp = {0};
|
||||
tDeserializeSShowRsp(pRsp->pCont, pRsp->contLen, &showRsp);
|
||||
tFreeSTableMetaRsp(&metaRsp);
|
||||
|
|
|
@ -1387,8 +1387,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int
|
|||
}
|
||||
|
||||
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
|
@ -1402,7 +1401,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
|
|||
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (strcmp(pStb->db, dbName) == 0) {
|
||||
if (pStb->dbUid == pDb->uid) {
|
||||
numOfStbs++;
|
||||
}
|
||||
|
||||
|
@ -1410,6 +1409,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
|
|||
}
|
||||
|
||||
*pNumOfStbs = numOfStbs;
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndTopic.h"
|
||||
#include "mndAuth.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndDnode.h"
|
||||
#include "mndMnode.h"
|
||||
|
@ -52,6 +53,10 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
|
||||
|
||||
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetTopicMeta);
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
|
@ -78,7 +83,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
|
||||
|
||||
SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
|
||||
|
||||
|
@ -187,7 +191,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
|
|||
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
|
||||
if (pTopic == NULL) {
|
||||
if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
||||
}
|
||||
return pTopic;
|
||||
|
@ -225,8 +229,11 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
|
|||
return pDrop;
|
||||
}
|
||||
|
||||
static int32_t mndCheckCreateTopicMsg(SMCreateTopicReq *creattopReq) {
|
||||
// deserialize and other stuff
|
||||
static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) {
|
||||
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -245,68 +252,121 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq
|
|||
topicObj.logicalPlan = pCreate->logicalPlan;
|
||||
topicObj.sqlLen = strlen(pCreate->sql);
|
||||
|
||||
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
|
||||
if (pTopicRaw == NULL) return -1;
|
||||
if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
/*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);*/
|
||||
/*mndTransAppendRedolog(pTrans, pTopicRaw);*/
|
||||
/*if (mndTransPrepare(pMnode, pTrans) != 0) {*/
|
||||
/*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/
|
||||
/*mndTransDrop(pTrans);*/
|
||||
/*return -1;*/
|
||||
/*}*/
|
||||
/*mndTransDrop(pTrans);*/
|
||||
/*return 0;*/
|
||||
return sdbWrite(pMnode->pSdb, pTopicRaw);
|
||||
}
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);
|
||||
|
||||
static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
char *msgStr = pReq->rpcMsg.pCont;
|
||||
SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||
|
||||
SMCreateTopicReq createTopicReq = {0};
|
||||
tDeserializeSMCreateTopicReq(msgStr, &createTopicReq);
|
||||
|
||||
mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
|
||||
|
||||
if (mndCheckCreateTopicMsg(&createTopicReq) != 0) {
|
||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
|
||||
if (pTopic != NULL) {
|
||||
sdbRelease(pMnode->pSdb, pTopic);
|
||||
if (createTopicReq.igExists) {
|
||||
mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
|
||||
return 0;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
|
||||
mError("topic:%s, failed to create since already exists", createTopicReq.name);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
SDbObj *pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { return 0; }
|
||||
static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
int32_t code = -1;
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
SDbObj *pDb = NULL;
|
||||
SUserObj *pUser = NULL;
|
||||
SMCreateTopicReq createTopicReq = {0};
|
||||
|
||||
if (tDeserializeSMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
goto CREATE_TOPIC_OVER;
|
||||
}
|
||||
|
||||
mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
|
||||
|
||||
if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
|
||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
||||
goto CREATE_TOPIC_OVER;
|
||||
}
|
||||
|
||||
pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
|
||||
if (pTopic != NULL) {
|
||||
if (createTopicReq.igExists) {
|
||||
mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
|
||||
code = 0;
|
||||
goto CREATE_TOPIC_OVER;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
|
||||
goto CREATE_TOPIC_OVER;
|
||||
}
|
||||
} else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
|
||||
goto CREATE_TOPIC_OVER;
|
||||
}
|
||||
|
||||
pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
goto CREATE_TOPIC_OVER;
|
||||
}
|
||||
|
||||
pUser = mndAcquireUser(pMnode, pReq->user);
|
||||
if (pUser == NULL) {
|
||||
goto CREATE_TOPIC_OVER;
|
||||
}
|
||||
|
||||
if (mndCheckWriteAuth(pUser, pDb) != 0) {
|
||||
goto CREATE_TOPIC_OVER;
|
||||
}
|
||||
|
||||
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
|
||||
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||
|
||||
CREATE_TOPIC_OVER:
|
||||
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
||||
}
|
||||
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseUser(pMnode, pUser);
|
||||
|
||||
tFreeSMCreateTopicReq(&createTopicReq);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) {
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||
if (pTrans == NULL) {
|
||||
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||
|
||||
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
|
||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) {
|
||||
SMnode *pMnode = pReq->pMnode;
|
||||
|
@ -419,8 +479,7 @@ static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) {
|
|||
}
|
||||
|
||||
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
|
@ -434,12 +493,15 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
|
|||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
numOfTopics++;
|
||||
if (pTopic->dbUid == pDb->uid) {
|
||||
numOfTopics++;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pTopic);
|
||||
}
|
||||
|
||||
*pNumOfTopics = numOfTopics;
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -466,6 +528,12 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
|||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||
strcpy(pSchema[cols].name, "sql");
|
||||
pSchema[cols].bytes = pShow->bytes[cols];
|
||||
cols++;
|
||||
|
||||
pMeta->numOfColumns = cols;
|
||||
pShow->numOfColumns = cols;
|
||||
|
||||
|
@ -522,6 +590,10 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
|
|||
*(int64_t *)pWrite = pTopic->createTime;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pTopic);
|
||||
}
|
||||
|
|
|
@ -13,3 +13,4 @@ add_subdirectory(mnode)
|
|||
add_subdirectory(db)
|
||||
add_subdirectory(stb)
|
||||
add_subdirectory(func)
|
||||
add_subdirectory(topic)
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
aux_source_directory(. TOPIC_SRC)
|
||||
add_executable(mnode_test_topic ${TOPIC_SRC})
|
||||
target_link_libraries(
|
||||
mnode_test_topic
|
||||
PUBLIC sut
|
||||
)
|
||||
|
||||
add_test(
|
||||
NAME mnode_test_topic
|
||||
COMMAND mnode_test_topic
|
||||
)
|
|
@ -0,0 +1,177 @@
|
|||
/**
|
||||
* @file topic.cpp
|
||||
* @author slguan (slguan@taosdata.com)
|
||||
* @brief MNODE module topic tests
|
||||
* @version 1.0
|
||||
* @date 2022-02-16
|
||||
*
|
||||
* @copyright Copyright (c) 2022
|
||||
*
|
||||
*/
|
||||
|
||||
#include "sut.h"
|
||||
|
||||
class MndTestTopic : public ::testing::Test {
|
||||
protected:
|
||||
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_topic", 9039); }
|
||||
static void TearDownTestSuite() { test.Cleanup(); }
|
||||
|
||||
static Testbase test;
|
||||
|
||||
public:
|
||||
void SetUp() override {}
|
||||
void TearDown() override {}
|
||||
|
||||
void* BuildCreateDbReq(const char* dbname, int32_t* pContLen);
|
||||
void* BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen);
|
||||
void* BuildDropTopicReq(const char* topicName, int32_t* pContLen);
|
||||
};
|
||||
|
||||
Testbase MndTestTopic::test;
|
||||
|
||||
void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
|
||||
SCreateDbReq createReq = {0};
|
||||
strcpy(createReq.db, dbname);
|
||||
createReq.numOfVgroups = 2;
|
||||
createReq.cacheBlockSize = 16;
|
||||
createReq.totalBlocks = 10;
|
||||
createReq.daysPerFile = 10;
|
||||
createReq.daysToKeep0 = 3650;
|
||||
createReq.daysToKeep1 = 3650;
|
||||
createReq.daysToKeep2 = 3650;
|
||||
createReq.minRows = 100;
|
||||
createReq.maxRows = 4096;
|
||||
createReq.commitTime = 3600;
|
||||
createReq.fsyncPeriod = 3000;
|
||||
createReq.walLevel = 1;
|
||||
createReq.precision = 0;
|
||||
createReq.compression = 2;
|
||||
createReq.replications = 1;
|
||||
createReq.quorum = 1;
|
||||
createReq.update = 0;
|
||||
createReq.cacheLastRow = 0;
|
||||
createReq.ignoreExist = 1;
|
||||
|
||||
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeSCreateDbReq(pReq, contLen, &createReq);
|
||||
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
}
|
||||
|
||||
void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen) {
|
||||
SMCreateTopicReq createReq = {0};
|
||||
strcpy(createReq.name, topicName);
|
||||
createReq.igExists = 0;
|
||||
createReq.sql = (char*)sql;
|
||||
createReq.physicalPlan = (char*)"physicalPlan";
|
||||
createReq.logicalPlan = (char*)"logicalPlan";
|
||||
|
||||
int32_t contLen = tSerializeMCreateTopicReq(NULL, 0, &createReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeMCreateTopicReq(pReq, contLen, &createReq);
|
||||
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
}
|
||||
|
||||
void* MndTestTopic::BuildDropTopicReq(const char* topicName, int32_t* pContLen) {
|
||||
SMDropTopicReq dropReq = {0};
|
||||
strcpy(dropReq.name, topicName);
|
||||
|
||||
int32_t contLen = tSerializeSMDropTopicReq(NULL, 0, &dropReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeSMDropTopicReq(pReq, contLen, &dropReq);
|
||||
|
||||
*pContLen = contLen;
|
||||
return pReq;
|
||||
}
|
||||
|
||||
TEST_F(MndTestTopic, 01_Create_Topic) {
|
||||
const char* dbname = "1.d1";
|
||||
const char* topicName = "1.d1.t1";
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
void* pReq = BuildCreateDbReq(dbname, &contLen);
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
{
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, "");
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
void* pReq = BuildCreateTopicReq("t1", "sql", &contLen);
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_TOPIC, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DB_NOT_SELECTED);
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
void* pReq = BuildCreateTopicReq(topicName, "sql", &contLen);
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_TOPIC, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
void* pReq = BuildCreateTopicReq(topicName, "sql", &contLen);
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_TOPIC, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TOPIC_ALREADY_EXIST);
|
||||
}
|
||||
|
||||
{
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, dbname);
|
||||
CHECK_META("show topics", 3);
|
||||
|
||||
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
|
||||
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||
CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, "sql");
|
||||
|
||||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
|
||||
CheckBinary("t1", TSDB_TABLE_NAME_LEN);
|
||||
CheckTimestamp();
|
||||
CheckBinary("sql", TSDB_SHOW_SQL_LEN);
|
||||
|
||||
// restart
|
||||
test.Restart();
|
||||
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, dbname);
|
||||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
|
||||
CheckBinary("t1", TSDB_TABLE_NAME_LEN);
|
||||
CheckTimestamp();
|
||||
CheckBinary("sql", TSDB_SHOW_SQL_LEN);
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
void* pReq = BuildDropTopicReq(topicName, &contLen);
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_TOPIC, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, 0);
|
||||
}
|
||||
|
||||
{
|
||||
int32_t contLen = 0;
|
||||
void* pReq = BuildDropTopicReq(topicName, &contLen);
|
||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_TOPIC, pReq, contLen);
|
||||
ASSERT_NE(pRsp, nullptr);
|
||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TOPIC_NOT_EXIST);
|
||||
|
||||
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, dbname);
|
||||
test.SendShowRetrieveReq();
|
||||
EXPECT_EQ(test.GetShowRows(), 0);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue