diff --git a/include/common/tmsg.h b/include/common/tmsg.h index be37fb9e9c..b32a74e9df 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1313,19 +1313,13 @@ typedef struct { int64_t status; } SMVSubscribeRsp; -typedef struct { - char name[TSDB_TOPIC_NAME_LEN]; - int8_t igExists; - int32_t execLen; - void* executor; - int32_t sqlLen; - char* sql; -} SCreateTopicReq; - typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; -} SDropTopicReq; +} SMDropTopicReq; + +int32_t tSerializeSMDropTopicReqq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); +int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 411acbfcfd..2544ab1c81 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1788,3 +1788,30 @@ int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq tCoderClear(&decoder); return 0; } + +int32_t tSerializeSMDropTopicReqq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 56c14bcfd3..eb2b010ef7 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -31,12 +31,12 @@ static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic); -static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); +static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq); +static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq); +static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp); static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq); -static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); int32_t mndInitTopic(SMnode *pMnode) { @@ -48,8 +48,8 @@ int32_t mndInitTopic(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndTopicActionUpdate, .deleteFp = (SdbDeleteFp)mndTopicActionDelete}; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicMsg); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicMsg); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); return sdbSetTable(pMnode->pSdb, table); @@ -230,7 +230,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { return 0; } -static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { mDebug("topic:%s to create", pCreate->name); SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); @@ -248,7 +248,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); if (pTopicRaw == NULL) return -1; if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; - /*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);*/ + /*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);*/ /*mndTransAppendRedolog(pTrans, pTopicRaw);*/ /*if (mndTransPrepare(pMnode, pTrans) != 0) {*/ /*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/ @@ -260,9 +260,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq return sdbWrite(pMnode->pSdb, pTopicRaw); } -static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - char *msgStr = pMsg->rpcMsg.pCont; +static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + char *msgStr = pReq->rpcMsg.pCont; SCMCreateTopicReq createTopicReq = {0}; tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq); @@ -294,7 +294,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return -1; } - int32_t code = mndCreateTopic(pMnode, pMsg, &createTopicReq, pDb); + int32_t code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb); mndReleaseDb(pMnode, pDb); if (code != TSDB_CODE_SUCCESS) { @@ -306,40 +306,45 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; } +static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { return 0; } -static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SDropTopicReq *pDrop = pMsg->rpcMsg.pCont; +static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMDropTopicReq dropReq = {0}; - mDebug("topic:%s, start to drop", pDrop->name); + if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name); + mDebug("topic:%s, start to drop", dropReq.name); + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); if (pTopic == NULL) { - if (pDrop->igNotExists) { - mDebug("topic:%s, not exist, ignore not exist is set", pDrop->name); + if (dropReq.igNotExists) { + mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); return 0; } else { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - mError("topic:%s, failed to drop since %s", pDrop->name, terrstr()); + mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); return -1; } } - int32_t code = mndDropTopic(pMnode, pMsg, pTopic); + int32_t code = mndDropTopic(pMnode, pReq, pTopic); mndReleaseTopic(pMnode, pTopic); if (code != 0) { terrno = code; - mError("topic:%s, failed to drop since %s", pDrop->name, terrstr()); + mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); return -1; } return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } @@ -405,8 +410,8 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq) { mndReleaseDb(pMnode, pDb); mndReleaseTopic(pMnode, pTopic); - pMsg->pCont = pMeta; - pMsg->contLen = contLen; + pReq->pCont = pMeta; + pReq->contLen = contLen; mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags); #endif @@ -438,8 +443,8 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo return 0; } -static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) { @@ -501,8 +506,8 @@ static void mndExtractTableName(char *tableId, char *name) { } } -static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SMqTopicObj *pTopic = NULL;