diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b32a74e9df..283c007ace 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1116,9 +1116,9 @@ typedef struct { char* sql; char* physicalPlan; char* logicalPlan; -} SCMCreateTopicReq; +} SMCreateTopicReq; -static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { +static FORCE_INLINE int tSerializeSMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq) { int tlen = 0; tlen += taosEncodeFixedI8(buf, pReq->igExists); tlen += taosEncodeString(buf, pReq->name); @@ -1128,7 +1128,7 @@ static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateT return tlen; } -static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { +static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SMCreateTopicReq* pReq) { buf = taosDecodeFixedI8(buf, &(pReq->igExists)); buf = taosDecodeString(buf, &(pReq->name)); buf = taosDecodeString(buf, &(pReq->sql)); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index cf890f5cbb..9feb0af081 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -137,7 +137,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SCMCreateTopicReq, SCMCreateTopicRsp) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SCMCreateTopicRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 315a632180..e55f25f5c8 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -360,7 +360,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i char topicFname[TSDB_TOPIC_FNAME_LEN] = {0}; tNameExtractFullName(&name, topicFname); - SCMCreateTopicReq req = { + SMCreateTopicReq req = { .name = (char*)topicFname, .igExists = 1, .physicalPlan = (char*)pStr, @@ -368,14 +368,14 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i .logicalPlan = (char*)"no logic plan", }; - int tlen = tSerializeSCMCreateTopicReq(NULL, &req); + int tlen = tSerializeSMCreateTopicReq(NULL, &req); void* buf = malloc(tlen); if (buf == NULL) { goto _return; } void* abuf = buf; - tSerializeSCMCreateTopicReq(&abuf, &req); + tSerializeSMCreateTopicReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2544ab1c81..c76848fbc9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1815,3 +1815,4 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR tCoderClear(&decoder); return 0; } + diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 1fc1b25776..31d85d2b72 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1497,7 +1497,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 if (pShow->pIter == NULL) break; if (pStb->dbUid != pDb->uid) { - if (strncmp(pStb->db, pDb->name, tListLen(pStb->db)) == 0) { + if (strncmp(pStb->db, pDb->name, prefixLen) == 0) { mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pStb->name, pDb->name, pDb->uid); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index eb2b010ef7..389ebcbb22 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -34,7 +34,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq); static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq); static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp); -static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq); +static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq); static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); @@ -225,12 +225,12 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq return pDrop; } -static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { +static int32_t mndCheckCreateTopicMsg(SMCreateTopicReq *creattopReq) { // deserialize and other stuff return 0; } -static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { +static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq *pCreate, SDbObj *pDb) { mDebug("topic:%s to create", pCreate->name); SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); @@ -264,7 +264,7 @@ static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; char *msgStr = pReq->rpcMsg.pCont; - SCMCreateTopicReq createTopicReq = {0}; + SMCreateTopicReq createTopicReq = {0}; tDeserializeSCMCreateTopicReq(msgStr, &createTopicReq); mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql); @@ -348,7 +348,7 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pRsp) { return 0; } -static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq) { +static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; STableInfoReq infoReq = {0}; @@ -466,18 +466,6 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * pSchema[cols].bytes = pShow->bytes[cols]; cols++; - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "columns"); - pSchema[cols].bytes = pShow->bytes[cols]; - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "tags"); - pSchema[cols].bytes = pShow->bytes[cols]; - cols++; - pMeta->numOfColumns = cols; pShow->numOfColumns = cols; @@ -493,19 +481,6 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * return 0; } -static void mndExtractTableName(char *tableId, char *name) { - int32_t pos = -1; - int32_t num = 0; - for (pos = 0; tableId[pos] != 0; ++pos) { - if (tableId[pos] == '.') num++; - if (num == 2) break; - } - - if (num == 2) { - strcpy(name, tableId + pos + 1); - } -} - static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -513,9 +488,12 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in SMqTopicObj *pTopic = NULL; int32_t cols = 0; char *pWrite; - char prefix[64] = {0}; + char prefix[TSDB_DB_FNAME_LEN] = {0}; - tstrncpy(prefix, pShow->db, 64); + SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); + if (pDb == NULL) return 0; + + tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN); strcat(prefix, TS_PATH_DELIMITER); int32_t prefixLen = (int32_t)strlen(prefix); @@ -523,7 +501,11 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic); if (pShow->pIter == NULL) break; - if (strncmp(pTopic->name, prefix, prefixLen) != 0) { + if (pTopic->dbUid != pDb->uid) { + if (strncmp(pTopic->name, prefix, prefixLen) != 0) { + mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); + } + sdbRelease(pSdb, pTopic); continue; } @@ -540,18 +522,11 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in *(int64_t *)pWrite = pTopic->createTime; cols++; - /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ - /**(int32_t *)pWrite = pTopic->numOfColumns;*/ - /*cols++;*/ - - /*pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;*/ - /**(int32_t *)pWrite = pTopic->numOfTags;*/ - /*cols++;*/ - numOfRows++; sdbRelease(pSdb, pTopic); } + mndReleaseDb(pMnode, pDb); pShow->numOfReads += numOfRows; mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); return numOfRows;