From 51f46e40d517536879f833556979e513a3933472 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 10:25:14 +0800 Subject: [PATCH 1/4] serialize drop topic req --- include/common/tmsg.h | 14 ++---- source/common/src/tmsg.c | 27 ++++++++++ source/dnode/mnode/impl/src/mndTopic.c | 69 ++++++++++++++------------ 3 files changed, 68 insertions(+), 42 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index be37fb9e9c..b32a74e9df 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1313,19 +1313,13 @@ typedef struct { int64_t status; } SMVSubscribeRsp; -typedef struct { - char name[TSDB_TOPIC_NAME_LEN]; - int8_t igExists; - int32_t execLen; - void* executor; - int32_t sqlLen; - char* sql; -} SCreateTopicReq; - typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t igNotExists; -} SDropTopicReq; +} SMDropTopicReq; + +int32_t tSerializeSMDropTopicReqq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); +int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 411acbfcfd..2544ab1c81 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1788,3 +1788,30 @@ int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq tCoderClear(&decoder); return 0; } + +int32_t tSerializeSMDropTopicReqq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 56c14bcfd3..eb2b010ef7 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -31,12 +31,12 @@ static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic); static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic); -static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg); +static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq); +static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq); +static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp); static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq); -static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); int32_t mndInitTopic(SMnode *pMnode) { @@ -48,8 +48,8 @@ int32_t mndInitTopic(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndTopicActionUpdate, .deleteFp = (SdbDeleteFp)mndTopicActionDelete}; - mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicMsg); - mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicMsg); + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); return sdbSetTable(pMnode->pSdb, table); @@ -230,7 +230,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { return 0; } -static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { mDebug("topic:%s to create", pCreate->name); SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); @@ -248,7 +248,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); if (pTopicRaw == NULL) return -1; if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; - /*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);*/ + /*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);*/ /*mndTransAppendRedolog(pTrans, pTopicRaw);*/ /*if (mndTransPrepare(pMnode, pTrans) != 0) {*/ /*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/ @@ -260,9 +260,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq return sdbWrite(pMnode->pSdb, pTopicRaw); } -static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - char *msgStr = pMsg->rpcMsg.pCont; +static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + char *msgStr = pReq->rpcMsg.pCont; SCMCreateTopicReq createTopicReq = {0}; tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq); @@ -294,7 +294,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return -1; } - int32_t code = mndCreateTopic(pMnode, pMsg, &createTopicReq, pDb); + int32_t code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb); mndReleaseDb(pMnode, pDb); if (code != TSDB_CODE_SUCCESS) { @@ -306,40 +306,45 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, SMqTopicObj *pTopic) { return 0; } +static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { return 0; } -static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SDropTopicReq *pDrop = pMsg->rpcMsg.pCont; +static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SMDropTopicReq dropReq = {0}; - mDebug("topic:%s, start to drop", pDrop->name); + if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pDrop->name); + mDebug("topic:%s, start to drop", dropReq.name); + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); if (pTopic == NULL) { - if (pDrop->igNotExists) { - mDebug("topic:%s, not exist, ignore not exist is set", pDrop->name); + if (dropReq.igNotExists) { + mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); return 0; } else { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - mError("topic:%s, failed to drop since %s", pDrop->name, terrstr()); + mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); return -1; } } - int32_t code = mndDropTopic(pMnode, pMsg, pTopic); + int32_t code = mndDropTopic(pMnode, pReq, pTopic); mndReleaseTopic(pMnode, pTopic); if (code != 0) { terrno = code; - mError("topic:%s, failed to drop since %s", pDrop->name, terrstr()); + mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); return -1; } return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) { - mndTransProcessRsp(pMsg); +static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); return 0; } @@ -405,8 +410,8 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq) { mndReleaseDb(pMnode, pDb); mndReleaseTopic(pMnode, pTopic); - pMsg->pCont = pMeta; - pMsg->contLen = contLen; + pReq->pCont = pMeta; + pReq->contLen = contLen; mDebug("topic:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pTopic->numOfColumns, pTopic->numOfTags); #endif @@ -438,8 +443,8 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo return 0; } -static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; if (mndGetNumOfTopics(pMnode, pShow->db, &pShow->numOfRows) != 0) { @@ -501,8 +506,8 @@ static void mndExtractTableName(char *tableId, char *name) { } } -static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SMqTopicObj *pTopic = NULL; From abd6b98ba4f22dc19edc9b5d61a0d0e52d29cee3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 10:34:29 +0800 Subject: [PATCH 2/4] serialize create topic req --- include/common/tmsg.h | 6 +-- include/common/tmsgdef.h | 2 +- source/client/src/tmq.c | 6 +-- source/common/src/tmsg.c | 1 + source/dnode/mnode/impl/src/mndStb.c | 2 +- source/dnode/mnode/impl/src/mndTopic.c | 57 ++++++++------------------ 6 files changed, 25 insertions(+), 49 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b32a74e9df..283c007ace 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1116,9 +1116,9 @@ typedef struct { char* sql; char* physicalPlan; char* logicalPlan; -} SCMCreateTopicReq; +} SMCreateTopicReq; -static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { +static FORCE_INLINE int tSerializeSMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq) { int tlen = 0; tlen += taosEncodeFixedI8(buf, pReq->igExists); tlen += taosEncodeString(buf, pReq->name); @@ -1128,7 +1128,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT return tlen; } -static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { +static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SMCreateTopicReq* pReq) { buf = taosDecodeFixedI8(buf, &(pReq->igExists)); buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeString(buf, &(pReq->sql)); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index cf890f5cbb..9feb0af081 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -137,7 +137,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SCMCreateTopicReq, SCMCreateTopicRsp) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SCMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 315a632180..e55f25f5c8 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -360,7 +360,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i char topicFname[TSDB_TOPIC_FNAME_LEN] = {0}; tNameExtractFullName(&name, topicFname); - SCMCreateTopicReq req = { + SMCreateTopicReq req = { .name = (char*)topicFname, .igExists = 1, .physicalPlan = (char*)pStr, @@ -368,14 +368,14 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i .logicalPlan = (char*)"no logic plan", }; - int tlen = tSerializeSCMCreateTopicReq(NULL, &req); + int tlen = tSerializeSMCreateTopicReq(NULL, &req); void* buf = malloc(tlen); if (buf == NULL) { goto _return; } void* abuf = buf; - tSerializeSCMCreateTopicReq(&abuf, &req); + tSerializeSMCreateTopicReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2544ab1c81..c76848fbc9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1815,3 +1815,4 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR tCoderClear(&decoder); return 0; } + diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 1fc1b25776..31d85d2b72 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1497,7 +1497,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 if (pShow->pIter == NULL) break; if (pStb->dbUid != pDb->uid) { - if (strncmp(pStb->db, pDb->name, tListLen(pStb->db)) == 0) { + if (strncmp(pStb->db, pDb->name, prefixLen) == 0) { mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pStb->name, pDb->name, pDb->uid); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index eb2b010ef7..389ebcbb22 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -34,7 +34,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq); static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq); static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp); -static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq); +static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq); static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); @@ -225,12 +225,12 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq return pDrop; } -static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { +static int32_t mndCheckCreateTopicMsg(SMCreateTopicReq *creattopReq) { // deserialize and other stuff return 0; } -static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq *pCreate, SDbObj *pDb) { mDebug("topic:%s to create", pCreate->name); SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); @@ -264,7 +264,7 @@ static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; char *msgStr = pReq->rpcMsg.pCont; - SCMCreateTopicReq createTopicReq = {0}; + SMCreateTopicReq createTopicReq = {0}; tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq); mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql); @@ -348,7 +348,7 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq) { +static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; STableInfoReq infoReq = {0}; @@ -466,18 +466,6 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * pSchema[cols].bytes = pShow->bytes[cols]; cols++; - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "columns"); - pSchema[cols].bytes = pShow->bytes[cols]; - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "tags"); - pSchema[cols].bytes = pShow->bytes[cols]; - cols++; - pMeta->numOfColumns = cols; pShow->numOfColumns = cols; @@ -493,19 +481,6 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static void mndExtractTableName(char *tableId, char *name) { - int32_t pos = -1; - int32_t num = 0; - for (pos = 0; tableId[pos] != 0; ++pos) { - if (tableId[pos] == '.') num++; - if (num == 2) break; - } - - if (num == 2) { - strcpy(name, tableId + pos + 1); - } -} - static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -513,9 +488,12 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in SMqTopicObj *pTopic = NULL; int32_t cols = 0; char *pWrite; - char prefix[64] = {0}; + char prefix[TSDB_DB_FNAME_LEN] = {0}; - tstrncpy(prefix, pShow->db, 64); + SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); + if (pDb == NULL) return 0; + + tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN); strcat(prefix, TS_PATH_DELIMITER); int32_t prefixLen = (int32_t)strlen(prefix); @@ -523,7 +501,11 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic); if (pShow->pIter == NULL) break; - if (strncmp(pTopic->name, prefix, prefixLen) != 0) { + if (pTopic->dbUid != pDb->uid) { + if (strncmp(pTopic->name, prefix, prefixLen) != 0) { + mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); + } + sdbRelease(pSdb, pTopic); continue; } @@ -540,18 +522,11 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in *(int64_t *)pWrite = pTopic->createTime; cols++; - /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ - /**(int32_t *)pWrite = pTopic->numOfColumns;*/ - /*cols++;*/ - - /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ - /**(int32_t *)pWrite = pTopic->numOfTags;*/ - /*cols++;*/ - numOfRows++; sdbRelease(pSdb, pTopic); } + mndReleaseDb(pMnode, pDb); pShow->numOfReads += numOfRows; mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); return numOfRows; From 353feec145793dbc8ebda5d5f03442657adf4c54 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 10:50:47 +0800 Subject: [PATCH 3/4] serialize topic msg --- include/common/tmsg.h | 34 +++----------------- include/common/tmsgdef.h | 2 +- source/client/src/tmq.c | 4 +-- source/common/src/tmsg.c | 43 ++++++++++++++++++++++++++ source/dnode/mnode/impl/src/mndTopic.c | 2 +- 5 files changed, 52 insertions(+), 33 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 283c007ace..8e0586cd34 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1118,39 +1118,15 @@ typedef struct { char* logicalPlan; } SMCreateTopicReq; -static FORCE_INLINE int tSerializeSMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq) { - int tlen = 0; - tlen += taosEncodeFixedI8(buf, pReq->igExists); - tlen += taosEncodeString(buf, pReq->name); - tlen += taosEncodeString(buf, pReq->sql); - tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += taosEncodeString(buf, pReq->logicalPlan); - return tlen; -} - -static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SMCreateTopicReq* pReq) { - buf = taosDecodeFixedI8(buf, &(pReq->igExists)); - buf = taosDecodeString(buf, &(pReq->name)); - buf = taosDecodeString(buf, &(pReq->sql)); - buf = taosDecodeString(buf, &(pReq->physicalPlan)); - buf = taosDecodeString(buf, &(pReq->logicalPlan)); - return buf; -} +int32_t tSerializeMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq); +void* tDeserializeSMCreateTopicReq(void* buf, SMCreateTopicReq* pReq); typedef struct { int64_t topicId; -} SCMCreateTopicRsp; +} SMCreateTopicRsp; -static FORCE_INLINE int tSerializeSCMCreateTopicRsp(void** buf, const SCMCreateTopicRsp* pRsp) { - int tlen = 0; - tlen += taosEncodeFixedI64(buf, pRsp->topicId); - return tlen; -} - -static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopicRsp* pRsp) { - buf = taosDecodeFixedI64(buf, &pRsp->topicId); - return buf; -} +int32_t tSerializeSMCreateTopicRsp(void* buf, int32_t bufLen, const SMCreateTopicRsp* pRsp); +int32_t tDeserializeSMCreateTopicRsp(void* buf, int32_t bufLen, SMCreateTopicRsp* pRsp); typedef struct { int32_t topicNum; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 9feb0af081..2f41e574bd 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -137,7 +137,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SCMCreateTopicRsp) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index e55f25f5c8..96872c53d5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -368,14 +368,14 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i .logicalPlan = (char*)"no logic plan", }; - int tlen = tSerializeSMCreateTopicReq(NULL, &req); + int tlen = tSerializeMCreateTopicReq(NULL, &req); void* buf = malloc(tlen); if (buf == NULL) { goto _return; } void* abuf = buf; - tSerializeSMCreateTopicReq(&abuf, &req); + tSerializeMCreateTopicReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c76848fbc9..a331dd0929 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1816,3 +1816,46 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR return 0; } +int32_t tSerializeMCreateTopicReq(void **buf, const SMCreateTopicReq *pReq) { + int32_t tlen = 0; + tlen += taosEncodeFixedI8(buf, pReq->igExists); + tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeString(buf, pReq->sql); + tlen += taosEncodeString(buf, pReq->physicalPlan); + tlen += taosEncodeString(buf, pReq->logicalPlan); + return tlen; +} + +void *tDeserializeSMCreateTopicReq(void *buf, SMCreateTopicReq *pReq) { + buf = taosDecodeFixedI8(buf, &(pReq->igExists)); + buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeString(buf, &(pReq->sql)); + buf = taosDecodeString(buf, &(pReq->physicalPlan)); + buf = taosDecodeString(buf, &(pReq->logicalPlan)); + return buf; +} + +int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->topicId) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->topicId) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 389ebcbb22..040b7c5830 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -265,7 +265,7 @@ static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) { char *msgStr = pReq->rpcMsg.pCont; SMCreateTopicReq createTopicReq = {0}; - tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq); + tDeserializeSMCreateTopicReq(msgStr, &createTopicReq); mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql); From c8cc7ed3a69f4752b8c2d7db421400bf5382c31d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 11:45:44 +0800 Subject: [PATCH 4/4] serialize connect msg --- include/common/tmsg.h | 68 ++++------- source/client/src/clientImpl.c | 38 +++---- source/client/src/clientMsgHandler.c | 34 +++--- source/common/src/tmsg.c | 107 ++++++++++++++++++ source/dnode/mnode/impl/src/mndMnode.c | 2 +- source/dnode/mnode/impl/src/mndProfile.c | 55 ++++----- .../dnode/mnode/impl/test/profile/profile.cpp | 88 +++++++------- source/dnode/mnode/impl/test/show/show.cpp | 12 +- 8 files changed, 238 insertions(+), 166 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8e0586cd34..59f8d38345 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -234,9 +234,9 @@ typedef struct { void* pMsg; } SSubmitMsgIter; -int tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); -int tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); -int tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); +int32_t tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); +int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); +int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); typedef struct { @@ -295,6 +295,17 @@ int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); void tFreeSMAltertbReq(SMAltertbReq* pReq); +typedef struct SEpSet { + int8_t inUse; + int8_t numOfEps; + SEp eps[TSDB_MAX_REPLICA]; +} SEpSet; + +int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp); +int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp); +int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); +void* taosDecodeSEpSet(void* buf, SEpSet* pEp); + typedef struct { int32_t pid; char app[TSDB_APP_NAME_LEN]; @@ -302,62 +313,21 @@ typedef struct { int64_t startTime; } SConnectReq; -typedef struct SEpSet { - int8_t inUse; - int8_t numOfEps; - SEp eps[TSDB_MAX_REPLICA]; -} SEpSet; - -static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { - int tlen = 0; - tlen += taosEncodeFixedI8(buf, pEp->inUse); - tlen += taosEncodeFixedI8(buf, pEp->numOfEps); - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - tlen += taosEncodeFixedU16(buf, pEp->eps[i].port); - tlen += taosEncodeString(buf, pEp->eps[i].fqdn); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { - buf = taosDecodeFixedI8(buf, &pEp->inUse); - buf = taosDecodeFixedI8(buf, &pEp->numOfEps); - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - buf = taosDecodeFixedU16(buf, &pEp->eps[i].port); - buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn); - } - return buf; -} -static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) { - if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; - if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1; - if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1; - } - return 0; -} - -static FORCE_INLINE int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp) { - if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1; - if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1; - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1; - } - return 0; -} +int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); +int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); typedef struct { int32_t acctId; int64_t clusterId; int32_t connId; int8_t superUser; - int8_t align[3]; SEpSet epSet; char sVersion[128]; } SConnectRsp; +int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); +int32_t tDeserializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); + typedef struct { char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index dfe7b12ce4..b8efa8213d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -357,40 +357,38 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t return pTscObj; } -static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) { - SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (pMsgSendInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - pMsgSendInfo->msgType = TDMT_MND_CONNECT; - pMsgSendInfo->msgInfo.len = sizeof(SConnectReq); + pMsgSendInfo->msgType = TDMT_MND_CONNECT; pMsgSendInfo->requestObjRefId = pRequest->self; - pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; - pMsgSendInfo->param = pRequest; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; + pMsgSendInfo->param = pRequest; - SConnectReq *pConnect = calloc(1, sizeof(SConnectReq)); - if (pConnect == NULL) { - tfree(pMsgSendInfo); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return NULL; - } - - STscObj *pObj = pRequest->pTscObj; + SConnectReq connectReq = {0}; + STscObj* pObj = pRequest->pTscObj; char* db = getDbOfConnection(pObj); if (db != NULL) { - tstrncpy(pConnect->db, db, sizeof(pConnect->db)); + tstrncpy(connectReq.db, db, sizeof(connectReq.db)); } tfree(db); - pConnect->pid = htonl(appInfo.pid); - pConnect->startTime = htobe64(appInfo.startTime); - tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); + connectReq.pid = htonl(appInfo.pid); + connectReq.startTime = htobe64(appInfo.startTime); + tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); - pMsgSendInfo->msgInfo.pData = pConnect; + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = malloc(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); + + pMsgSendInfo->msgInfo.len = contLen; + pMsgSendInfo->msgInfo.pData = pReq; return pMsgSendInfo; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 79ec08e14d..48c9920c0c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -45,41 +45,35 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } - STscObj *pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; - SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData; - pConnect->acctId = htonl(pConnect->acctId); - pConnect->connId = htonl(pConnect->connId); - pConnect->clusterId = htobe64(pConnect->clusterId); + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp); + assert(connectRsp.epSet.numOfEps > 0); - assert(pConnect->epSet.numOfEps > 0); - for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { - pConnect->epSet.eps[i].port = htons(pConnect->epSet.eps[i].port); + if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { + updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); } - if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) { - updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet); - } - - for (int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { + for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) { tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i, - pConnect->epSet.eps[i].fqdn, pConnect->epSet.eps[i].port, pTscObj->id); + connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id); } - pTscObj->connId = pConnect->connId; - pTscObj->acctId = pConnect->acctId; - tstrncpy(pTscObj->ver, pConnect->sVersion, tListLen(pTscObj->ver)); + pTscObj->connId = connectRsp.connId; + pTscObj->acctId = connectRsp.acctId; + tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver)); // update the appInstInfo - pTscObj->pAppInfo->clusterId = pConnect->clusterId; + pTscObj->pAppInfo->clusterId = connectRsp.clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); pTscObj->connType = HEARTBEAT_TYPE_QUERY; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY); // pRequest->body.resInfo.pRspMsg = pMsg->pData; - tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, + tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, pTscObj->pAppInfo->numOfConns); free(pMsg->pData); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a331dd0929..e763c0c85f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -85,6 +85,47 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { } } +int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) { + if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; + if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; + for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { + if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1; + if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1; + } + return 0; +} + +int32_t tDecodeSEpSet(SCoder *pDecoder, SEpSet *pEp) { + if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1; + if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1; + for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { + if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1; + } + return 0; +} + +int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI8(buf, pEp->inUse); + tlen += taosEncodeFixedI8(buf, pEp->numOfEps); + for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { + tlen += taosEncodeFixedU16(buf, pEp->eps[i].port); + tlen += taosEncodeString(buf, pEp->eps[i].fqdn); + } + return tlen; +} + +void *taosDecodeSEpSet(void *buf, SEpSet *pEp) { + buf = taosDecodeFixedI8(buf, &pEp->inUse); + buf = taosDecodeFixedI8(buf, &pEp->numOfEps); + for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { + buf = taosDecodeFixedU16(buf, &pEp->eps[i].port); + buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn); + } + return buf; +} + static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) { if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; @@ -1859,3 +1900,69 @@ int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp tCoderClear(&decoder); return 0; } + +int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->pid) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->app) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->startTime) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; + if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; + if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 2e8664b039..1ba9f77b45 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -243,7 +243,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { pEpSet->inUse = pEpSet->numOfEps; } - addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, htons(pObj->pDnode->port)); + addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); sdbRelease(pSdb, pObj); } } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 5e6909d01a..566dcff380 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -184,15 +184,17 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { } static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SUserObj *pUser = NULL; - SDbObj *pDb = NULL; - SConnObj *pConn = NULL; - int32_t code = -1; + SMnode *pMnode = pReq->pMnode; + SUserObj *pUser = NULL; + SDbObj *pDb = NULL; + SConnObj *pConn = NULL; + int32_t code = -1; + SConnectReq connReq = {0}; - SConnectReq *pConnReq = pReq->rpcMsg.pCont; - pConnReq->pid = htonl(pConnReq->pid); - pConnReq->startTime = htobe64(pConnReq->startTime); + if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto CONN_OVER; + } SRpcConnInfo info = {0}; if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { @@ -209,41 +211,42 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { goto CONN_OVER; } - if (pConnReq->db[0]) { - snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db); + if (connReq.db[0]) { + snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db); pDb = mndAcquireDb(pMnode, pReq->db); if (pDb == NULL) { terrno = TSDB_CODE_MND_INVALID_DB; - mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr()); + mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr()); goto CONN_OVER; } } - pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime); + pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime); if (pConn == NULL) { mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); goto CONN_OVER; } - SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp)); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr()); - goto CONN_OVER; - } + SConnectRsp connectRsp = {0}; + connectRsp.acctId = pUser->acctId; + connectRsp.superUser = pUser->superUser; + connectRsp.clusterId = pMnode->clusterId; + connectRsp.connId = pConn->id; - pRsp->acctId = htonl(pUser->acctId); - pRsp->superUser = pUser->superUser; - pRsp->clusterId = htobe64(pMnode->clusterId); - pRsp->connId = htonl(pConn->id); + snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, + gitinfo); + mndGetMnodeEpSet(pMnode, &connectRsp.epSet); - snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo); - mndGetMnodeEpSet(pMnode, &pRsp->epSet); + int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp); + if (contLen < 0) goto CONN_OVER; + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) goto CONN_OVER; + tSerializeSConnectRsp(pRsp, contLen, &connectRsp); - pReq->contLen = sizeof(SConnectRsp); + pReq->contLen = contLen; pReq->pCont = pRsp; - mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app); + mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app); code = 0; diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 5b7bcd47b8..4f4197cd64 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -28,44 +28,44 @@ Testbase MndTestProfile::test; int32_t MndTestProfile::connId; TEST_F(MndTestProfile, 01_ConnectMsg) { - int32_t contLen = sizeof(SConnectReq); + SConnectReq connectReq = {0}; + connectReq.pid = 1234; + strcpy(connectReq.app, "mnode_test_profile"); + strcpy(connectReq.db, ""); - SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); - pReq->pid = htonl(1234); - strcpy(pReq->app, "mnode_test_profile"); - strcpy(pReq->db, ""); + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; - ASSERT_NE(pRsp, nullptr); - pRsp->acctId = htonl(pRsp->acctId); - pRsp->clusterId = htobe64(pRsp->clusterId); - pRsp->connId = htonl(pRsp->connId); - pRsp->epSet.eps[0].port = htons(pRsp->epSet.eps[0].port); + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); - EXPECT_EQ(pRsp->acctId, 1); - EXPECT_GT(pRsp->clusterId, 0); - EXPECT_EQ(pRsp->connId, 1); - EXPECT_EQ(pRsp->superUser, 1); + EXPECT_EQ(connectRsp.acctId, 1); + EXPECT_GT(connectRsp.clusterId, 0); + EXPECT_EQ(connectRsp.connId, 1); + EXPECT_EQ(connectRsp.superUser, 1); - EXPECT_EQ(pRsp->epSet.inUse, 0); - EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.eps[0].port, 9031); - EXPECT_STREQ(pRsp->epSet.eps[0].fqdn, "localhost"); + EXPECT_EQ(connectRsp.epSet.inUse, 0); + EXPECT_EQ(connectRsp.epSet.numOfEps, 1); + EXPECT_EQ(connectRsp.epSet.eps[0].port, 9031); + EXPECT_STREQ(connectRsp.epSet.eps[0].fqdn, "localhost"); - connId = pRsp->connId; + connId = connectRsp.connId; } TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) { - int32_t contLen = sizeof(SConnectReq); + SConnectReq connectReq = {0}; + connectReq.pid = 1234; + strcpy(connectReq.app, "mnode_test_profile"); + strcpy(connectReq.db, "invalid_db"); - SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); - pReq->pid = htonl(1234); - strcpy(pReq->app, "mnode_test_profile"); - strcpy(pReq->db, "invalid_db"); + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -194,35 +194,33 @@ TEST_F(MndTestProfile, 05_KillConnMsg) { } { - int32_t contLen = sizeof(SConnectReq); + SConnectReq connectReq = {0}; + connectReq.pid = 1234; + strcpy(connectReq.app, "mnode_test_profile"); + strcpy(connectReq.db, "invalid_db"); - SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); - pReq->pid = htonl(1234); - strcpy(pReq->app, "mnode_test_profile"); - strcpy(pReq->db, ""); + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; - ASSERT_NE(pRsp, nullptr); - pRsp->acctId = htonl(pRsp->acctId); - pRsp->clusterId = htobe64(pRsp->clusterId); - pRsp->connId = htonl(pRsp->connId); - pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); - EXPECT_EQ(pRsp->acctId, 1); - EXPECT_GT(pRsp->clusterId, 0); - EXPECT_GT(pRsp->connId, connId); - EXPECT_EQ(pRsp->superUser, 1); + EXPECT_EQ(connectRsp.acctId, 1); + EXPECT_GT(connectRsp.clusterId, 0); + EXPECT_GT(connectRsp.connId, connId); + EXPECT_EQ(connectRsp.superUser, 1); - EXPECT_EQ(pRsp->epSet.inUse, 0); - EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9031); - EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); + EXPECT_EQ(connectRsp.epSet.inUse, 0); + EXPECT_EQ(connectRsp.epSet.numOfEps, 1); + EXPECT_EQ(connectRsp.epSet.port[0], 9031); + EXPECT_STREQ(connectRsp.epSet.fqdn[0], "localhost"); - connId = pRsp->connId; + connId = connectRsp.connId; } #endif } diff --git a/source/dnode/mnode/impl/test/show/show.cpp b/source/dnode/mnode/impl/test/show/show.cpp index e7e17d65c6..a57d99a257 100644 --- a/source/dnode/mnode/impl/test/show/show.cpp +++ b/source/dnode/mnode/impl/test/show/show.cpp @@ -54,12 +54,14 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { } TEST_F(MndTestShow, 03_ShowMsg_Conn) { - int32_t contLen = sizeof(SConnectReq); + SConnectReq connectReq = {0}; + connectReq.pid = 1234; + strcpy(connectReq.app, "mnode_test_show"); + strcpy(connectReq.db, ""); - SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); - pReq->pid = htonl(1234); - strcpy(pReq->app, "mnode_test_show"); - strcpy(pReq->db, ""); + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); ASSERT_NE(pRsp, nullptr);