From 353feec145793dbc8ebda5d5f03442657adf4c54 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 10:50:47 +0800 Subject: [PATCH] serialize topic msg --- include/common/tmsg.h | 34 +++----------------- include/common/tmsgdef.h | 2 +- source/client/src/tmq.c | 4 +-- source/common/src/tmsg.c | 43 ++++++++++++++++++++++++++ source/dnode/mnode/impl/src/mndTopic.c | 2 +- 5 files changed, 52 insertions(+), 33 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 283c007ace..8e0586cd34 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1118,39 +1118,15 @@ typedef struct { char* logicalPlan; } SMCreateTopicReq; -static FORCE_INLINE int tSerializeSMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq) { - int tlen = 0; - tlen += taosEncodeFixedI8(buf, pReq->igExists); - tlen += taosEncodeString(buf, pReq->name); - tlen += taosEncodeString(buf, pReq->sql); - tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += taosEncodeString(buf, pReq->logicalPlan); - return tlen; -} - -static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SMCreateTopicReq* pReq) { - buf = taosDecodeFixedI8(buf, &(pReq->igExists)); - buf = taosDecodeString(buf, &(pReq->name)); - buf = taosDecodeString(buf, &(pReq->sql)); - buf = taosDecodeString(buf, &(pReq->physicalPlan)); - buf = taosDecodeString(buf, &(pReq->logicalPlan)); - return buf; -} +int32_t tSerializeMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq); +void* tDeserializeSMCreateTopicReq(void* buf, SMCreateTopicReq* pReq); typedef struct { int64_t topicId; -} SCMCreateTopicRsp; +} SMCreateTopicRsp; -static FORCE_INLINE int tSerializeSCMCreateTopicRsp(void** buf, const SCMCreateTopicRsp* pRsp) { - int tlen = 0; - tlen += taosEncodeFixedI64(buf, pRsp->topicId); - return tlen; -} - -static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopicRsp* pRsp) { - buf = taosDecodeFixedI64(buf, &pRsp->topicId); - return buf; -} +int32_t tSerializeSMCreateTopicRsp(void* buf, int32_t bufLen, const SMCreateTopicRsp* pRsp); +int32_t tDeserializeSMCreateTopicRsp(void* buf, int32_t bufLen, SMCreateTopicRsp* pRsp); typedef struct { int32_t topicNum; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9feb0af081..2f41e574bd 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -137,7 +137,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SCMCreateTopicRsp) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index e55f25f5c8..96872c53d5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -368,14 +368,14 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i .logicalPlan = (char*)"no logic plan", }; - int tlen = tSerializeSMCreateTopicReq(NULL, &req); + int tlen = tSerializeMCreateTopicReq(NULL, &req); void* buf = malloc(tlen); if (buf == NULL) { goto _return; } void* abuf = buf; - tSerializeSMCreateTopicReq(&abuf, &req); + tSerializeMCreateTopicReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c76848fbc9..a331dd0929 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1816,3 +1816,46 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR return 0; } +int32_t tSerializeMCreateTopicReq(void **buf, const SMCreateTopicReq *pReq) { + int32_t tlen = 0; + tlen += taosEncodeFixedI8(buf, pReq->igExists); + tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeString(buf, pReq->sql); + tlen += taosEncodeString(buf, pReq->physicalPlan); + tlen += taosEncodeString(buf, pReq->logicalPlan); + return tlen; +} + +void *tDeserializeSMCreateTopicReq(void *buf, SMCreateTopicReq *pReq) { + buf = taosDecodeFixedI8(buf, &(pReq->igExists)); + buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeString(buf, &(pReq->sql)); + buf = taosDecodeString(buf, &(pReq->physicalPlan)); + buf = taosDecodeString(buf, &(pReq->logicalPlan)); + return buf; +} + +int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->topicId) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->topicId) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 389ebcbb22..040b7c5830 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -265,7 +265,7 @@ static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) { char *msgStr = pReq->rpcMsg.pCont; SMCreateTopicReq createTopicReq = {0}; - tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq); + tDeserializeSMCreateTopicReq(msgStr, &createTopicReq); mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql);