test for create topic
This commit is contained in:
parent
88ee9d94e4
commit
43de12b5fa
|
@ -1083,15 +1083,16 @@ typedef struct {
|
||||||
} STaskDropRsp;
|
} STaskDropRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t igExists;
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
char* name;
|
int8_t igExists;
|
||||||
char* sql;
|
char* sql;
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
char* logicalPlan;
|
char* logicalPlan;
|
||||||
} SMCreateTopicReq;
|
} SMCreateTopicReq;
|
||||||
|
|
||||||
int32_t tSerializeMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq);
|
int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq);
|
||||||
void* tDeserializeSMCreateTopicReq(void* buf, SMCreateTopicReq* pReq);
|
int32_t tDeserializeSMCreateTopicReq(void* buf, int32_t bufLen, SMCreateTopicReq* pReq);
|
||||||
|
void tFreeSMCreateTopicReq(SMCreateTopicReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t topicId;
|
int64_t topicId;
|
||||||
|
@ -1262,7 +1263,7 @@ typedef struct {
|
||||||
int8_t igNotExists;
|
int8_t igNotExists;
|
||||||
} SMDropTopicReq;
|
} SMDropTopicReq;
|
||||||
|
|
||||||
int32_t tSerializeSMDropTopicReqq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||||
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -361,21 +361,20 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
|
||||||
tNameExtractFullName(&name, topicFname);
|
tNameExtractFullName(&name, topicFname);
|
||||||
|
|
||||||
SMCreateTopicReq req = {
|
SMCreateTopicReq req = {
|
||||||
.name = (char*)topicFname,
|
|
||||||
.igExists = 1,
|
.igExists = 1,
|
||||||
.physicalPlan = (char*)pStr,
|
.physicalPlan = (char*)pStr,
|
||||||
.sql = (char*)sql,
|
.sql = (char*)sql,
|
||||||
.logicalPlan = (char*)"no logic plan",
|
.logicalPlan = (char*)"no logic plan",
|
||||||
};
|
};
|
||||||
|
memcpy(req.name, topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
|
||||||
int tlen = tSerializeMCreateTopicReq(NULL, &req);
|
int tlen = tSerializeMCreateTopicReq(NULL, 0, &req);
|
||||||
void* buf = malloc(tlen);
|
void* buf = malloc(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* abuf = buf;
|
tSerializeMCreateTopicReq(buf, tlen, &req);
|
||||||
tSerializeMCreateTopicReq(&abuf, &req);
|
|
||||||
/*printf("formatted: %s\n", dagStr);*/
|
/*printf("formatted: %s\n", dagStr);*/
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
||||||
|
|
|
@ -1858,7 +1858,7 @@ int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSMDropTopicReqq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) {
|
int32_t tSerializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
@ -1885,23 +1885,67 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeMCreateTopicReq(void **buf, const SMCreateTopicReq *pReq) {
|
int32_t tSerializeMCreateTopicReq(void *buf, int32_t bufLen, const SMCreateTopicReq *pReq) {
|
||||||
int32_t tlen = 0;
|
int32_t sqlLen = 0;
|
||||||
tlen += taosEncodeFixedI8(buf, pReq->igExists);
|
int32_t physicalPlanLen = 0;
|
||||||
tlen += taosEncodeString(buf, pReq->name);
|
int32_t logicalPlanLen = 0;
|
||||||
tlen += taosEncodeString(buf, pReq->sql);
|
if (pReq->sql != NULL) sqlLen = (int32_t)strlen(pReq->sql);
|
||||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
if (pReq->physicalPlan != NULL) physicalPlanLen = (int32_t)strlen(pReq->physicalPlan);
|
||||||
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
if (pReq->logicalPlan != NULL) logicalPlanLen = (int32_t)strlen(pReq->logicalPlan);
|
||||||
|
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, physicalPlanLen) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, logicalPlanLen) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1;
|
||||||
|
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tDeserializeSMCreateTopicReq(void *buf, SMCreateTopicReq *pReq) {
|
int32_t tDeserializeSMCreateTopicReq(void *buf, int32_t bufLen, SMCreateTopicReq *pReq) {
|
||||||
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
|
int32_t sqlLen = 0;
|
||||||
buf = taosDecodeString(buf, &(pReq->name));
|
int32_t physicalPlanLen = 0;
|
||||||
buf = taosDecodeString(buf, &(pReq->sql));
|
int32_t logicalPlanLen = 0;
|
||||||
buf = taosDecodeString(buf, &(pReq->physicalPlan));
|
|
||||||
buf = taosDecodeString(buf, &(pReq->logicalPlan));
|
SCoder decoder = {0};
|
||||||
return buf;
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &sqlLen) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &physicalPlanLen) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &logicalPlanLen) < 0) return -1;
|
||||||
|
|
||||||
|
pReq->sql = calloc(1, sqlLen + 1);
|
||||||
|
pReq->physicalPlan = calloc(1, physicalPlanLen + 1);
|
||||||
|
pReq->logicalPlan = calloc(1, logicalPlanLen + 1);
|
||||||
|
if (pReq->sql == NULL || pReq->physicalPlan == NULL || pReq->logicalPlan == NULL) return -1;
|
||||||
|
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->physicalPlan) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->logicalPlan) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tFreeSMCreateTopicReq(SMCreateTopicReq *pReq) {
|
||||||
|
tfree(pReq->sql);
|
||||||
|
tfree(pReq->physicalPlan);
|
||||||
|
tfree(pReq->logicalPlan);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) {
|
int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) {
|
||||||
|
|
|
@ -99,6 +99,8 @@ void Testbase::SendShowMetaReq(int8_t showType, const char* db) {
|
||||||
SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW, pReq, contLen);
|
SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW, pReq, contLen);
|
||||||
ASSERT(pRsp->pCont != nullptr);
|
ASSERT(pRsp->pCont != nullptr);
|
||||||
|
|
||||||
|
if (pRsp->contLen == 0) return;
|
||||||
|
|
||||||
SShowRsp showRsp = {0};
|
SShowRsp showRsp = {0};
|
||||||
tDeserializeSShowRsp(pRsp->pCont, pRsp->contLen, &showRsp);
|
tDeserializeSShowRsp(pRsp->pCont, pRsp->contLen, &showRsp);
|
||||||
tFreeSTableMetaRsp(&metaRsp);
|
tFreeSTableMetaRsp(&metaRsp);
|
||||||
|
|
|
@ -1387,8 +1387,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
|
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
|
@ -1402,7 +1401,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
|
||||||
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
|
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if (strcmp(pStb->db, dbName) == 0) {
|
if (pStb->dbUid == pDb->uid) {
|
||||||
numOfStbs++;
|
numOfStbs++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1410,6 +1409,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs
|
||||||
}
|
}
|
||||||
|
|
||||||
*pNumOfStbs = numOfStbs;
|
*pNumOfStbs = numOfStbs;
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndTopic.h"
|
#include "mndTopic.h"
|
||||||
|
#include "mndAuth.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
@ -52,6 +53,10 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
|
||||||
|
|
||||||
|
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetTopicMeta);
|
||||||
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic);
|
||||||
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic);
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,7 +83,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
|
@ -187,7 +191,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
|
||||||
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
|
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
|
SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName);
|
||||||
if (pTopic == NULL) {
|
if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
|
||||||
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
|
||||||
}
|
}
|
||||||
return pTopic;
|
return pTopic;
|
||||||
|
@ -225,8 +229,11 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
|
||||||
return pDrop;
|
return pDrop;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCheckCreateTopicMsg(SMCreateTopicReq *creattopReq) {
|
static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) {
|
||||||
// deserialize and other stuff
|
if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) {
|
||||||
|
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,68 +252,121 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq
|
||||||
topicObj.logicalPlan = pCreate->logicalPlan;
|
topicObj.logicalPlan = pCreate->logicalPlan;
|
||||||
topicObj.sqlLen = strlen(pCreate->sql);
|
topicObj.sqlLen = strlen(pCreate->sql);
|
||||||
|
|
||||||
SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||||
if (pTopicRaw == NULL) return -1;
|
if (pTrans == NULL) {
|
||||||
if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1;
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
/*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);*/
|
return -1;
|
||||||
/*mndTransAppendRedolog(pTrans, pTopicRaw);*/
|
}
|
||||||
/*if (mndTransPrepare(pMnode, pTrans) != 0) {*/
|
mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);
|
||||||
/*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/
|
|
||||||
/*mndTransDrop(pTrans);*/
|
|
||||||
/*return -1;*/
|
|
||||||
/*}*/
|
|
||||||
/*mndTransDrop(pTrans);*/
|
|
||||||
/*return 0;*/
|
|
||||||
return sdbWrite(pMnode->pSdb, pTopicRaw);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
|
SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj);
|
||||||
SMnode *pMnode = pReq->pMnode;
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
char *msgStr = pReq->rpcMsg.pCont;
|
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
SMCreateTopicReq createTopicReq = {0};
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
tDeserializeSMCreateTopicReq(msgStr, &createTopicReq);
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
|
|
||||||
|
|
||||||
if (mndCheckCreateTopicMsg(&createTopicReq) != 0) {
|
|
||||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
|
|
||||||
if (pTopic != NULL) {
|
|
||||||
sdbRelease(pMnode->pSdb, pTopic);
|
|
||||||
if (createTopicReq.igExists) {
|
|
||||||
mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
|
|
||||||
mError("topic:%s, failed to create since already exists", createTopicReq.name);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
|
|
||||||
if (pDb == NULL) {
|
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
|
||||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
terrno = code;
|
|
||||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { return 0; }
|
static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) {
|
||||||
|
SMnode *pMnode = pReq->pMnode;
|
||||||
|
int32_t code = -1;
|
||||||
|
SMqTopicObj *pTopic = NULL;
|
||||||
|
SDbObj *pDb = NULL;
|
||||||
|
SUserObj *pUser = NULL;
|
||||||
|
SMCreateTopicReq createTopicReq = {0};
|
||||||
|
|
||||||
|
if (tDeserializeSMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto CREATE_TOPIC_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);
|
||||||
|
|
||||||
|
if (mndCheckCreateTopicReq(&createTopicReq) != 0) {
|
||||||
|
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
||||||
|
goto CREATE_TOPIC_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTopic = mndAcquireTopic(pMnode, createTopicReq.name);
|
||||||
|
if (pTopic != NULL) {
|
||||||
|
if (createTopicReq.igExists) {
|
||||||
|
mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name);
|
||||||
|
code = 0;
|
||||||
|
goto CREATE_TOPIC_OVER;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
|
||||||
|
goto CREATE_TOPIC_OVER;
|
||||||
|
}
|
||||||
|
} else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) {
|
||||||
|
goto CREATE_TOPIC_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name);
|
||||||
|
if (pDb == NULL) {
|
||||||
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
|
goto CREATE_TOPIC_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
|
if (pUser == NULL) {
|
||||||
|
goto CREATE_TOPIC_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndCheckWriteAuth(pUser, pDb) != 0) {
|
||||||
|
goto CREATE_TOPIC_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb);
|
||||||
|
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
CREATE_TOPIC_OVER:
|
||||||
|
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
|
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
mndReleaseUser(pMnode, pUser);
|
||||||
|
|
||||||
|
tFreeSMCreateTopicReq(&createTopicReq);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) {
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
|
||||||
|
|
||||||
|
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
|
||||||
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) {
|
static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
|
@ -419,8 +479,7 @@ static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
|
@ -434,12 +493,15 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
|
||||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
numOfTopics++;
|
if (pTopic->dbUid == pDb->uid) {
|
||||||
|
numOfTopics++;
|
||||||
|
}
|
||||||
|
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
*pNumOfTopics = numOfTopics;
|
*pNumOfTopics = numOfTopics;
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -466,6 +528,12 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
|
||||||
pSchema[cols].bytes = pShow->bytes[cols];
|
pSchema[cols].bytes = pShow->bytes[cols];
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
|
pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
|
||||||
|
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
strcpy(pSchema[cols].name, "sql");
|
||||||
|
pSchema[cols].bytes = pShow->bytes[cols];
|
||||||
|
cols++;
|
||||||
|
|
||||||
pMeta->numOfColumns = cols;
|
pMeta->numOfColumns = cols;
|
||||||
pShow->numOfColumns = cols;
|
pShow->numOfColumns = cols;
|
||||||
|
|
||||||
|
@ -522,6 +590,10 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in
|
||||||
*(int64_t *)pWrite = pTopic->createTime;
|
*(int64_t *)pWrite = pTopic->createTime;
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
|
||||||
|
cols++;
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,3 +13,4 @@ add_subdirectory(mnode)
|
||||||
add_subdirectory(db)
|
add_subdirectory(db)
|
||||||
add_subdirectory(stb)
|
add_subdirectory(stb)
|
||||||
add_subdirectory(func)
|
add_subdirectory(func)
|
||||||
|
add_subdirectory(topic)
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
aux_source_directory(. TOPIC_SRC)
|
||||||
|
add_executable(mnode_test_topic ${TOPIC_SRC})
|
||||||
|
target_link_libraries(
|
||||||
|
mnode_test_topic
|
||||||
|
PUBLIC sut
|
||||||
|
)
|
||||||
|
|
||||||
|
add_test(
|
||||||
|
NAME mnode_test_topic
|
||||||
|
COMMAND mnode_test_topic
|
||||||
|
)
|
|
@ -0,0 +1,177 @@
|
||||||
|
/**
|
||||||
|
* @file topic.cpp
|
||||||
|
* @author slguan (slguan@taosdata.com)
|
||||||
|
* @brief MNODE module topic tests
|
||||||
|
* @version 1.0
|
||||||
|
* @date 2022-02-16
|
||||||
|
*
|
||||||
|
* @copyright Copyright (c) 2022
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "sut.h"
|
||||||
|
|
||||||
|
class MndTestTopic : public ::testing::Test {
|
||||||
|
protected:
|
||||||
|
static void SetUpTestSuite() { test.Init("/tmp/mnode_test_topic", 9039); }
|
||||||
|
static void TearDownTestSuite() { test.Cleanup(); }
|
||||||
|
|
||||||
|
static Testbase test;
|
||||||
|
|
||||||
|
public:
|
||||||
|
void SetUp() override {}
|
||||||
|
void TearDown() override {}
|
||||||
|
|
||||||
|
void* BuildCreateDbReq(const char* dbname, int32_t* pContLen);
|
||||||
|
void* BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen);
|
||||||
|
void* BuildDropTopicReq(const char* topicName, int32_t* pContLen);
|
||||||
|
};
|
||||||
|
|
||||||
|
Testbase MndTestTopic::test;
|
||||||
|
|
||||||
|
void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) {
|
||||||
|
SCreateDbReq createReq = {0};
|
||||||
|
strcpy(createReq.db, dbname);
|
||||||
|
createReq.numOfVgroups = 2;
|
||||||
|
createReq.cacheBlockSize = 16;
|
||||||
|
createReq.totalBlocks = 10;
|
||||||
|
createReq.daysPerFile = 10;
|
||||||
|
createReq.daysToKeep0 = 3650;
|
||||||
|
createReq.daysToKeep1 = 3650;
|
||||||
|
createReq.daysToKeep2 = 3650;
|
||||||
|
createReq.minRows = 100;
|
||||||
|
createReq.maxRows = 4096;
|
||||||
|
createReq.commitTime = 3600;
|
||||||
|
createReq.fsyncPeriod = 3000;
|
||||||
|
createReq.walLevel = 1;
|
||||||
|
createReq.precision = 0;
|
||||||
|
createReq.compression = 2;
|
||||||
|
createReq.replications = 1;
|
||||||
|
createReq.quorum = 1;
|
||||||
|
createReq.update = 0;
|
||||||
|
createReq.cacheLastRow = 0;
|
||||||
|
createReq.ignoreExist = 1;
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSCreateDbReq(pReq, contLen, &createReq);
|
||||||
|
|
||||||
|
*pContLen = contLen;
|
||||||
|
return pReq;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen) {
|
||||||
|
SMCreateTopicReq createReq = {0};
|
||||||
|
strcpy(createReq.name, topicName);
|
||||||
|
createReq.igExists = 0;
|
||||||
|
createReq.sql = (char*)sql;
|
||||||
|
createReq.physicalPlan = (char*)"physicalPlan";
|
||||||
|
createReq.logicalPlan = (char*)"logicalPlan";
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeMCreateTopicReq(NULL, 0, &createReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeMCreateTopicReq(pReq, contLen, &createReq);
|
||||||
|
|
||||||
|
*pContLen = contLen;
|
||||||
|
return pReq;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* MndTestTopic::BuildDropTopicReq(const char* topicName, int32_t* pContLen) {
|
||||||
|
SMDropTopicReq dropReq = {0};
|
||||||
|
strcpy(dropReq.name, topicName);
|
||||||
|
|
||||||
|
int32_t contLen = tSerializeSMDropTopicReq(NULL, 0, &dropReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSMDropTopicReq(pReq, contLen, &dropReq);
|
||||||
|
|
||||||
|
*pContLen = contLen;
|
||||||
|
return pReq;
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(MndTestTopic, 01_Create_Topic) {
|
||||||
|
const char* dbname = "1.d1";
|
||||||
|
const char* topicName = "1.d1.t1";
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void* pReq = BuildCreateDbReq(dbname, &contLen);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void* pReq = BuildCreateTopicReq("t1", "sql", &contLen);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_TOPIC, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DB_NOT_SELECTED);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void* pReq = BuildCreateTopicReq(topicName, "sql", &contLen);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_TOPIC, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void* pReq = BuildCreateTopicReq(topicName, "sql", &contLen);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_TOPIC, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TOPIC_ALREADY_EXIST);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, dbname);
|
||||||
|
CHECK_META("show topics", 3);
|
||||||
|
|
||||||
|
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name");
|
||||||
|
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||||
|
CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, "sql");
|
||||||
|
|
||||||
|
test.SendShowRetrieveReq();
|
||||||
|
EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
|
|
||||||
|
CheckBinary("t1", TSDB_TABLE_NAME_LEN);
|
||||||
|
CheckTimestamp();
|
||||||
|
CheckBinary("sql", TSDB_SHOW_SQL_LEN);
|
||||||
|
|
||||||
|
// restart
|
||||||
|
test.Restart();
|
||||||
|
|
||||||
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, dbname);
|
||||||
|
test.SendShowRetrieveReq();
|
||||||
|
EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
|
|
||||||
|
CheckBinary("t1", TSDB_TABLE_NAME_LEN);
|
||||||
|
CheckTimestamp();
|
||||||
|
CheckBinary("sql", TSDB_SHOW_SQL_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void* pReq = BuildDropTopicReq(topicName, &contLen);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_TOPIC, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = 0;
|
||||||
|
void* pReq = BuildDropTopicReq(topicName, &contLen);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_TOPIC, pReq, contLen);
|
||||||
|
ASSERT_NE(pRsp, nullptr);
|
||||||
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TOPIC_NOT_EXIST);
|
||||||
|
|
||||||
|
test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, dbname);
|
||||||
|
test.SendShowRetrieveReq();
|
||||||
|
EXPECT_EQ(test.GetShowRows(), 0);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue