From 8d5f5901c4903e83ba5e945e26e0fd7430b8552f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 6 Dec 2021 15:06:29 +0800 Subject: [PATCH] TD-10431 profile test --- include/common/taosmsg.h | 85 +++---- .../dnode/mgmt/impl/test/profile/profile.cpp | 217 +++++++++++++++++- source/dnode/mnode/impl/src/mndProfile.c | 169 ++++++++------ source/dnode/mnode/impl/src/mndShow.c | 11 +- 4 files changed, 361 insertions(+), 121 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 9443ef7475..300de7b698 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -418,10 +418,10 @@ typedef struct { } SDropSTableMsg; typedef struct SColIndex { - int16_t colId; // column id - int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag - uint16_t flag; // denote if it is a tag or a normal column - char name[TSDB_COL_NAME_LEN + TSDB_DB_NAME_LEN + 1]; + int16_t colId; // column id + int16_t colIndex; // column index in colList if it is a normal column or index in tagColList if a tag + int16_t flag; // denote if it is a tag or a normal column + char name[TSDB_COL_NAME_LEN + TSDB_DB_NAME_LEN + 1]; } SColIndex; typedef struct SColumnFilterInfo { @@ -515,8 +515,8 @@ typedef struct { int16_t numOfCols; // the number of columns will be load from vnode SInterval interval; // SSessionWindow sw; // session window - uint16_t tagCondLen; // tag length in current query - uint16_t colCondLen; // column length in current query + int16_t tagCondLen; // tag length in current query + int16_t colCondLen; // column length in current query int16_t numOfGroupCols; // num of group by columns int16_t orderByIdx; int16_t orderType; // used in group by xx order by xxx @@ -524,10 +524,10 @@ typedef struct { int16_t prjOrder; // global order in super table projection query. int64_t limit; int64_t offset; - uint32_t queryType; // denote another query process + int32_t queryType; // denote another query process int16_t numOfOutput; // final output columns numbers int16_t fillType; // interpolate type - uint64_t fillVal; // default value array list + int64_t fillVal; // default value array list int32_t secondStageOutput; STsBufInfo tsBuf; // tsBuf info int32_t numOfTags; // number of tags columns involved @@ -542,8 +542,11 @@ typedef struct { } SQueryTableMsg; typedef struct { - int32_t code; - union{uint64_t qhandle; uint64_t qId;}; // query handle + int32_t code; + union { + uint64_t qhandle; + uint64_t qId; + }; // query handle } SQueryTableRsp; // todo: the show handle should be replaced with id @@ -706,7 +709,7 @@ typedef struct { typedef struct { char db[TSDB_FULL_DB_NAME_LEN]; - uint32_t vgId; + int32_t vgId; int32_t cacheBlockSize; int32_t totalBlocks; int32_t daysPerFile; @@ -784,14 +787,14 @@ typedef struct STableMetaMsg { } STableMetaMsg; typedef struct SMultiTableMeta { - int32_t numOfTables; - int32_t numOfVgroup; - int32_t numOfUdf; - int32_t contLen; - uint8_t compressed; // denote if compressed or not - uint32_t rawLen; // size before compress - uint8_t metaClone; // make meta clone after retrieve meta from mnode - char meta[]; + int32_t numOfTables; + int32_t numOfVgroup; + int32_t numOfUdf; + int32_t contLen; + int8_t compressed; // denote if compressed or not + int32_t rawLen; // size before compress + uint8_t metaClone; // make meta clone after retrieve meta from mnode + char meta[]; } SMultiTableMeta; typedef struct { @@ -841,8 +844,8 @@ typedef struct { } SConfigTableMsg; typedef struct { - uint32_t dnodeId; - int32_t vgId; + int32_t dnodeId; + int32_t vgId; } SConfigVnodeMsg; typedef struct { @@ -851,29 +854,29 @@ typedef struct { } SCfgDnodeMsg; typedef struct { - char sql[TSDB_SHOW_SQL_LEN]; - uint32_t queryId; - int64_t useconds; - int64_t stime; - uint64_t qId; - uint64_t sqlObjId; - int32_t pid; - char fqdn[TSDB_FQDN_LEN]; - uint8_t stableQuery; - int32_t numOfSub; - char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; //include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) + char sql[TSDB_SHOW_SQL_LEN]; + int32_t queryId; + int64_t useconds; + int64_t stime; + int64_t qId; + int64_t sqlObjId; + int32_t pid; + char fqdn[TSDB_FQDN_LEN]; + int8_t stableQuery; + int32_t numOfSub; + char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; // include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) } SQueryDesc; typedef struct { - char sql[TSDB_SHOW_SQL_LEN]; - char dstTable[TSDB_TABLE_NAME_LEN]; - uint32_t streamId; - int64_t num; // number of computing/cycles - int64_t useconds; - int64_t ctime; - int64_t stime; - int64_t slidingTime; - int64_t interval; + char sql[TSDB_SHOW_SQL_LEN]; + char dstTable[TSDB_TABLE_NAME_LEN]; + int32_t streamId; + int64_t num; // number of computing/cycles + int64_t useconds; + int64_t ctime; + int64_t stime; + int64_t slidingTime; + int64_t interval; } SStreamDesc; typedef struct { diff --git a/source/dnode/mgmt/impl/test/profile/profile.cpp b/source/dnode/mgmt/impl/test/profile/profile.cpp index 210885bd27..83636d1477 100644 --- a/source/dnode/mgmt/impl/test/profile/profile.cpp +++ b/source/dnode/mgmt/impl/test/profile/profile.cpp @@ -215,7 +215,7 @@ TEST_F(DndTestProfile, SConnectMsg_03) { sendMsg(pClient, &rpcMsg); SRpcMsg* pMsg = pClient->pRsp; ASSERT_NE(pMsg, nullptr); - ASSERT_NE(pMsg->code, 0); + ASSERT_EQ(pMsg->code, 0); SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont; ASSERT_NE(pRsp, nullptr); @@ -310,7 +310,7 @@ TEST_F(DndTestProfile, SKillConnMsg_01) { sendMsg(pClient, &rpcMsg); SRpcMsg* pMsg = pClient->pRsp; ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, TSDB_CODE_TSC_INVALID_CONNECTION); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_CONNECTION); ASSERT_EQ(pMsg->contLen, 0); } @@ -338,7 +338,7 @@ TEST_F(DndTestProfile, SKillConnMsg_01) { EXPECT_EQ(pRsp->acctId, 1); EXPECT_GT(pRsp->clusterId, 0); - EXPECT_EQ(pRsp->superAuth, 1); + EXPECT_GT(pRsp->connId, connId); EXPECT_EQ(pRsp->readAuth, 1); EXPECT_EQ(pRsp->writeAuth, 1); @@ -446,6 +446,115 @@ TEST_F(DndTestProfile, SKillQueryMsg_02) { ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_CONN_ID); } +TEST_F(DndTestProfile, SKillQueryMsg_03) { + ASSERT_NE(pClient, nullptr); + int32_t showId = 0; + + { + SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); + pReq->type = TSDB_MGMT_TABLE_QUERIES; + strcpy(pReq->db, ""); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SShowMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_SHOW; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + + SShowRsp* pRsp = (SShowRsp*)pMsg->pCont; + ASSERT_NE(pRsp, nullptr); + pRsp->showId = htonl(pRsp->showId); + STableMetaMsg* pMeta = &pRsp->tableMeta; + pMeta->contLen = htonl(pMeta->contLen); + pMeta->numOfColumns = htons(pMeta->numOfColumns); + pMeta->sversion = htons(pMeta->sversion); + pMeta->tversion = htons(pMeta->tversion); + pMeta->tid = htonl(pMeta->tid); + pMeta->uid = htobe64(pMeta->uid); + pMeta->suid = htobe64(pMeta->suid); + + showId = pRsp->showId; + + EXPECT_NE(pRsp->showId, 0); + EXPECT_EQ(pMeta->contLen, 0); + EXPECT_STREQ(pMeta->tableFname, ""); + EXPECT_EQ(pMeta->numOfTags, 0); + EXPECT_EQ(pMeta->precision, 0); + EXPECT_EQ(pMeta->tableType, 0); + EXPECT_EQ(pMeta->numOfColumns, 14); + EXPECT_EQ(pMeta->sversion, 0); + EXPECT_EQ(pMeta->tversion, 0); + EXPECT_EQ(pMeta->tid, 0); + EXPECT_EQ(pMeta->uid, 0); + EXPECT_STREQ(pMeta->sTableName, ""); + EXPECT_EQ(pMeta->suid, 0); + + SSchema* pSchema = NULL; + pSchema = &pMeta->schema[0]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); + EXPECT_EQ(pSchema->bytes, 4); + EXPECT_STREQ(pSchema->name, "queryId"); + + pSchema = &pMeta->schema[1]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); + EXPECT_EQ(pSchema->bytes, 4); + EXPECT_STREQ(pSchema->name, "connId"); + + pSchema = &pMeta->schema[2]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); + EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); + EXPECT_STREQ(pSchema->name, "user"); + + pSchema = &pMeta->schema[3]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); + EXPECT_EQ(pSchema->bytes, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE); + EXPECT_STREQ(pSchema->name, "ip:port"); + } + + { + SRetrieveTableMsg* pReq = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); + pReq->showId = htonl(showId); + pReq->free = 0; + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SRetrieveTableMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + + SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont; + ASSERT_NE(pRsp, nullptr); + pRsp->numOfRows = htonl(pRsp->numOfRows); + pRsp->offset = htobe64(pRsp->offset); + pRsp->useconds = htobe64(pRsp->useconds); + pRsp->compLen = htonl(pRsp->compLen); + + EXPECT_EQ(pRsp->numOfRows, 0); + EXPECT_EQ(pRsp->offset, 0); + EXPECT_EQ(pRsp->useconds, 0); + EXPECT_EQ(pRsp->completed, 1); + EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(pRsp->compressed, 0); + EXPECT_EQ(pRsp->reserved, 0); + EXPECT_EQ(pRsp->compLen, 0); + } +} + TEST_F(DndTestProfile, SKillStreamMsg_01) { ASSERT_NE(pClient, nullptr); @@ -523,3 +632,105 @@ TEST_F(DndTestProfile, SKillStreamMsg_02) { ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_CONN_ID); } + +TEST_F(DndTestProfile, SKillStreamMsg_03) { + ASSERT_NE(pClient, nullptr); + int32_t showId = 0; + + { + SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); + pReq->type = TSDB_MGMT_TABLE_STREAMS; + strcpy(pReq->db, ""); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SShowMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_SHOW; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + + SShowRsp* pRsp = (SShowRsp*)pMsg->pCont; + ASSERT_NE(pRsp, nullptr); + pRsp->showId = htonl(pRsp->showId); + STableMetaMsg* pMeta = &pRsp->tableMeta; + pMeta->contLen = htonl(pMeta->contLen); + pMeta->numOfColumns = htons(pMeta->numOfColumns); + pMeta->sversion = htons(pMeta->sversion); + pMeta->tversion = htons(pMeta->tversion); + pMeta->tid = htonl(pMeta->tid); + pMeta->uid = htobe64(pMeta->uid); + pMeta->suid = htobe64(pMeta->suid); + + showId = pRsp->showId; + + EXPECT_NE(pRsp->showId, 0); + EXPECT_EQ(pMeta->contLen, 0); + EXPECT_STREQ(pMeta->tableFname, ""); + EXPECT_EQ(pMeta->numOfTags, 0); + EXPECT_EQ(pMeta->precision, 0); + EXPECT_EQ(pMeta->tableType, 0); + EXPECT_EQ(pMeta->numOfColumns, 10); + EXPECT_EQ(pMeta->sversion, 0); + EXPECT_EQ(pMeta->tversion, 0); + EXPECT_EQ(pMeta->tid, 0); + EXPECT_EQ(pMeta->uid, 0); + EXPECT_STREQ(pMeta->sTableName, ""); + EXPECT_EQ(pMeta->suid, 0); + + SSchema* pSchema = NULL; + pSchema = &pMeta->schema[0]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); + EXPECT_EQ(pSchema->bytes, 4); + EXPECT_STREQ(pSchema->name, "streamId"); + + pSchema = &pMeta->schema[1]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); + EXPECT_EQ(pSchema->bytes, 4); + EXPECT_STREQ(pSchema->name, "connId"); + + pSchema = &pMeta->schema[2]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); + EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); + EXPECT_STREQ(pSchema->name, "user"); + } + + { + SRetrieveTableMsg* pReq = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); + pReq->showId = htonl(showId); + pReq->free = 0; + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SRetrieveTableMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + + SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont; + ASSERT_NE(pRsp, nullptr); + pRsp->numOfRows = htonl(pRsp->numOfRows); + pRsp->offset = htobe64(pRsp->offset); + pRsp->useconds = htobe64(pRsp->useconds); + pRsp->compLen = htonl(pRsp->compLen); + + EXPECT_EQ(pRsp->numOfRows, 0); + EXPECT_EQ(pRsp->offset, 0); + EXPECT_EQ(pRsp->useconds, 0); + EXPECT_EQ(pRsp->completed, 1); + EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(pRsp->compressed, 0); + EXPECT_EQ(pRsp->reserved, 0); + EXPECT_EQ(pRsp->compLen, 0); + } +} diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 6d95c5ffb9..8f51b1a0be 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -29,7 +29,7 @@ typedef struct { char user[TSDB_USER_LEN]; char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc int32_t pid; // pid of app that invokes taosc - int32_t connId; + int32_t id; int8_t killed; int8_t align; uint16_t port; @@ -46,7 +46,7 @@ typedef struct { static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app); static void mndFreeConn(SConnObj *pConn); -static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *user, uint32_t ip, uint16_t port); +static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn); static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); @@ -57,11 +57,11 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg); static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg); static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); -static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow); +static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); -static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow); -static int32_t mndRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SMnodeMsg *pMsg); +static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveStreams(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); int32_t mndInitProfile(SMnode *pMnode) { @@ -109,7 +109,7 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1); SConnObj connObj = {.pid = pid, - .connId = connId, + .id = connId, .killed = 0, .port = port, .ip = ip, @@ -130,10 +130,10 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000); if (pConn == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr()); + mError("conn:%d, data:%p failed to put into cache since %s, user:%s", connId, pConn, user, terrstr()); return NULL; } else { - mDebug("conn:%d, is created, user:%s", connId, user); + mTrace("conn:%d, data:%p created, user:%s", pConn->id, pConn, user); return pConn; } } @@ -141,39 +141,29 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t static void mndFreeConn(SConnObj *pConn) { tfree(pConn->pQueries); tfree(pConn->pStreams); - mDebug("conn:%d, is destroyed", pConn->connId); + mTrace("conn:%d, data:%p destroyed", pConn->id, pConn); } -static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *newUser, uint32_t newIp, uint16_t newPort) { +static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); if (pConn == NULL) { - mDebug("conn:%d, already destroyed, user:%s", connId, newUser); - return NULL; - } - - if (pConn->ip != newIp || pConn->port != newPort /* || strcmp(pConn->user, newUser) != 0 */) { - char oldIpStr[30]; - char newIpStr[30]; - taosIp2String(pConn->ip, oldIpStr); - taosIp2String(newIp, newIpStr); - mDebug("conn:%d, incoming conn user:%s ip:%s:%u, not match exist user:%s ip:%s:%u", connId, newUser, newIpStr, - newPort, pConn->user, oldIpStr, pConn->port); - - if (pMgmt->connId < connId) pMgmt->connId = connId + 1; - taosCacheRelease(pMgmt->cache, (void **)&pConn, false); + mDebug("conn:%d, already destroyed", connId); return NULL; } int32_t keepTime = pMnode->shellActivityTimer * 3; pConn->lastAccess = keepTime * 1000 + (uint64_t)taosGetTimestampMs(); + + mTrace("conn:%d, data:%p acquired from cache", pConn->id, pConn); return pConn; } static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { if (pConn == NULL) return; - + mTrace("conn:%d, data:%p released from cache", pConn->id, pConn); + SProfileMgmt *pMgmt = &pMnode->profileMgmt; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); } @@ -250,13 +240,13 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { } pRsp->clusterId = htonl(pMnode->clusterId); - pRsp->connId = htonl(pConn->connId); + pRsp->connId = htonl(pConn->id); mndGetMnodeEpSet(pMnode, &pRsp->epSet); mndReleaseConn(pMnode, pConn); pMsg->contLen = sizeof(SConnectRsp); pMsg->pCont = pRsp; - mDebug("user:%s, login from %s, conn:%d", info.user, ip, pConn->connId); + mDebug("user:%s, login from %s, conn:%d", info.user, ip, pConn->id); return 0; } @@ -296,7 +286,9 @@ static int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pMsg) { } static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + SHeartBeatMsg *pReq = pMsg->rpcMsg.pCont; pReq->connId = htonl(pReq->connId); pReq->pid = htonl(pReq->pid); @@ -307,20 +299,33 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) { return -1; } - SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId, info.user, info.clientIp, info.clientPort); + SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId); if (pConn == NULL) { pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app); if (pConn == NULL) { mError("user:%s, conn:%d is freed and failed to create new conn since %s", pMsg->user, pReq->connId, terrstr()); return -1; } else { - mDebug("user:%s, conn:%d is freed and create a new conn:%d", pMsg->user, pReq->connId, pConn->connId); + mDebug("user:%s, conn:%d is freed and create a new conn:%d", pMsg->user, pReq->connId, pConn->id); } } else if (pConn->killed) { - mDebug("user:%s, conn:%d is already killed", pMsg->user, pReq->connId, pConn->connId); - terrno = TSDB_CODE_TSC_INVALID_CONNECTION; + mError("user:%s, conn:%d is already killed", pMsg->user, pConn->id); + terrno = TSDB_CODE_MND_INVALID_CONNECTION; return -1; } else { + if (pConn->ip != info.clientIp || pConn->port != info.clientPort /* || strcmp(pConn->user, info.user) != 0 */) { + char oldIpStr[40]; + char newIpStr[40]; + taosIpPort2String(pConn->ip, pConn->port, oldIpStr); + taosIpPort2String(info.clientIp, info.clientPort, newIpStr); + mError("conn:%d, incoming conn user:%s ip:%s, not match exist user:%s ip:%s", pConn->id, info.user, newIpStr, + pConn->user, oldIpStr); + + if (pMgmt->connId < pConn->id) pMgmt->connId = pConn->id + 1; + taosCacheRelease(pMgmt->cache, (void **)&pConn, false); + terrno = TSDB_CODE_MND_INVALID_CONNECTION; + return -1; + } } SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp)); @@ -346,7 +351,7 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) { pConn->queryId = 0; } - pRsp->connId = htonl(pConn->connId); + pRsp->connId = htonl(pConn->id); pRsp->totalDnodes = htonl(1); pRsp->onlineDnodes = htonl(1); mndGetMnodeEpSet(pMnode, &pRsp->epSet); @@ -525,47 +530,47 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pMsg->pMnode; int32_t numOfRows = 0; - SConnObj *pConnObj = NULL; + SConnObj *pConn = NULL; int32_t cols = 0; char *pWrite; char ipStr[TSDB_IPv4ADDR_LEN + 6]; while (numOfRows < rows) { - pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConnObj); - if (pConnObj == NULL) break; + pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn); + if (pConn == NULL) break; cols = 0; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pConnObj->connId; + *(int32_t *)pWrite = pConn->id; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]); cols++; // app name pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->app, pShow->bytes[cols]); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->app, pShow->bytes[cols]); cols++; // app pid pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pConnObj->pid; + *(int32_t *)pWrite = pConn->pid; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - taosIpPort2String(pConnObj->ip, pConnObj->port, ipStr); + taosIpPort2String(pConn->ip, pConn->port, ipStr); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pConnObj->stime; + *(int64_t *)pWrite = pConn->stime; cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - if (pConnObj->lastAccess < pConnObj->stime) pConnObj->lastAccess = pConnObj->stime; - *(int64_t *)pWrite = pConnObj->lastAccess; + if (pConn->lastAccess < pConn->stime) pConn->lastAccess = pConn->stime; + *(int64_t *)pWrite = pConn->lastAccess; cols++; numOfRows++; @@ -576,7 +581,7 @@ static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, in return numOfRows; } -static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow) { +static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { SMnode *pMnode = pMsg->pMnode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -592,9 +597,15 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj * int32_t cols = 0; SSchema *pSchema = pMeta->schema; - pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "query_id"); + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "queryId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "connId"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; @@ -687,40 +698,43 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj * static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pMsg->pMnode; int32_t numOfRows = 0; - SConnObj *pConnObj = NULL; + SConnObj *pConn = NULL; int32_t cols = 0; char *pWrite; void *pIter; char str[TSDB_IPv4ADDR_LEN + 6] = {0}; while (numOfRows < rows) { - pIter = mndGetNextConn(pMnode, pShow->pIter, &pConnObj); - if (pConnObj == NULL) { + pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn); + if (pConn == NULL) { pShow->pIter = pIter; break; } - if (numOfRows + pConnObj->numOfQueries >= rows) { + if (numOfRows + pConn->numOfQueries >= rows) { mndCancelGetNextConn(pMnode, pIter); break; } pShow->pIter = pIter; - for (int32_t i = 0; i < pConnObj->numOfQueries; ++i) { - SQueryDesc *pDesc = pConnObj->pQueries + i; + for (int32_t i = 0; i < pConn->numOfQueries; ++i) { + SQueryDesc *pDesc = pConn->pQueries + i; cols = 0; - snprintf(str, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId)); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]); + *(int64_t *)pWrite = htobe64(pDesc->queryId); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); + *(int64_t *)pWrite = htobe64(pConn->id); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConn->ip), pConn->port); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]); cols++; @@ -749,7 +763,7 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, cols++; char epBuf[TSDB_EP_LEN + 1] = {0}; - snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConnObj->port); + snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConn->port); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]); cols++; @@ -783,7 +797,7 @@ static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) { taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); } -static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow) { +static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { SMnode *pMnode = pMsg->pMnode; SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -799,12 +813,18 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj int32_t cols = 0; SSchema *pSchema = pMeta->schema; - pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; strcpy(pSchema[cols].name, "streamId"); pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "connId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; strcpy(pSchema[cols].name, "user"); @@ -867,39 +887,42 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj return 0; } -static int32_t mndRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SMnodeMsg *pMsg) { +static int32_t mndRetrieveStreams(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { SMnode *pMnode = pMsg->pMnode; int32_t numOfRows = 0; - SConnObj *pConnObj = NULL; + SConnObj *pConn = NULL; int32_t cols = 0; char *pWrite; void *pIter; char ipStr[TSDB_IPv4ADDR_LEN + 6]; while (numOfRows < rows) { - pIter = mndGetNextConn(pMnode, pShow->pIter, &pConnObj); - if (pConnObj == NULL) { + pIter = mndGetNextConn(pMnode, pShow->pIter, &pConn); + if (pConn == NULL) { pShow->pIter = pIter; break; } - if (numOfRows + pConnObj->numOfStreams >= rows) { + if (numOfRows + pConn->numOfStreams >= rows) { mndCancelGetNextConn(pMnode, pIter); break; } pShow->pIter = pIter; - for (int32_t i = 0; i < pConnObj->numOfStreams; ++i) { - SStreamDesc *pDesc = pConnObj->pStreams + i; + for (int32_t i = 0; i < pConn->numOfStreams; ++i) { + SStreamDesc *pDesc = pConn->pStreams + i; cols = 0; - snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->streamId)); pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); + *(int64_t *)pWrite = htobe64(pDesc->streamId); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); + *(int64_t *)pWrite = htobe64(pConn->id); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConn->user, pShow->bytes[cols]); cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; @@ -907,7 +930,7 @@ static int32_t mndRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SMn cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConn->ip), pConn->port); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); cols++; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 1fe6684a01..ca0d9b9d50 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -16,14 +16,14 @@ #define _DEFAULT_SOURCE #include "mndShow.h" -static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg); -static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMsg); -static bool mndCheckRetrieveFinished(SShowObj *pShow); static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg); static void mndFreeShowObj(SShowObj *pShow); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); static char *mndShowStr(int32_t showType); +static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg); +static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMsg); +static bool mndCheckRetrieveFinished(SShowObj *pShow); int32_t mndInitShow(SMnode *pMnode) { SShowMgmt *pMgmt = &pMnode->showMgmt; @@ -116,6 +116,9 @@ static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId) { static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { if (pShow == NULL) return; mTrace("show:%d, data:%p released from cache, force:%d", pShow->id, pShow, forceRemove); + + // A bug in tcache.c + forceRemove = 0; SMnode *pMnode = pShow->pMnode; SShowMgmt *pMgmt = &pMnode->showMgmt; @@ -244,7 +247,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { pMnodeMsg->pCont = pRsp; pMnodeMsg->contLen = size; - if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { + if (rowsRead == 0 || rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { pRsp->completed = 1; mDebug("show:%d, data:%p retrieve completed", pShow->id, pShow); mndReleaseShowObj(pShow, true);