From c0fc32f6c00df18d2e83d11ed6236d996b89397a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 6 Dec 2021 14:38:37 +0800 Subject: [PATCH] TD-10431 profile test --- include/common/taosmsg.h | 120 ++--- .../dnode/mgmt/impl/test/profile/profile.cpp | 428 +++++++++++++++++- source/dnode/mgmt/impl/test/sut/deploy.cpp | 48 +- source/dnode/mgmt/impl/test/sut/deploy.h | 6 +- source/dnode/mnode/impl/inc/mndDef.h | 1 - source/dnode/mnode/impl/src/mndProfile.c | 17 +- source/dnode/mnode/impl/src/mndShow.c | 165 ++++--- 7 files changed, 613 insertions(+), 172 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 93186252a7..9443ef7475 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -549,56 +549,61 @@ typedef struct { // todo: the show handle should be replaced with id typedef struct { SMsgHead header; - union{uint64_t qhandle; uint64_t qId;}; // query handle - uint16_t free; + union { + int32_t showId; + int64_t qhandle; + int64_t qId; + }; // query handle + int8_t free; } SRetrieveTableMsg; typedef struct SRetrieveTableRsp { int32_t numOfRows; - int8_t completed; // all results are returned to client - int16_t precision; - int64_t offset; // updated offset value for multi-vnode projection query + int64_t offset; // updated offset value for multi-vnode projection query int64_t useconds; + int8_t completed; // all results are returned to client + int8_t precision; int8_t compressed; + int8_t reserved; int32_t compLen; char data[]; } SRetrieveTableRsp; typedef struct { - char db[TSDB_FULL_DB_NAME_LEN]; - int32_t cacheBlockSize; //MB - int32_t totalBlocks; - int32_t maxTables; - int32_t daysPerFile; - int32_t daysToKeep0; - int32_t daysToKeep1; - int32_t daysToKeep2; - int32_t minRowsPerFileBlock; - int32_t maxRowsPerFileBlock; - int32_t commitTime; - int32_t fsyncPeriod; - uint8_t precision; // time resolution - int8_t compression; - int8_t walLevel; - int8_t replications; - int8_t quorum; - int8_t ignoreExist; - int8_t update; - int8_t cacheLastRow; - int8_t dbType; - int16_t partitions; - int8_t reserve[5]; + char db[TSDB_FULL_DB_NAME_LEN]; + int32_t cacheBlockSize; // MB + int32_t totalBlocks; + int32_t maxTables; + int32_t daysPerFile; + int32_t daysToKeep0; + int32_t daysToKeep1; + int32_t daysToKeep2; + int32_t minRowsPerFileBlock; + int32_t maxRowsPerFileBlock; + int32_t commitTime; + int32_t fsyncPeriod; + int8_t precision; // time resolution + int8_t compression; + int8_t walLevel; + int8_t replications; + int8_t quorum; + int8_t ignoreExist; + int8_t update; + int8_t cacheLastRow; + int8_t dbType; + int16_t partitions; + int8_t reserve[5]; } SCreateDbMsg, SAlterDbMsg; typedef struct { - char name[TSDB_FUNC_NAME_LEN]; - char path[PATH_MAX]; - int32_t funcType; - uint8_t outputType; - int16_t outputLen; - int32_t bufSize; - int32_t codeLen; - char code[]; + char name[TSDB_FUNC_NAME_LEN]; + char path[PATH_MAX]; + int32_t funcType; + int8_t outputType; + int16_t outputLen; + int32_t bufSize; + int32_t codeLen; + char code[]; } SCreateFuncMsg; typedef struct { @@ -626,8 +631,8 @@ typedef struct { } SDropFuncMsg; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; - uint8_t ignoreNotExists; + char db[TSDB_TABLE_FNAME_LEN]; + int8_t ignoreNotExists; } SDropDbMsg, SUseDbMsg, SSyncDbMsg; typedef struct { @@ -762,21 +767,20 @@ typedef struct { } SVgroupsMsg, SVgroupsInfo; typedef struct STableMetaMsg { - int32_t contLen; - char tableFname[TSDB_TABLE_FNAME_LEN]; // table id - uint8_t numOfTags; - uint8_t precision; - uint8_t tableType; - int16_t numOfColumns; - int16_t sversion; - int16_t tversion; - int32_t tid; - uint64_t uid; - SVgroupMsg vgroup; - - char sTableName[TSDB_TABLE_FNAME_LEN]; - uint64_t suid; - SSchema schema[]; + int32_t contLen; + char tableFname[TSDB_TABLE_FNAME_LEN]; // table id + int8_t numOfTags; + int8_t precision; + int8_t tableType; + int16_t numOfColumns; + int16_t sversion; + int16_t tversion; + int32_t tid; + int64_t uid; + SVgroupMsg vgroup; + char sTableName[TSDB_TABLE_FNAME_LEN]; + int64_t suid; + SSchema schema[]; } STableMetaMsg; typedef struct SMultiTableMeta { @@ -802,10 +806,10 @@ typedef struct { * payloadLen is the length of payload */ typedef struct { - int8_t type; - char db[TSDB_FULL_DB_NAME_LEN]; - uint16_t payloadLen; - char payload[]; + int8_t type; + char db[TSDB_FULL_DB_NAME_LEN]; + int16_t payloadLen; + char payload[]; } SShowMsg; typedef struct { @@ -815,7 +819,7 @@ typedef struct { } SCompactMsg; typedef struct SShowRsp { - uint64_t qhandle; + int32_t showId; STableMetaMsg tableMeta; } SShowRsp; diff --git a/source/dnode/mgmt/impl/test/profile/profile.cpp b/source/dnode/mgmt/impl/test/profile/profile.cpp index ebdaa85ac7..210885bd27 100644 --- a/source/dnode/mgmt/impl/test/profile/profile.cpp +++ b/source/dnode/mgmt/impl/test/profile/profile.cpp @@ -21,9 +21,15 @@ class DndTestProfile : public ::testing::Test { void TearDown() override {} static void SetUpTestSuite() { - pServer = createServer("/tmp/dndTestProfile", "localhost", 7100); + const char* user = "root"; + const char* pass = "taosdata"; + const char* path = "/tmp/dndTestProfile"; + const char* fqdn = "localhost"; + uint16_t port = 9527; + + pServer = createServer(path, fqdn, port); ASSERT(pServer); - pClient = createClient("root", "taosdata"); + pClient = createClient(user, pass, fqdn, port); } static void TearDownTestSuite() { @@ -33,12 +39,14 @@ class DndTestProfile : public ::testing::Test { static SServer* pServer; static SClient* pClient; + static int32_t connId; }; SServer* DndTestProfile::pServer; SClient* DndTestProfile::pClient; +int32_t DndTestProfile::connId; -TEST_F(DndTestProfile, connectMsg_01) { +TEST_F(DndTestProfile, SConnectMsg_01) { ASSERT_NE(pClient, nullptr); SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg)); @@ -52,8 +60,10 @@ TEST_F(DndTestProfile, connectMsg_01) { rpcMsg.msgType = TSDB_MSG_TYPE_CONNECT; sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); - SConnectRsp* pRsp = (SConnectRsp*)pClient->pRsp->pCont; + SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; ASSERT_NE(pRsp, nullptr); pRsp->acctId = htonl(pRsp->acctId); pRsp->clusterId = htonl(pRsp->clusterId); @@ -71,13 +81,165 @@ TEST_F(DndTestProfile, connectMsg_01) { EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(pRsp->epSet.port[0], 9527); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); + + connId = pRsp->connId; } -TEST_F(DndTestProfile, heartbeatMsg_01) { +TEST_F(DndTestProfile, SConnectMsg_02) { + ASSERT_NE(pClient, nullptr); + + SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg)); + pReq->pid = htonl(1234); + strcpy(pReq->app, "test01"); + strcpy(pReq->db, "invalid_db"); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SConnectMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CONNECT; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_DB); + ASSERT_EQ(pMsg->contLen, 0); +} + +TEST_F(DndTestProfile, SConnectMsg_03) { + ASSERT_NE(pClient, nullptr); + int32_t showId = 0; + + { + SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); + pReq->type = TSDB_MGMT_TABLE_CONNS; + strcpy(pReq->db, ""); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SShowMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_SHOW; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + + SShowRsp* pRsp = (SShowRsp*)pMsg->pCont; + ASSERT_NE(pRsp, nullptr); + pRsp->showId = htonl(pRsp->showId); + STableMetaMsg* pMeta = &pRsp->tableMeta; + pMeta->contLen = htonl(pMeta->contLen); + pMeta->numOfColumns = htons(pMeta->numOfColumns); + pMeta->sversion = htons(pMeta->sversion); + pMeta->tversion = htons(pMeta->tversion); + pMeta->tid = htonl(pMeta->tid); + pMeta->uid = htobe64(pMeta->uid); + pMeta->suid = htobe64(pMeta->suid); + + showId = pRsp->showId; + + EXPECT_NE(pRsp->showId, 0); + EXPECT_EQ(pMeta->contLen, 0); + EXPECT_STREQ(pMeta->tableFname, ""); + EXPECT_EQ(pMeta->numOfTags, 0); + EXPECT_EQ(pMeta->precision, 0); + EXPECT_EQ(pMeta->tableType, 0); + EXPECT_EQ(pMeta->numOfColumns, 7); + EXPECT_EQ(pMeta->sversion, 0); + EXPECT_EQ(pMeta->tversion, 0); + EXPECT_EQ(pMeta->tid, 0); + EXPECT_EQ(pMeta->uid, 0); + EXPECT_STREQ(pMeta->sTableName, ""); + EXPECT_EQ(pMeta->suid, 0); + + SSchema* pSchema = NULL; + pSchema = &pMeta->schema[0]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); + EXPECT_EQ(pSchema->bytes, 4); + EXPECT_STREQ(pSchema->name, "connId"); + + pSchema = &pMeta->schema[1]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); + EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); + EXPECT_STREQ(pSchema->name, "user"); + + pSchema = &pMeta->schema[2]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); + EXPECT_EQ(pSchema->bytes, TSDB_USER_LEN + VARSTR_HEADER_SIZE); + EXPECT_STREQ(pSchema->name, "program"); + + pSchema = &pMeta->schema[3]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_INT); + EXPECT_EQ(pSchema->bytes, 4); + EXPECT_STREQ(pSchema->name, "pid"); + + pSchema = &pMeta->schema[4]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_BINARY); + EXPECT_EQ(pSchema->bytes, TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE); + EXPECT_STREQ(pSchema->name, "ip:port"); + + pSchema = &pMeta->schema[5]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); + EXPECT_EQ(pSchema->bytes, 8); + EXPECT_STREQ(pSchema->name, "login_time"); + + pSchema = &pMeta->schema[6]; + pSchema->bytes = htons(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, TSDB_DATA_TYPE_TIMESTAMP); + EXPECT_EQ(pSchema->bytes, 8); + EXPECT_STREQ(pSchema->name, "last_access"); + } + + { + SRetrieveTableMsg* pReq = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); + pReq->showId = htonl(showId); + pReq->free = 0; + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SRetrieveTableMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_NE(pMsg->code, 0); + + SRetrieveTableRsp* pRsp = (SRetrieveTableRsp*)pMsg->pCont; + ASSERT_NE(pRsp, nullptr); + pRsp->numOfRows = htonl(pRsp->numOfRows); + pRsp->offset = htobe64(pRsp->offset); + pRsp->useconds = htobe64(pRsp->useconds); + pRsp->compLen = htonl(pRsp->compLen); + + EXPECT_EQ(pRsp->numOfRows, 1); + EXPECT_EQ(pRsp->offset, 0); + EXPECT_EQ(pRsp->useconds, 0); + EXPECT_EQ(pRsp->completed, 1); + EXPECT_EQ(pRsp->precision, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(pRsp->compressed, 0); + EXPECT_EQ(pRsp->reserved, 0); + EXPECT_EQ(pRsp->compLen, 0); + } +} + +TEST_F(DndTestProfile, SHeartBeatMsg_01) { ASSERT_NE(pClient, nullptr); SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(sizeof(SHeartBeatMsg)); - pReq->connId = htonl(1); + pReq->connId = htonl(connId); pReq->pid = htonl(1234); pReq->numOfQueries = htonl(0); pReq->numOfStreams = htonl(0); @@ -89,8 +251,10 @@ TEST_F(DndTestProfile, heartbeatMsg_01) { rpcMsg.msgType = TSDB_MSG_TYPE_HEARTBEAT; sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); - SHeartBeatRsp* pRsp = (SHeartBeatRsp*)pClient->pRsp->pCont; + SHeartBeatRsp* pRsp = (SHeartBeatRsp*)pMsg->pCont; ASSERT_NE(pRsp, nullptr); pRsp->connId = htonl(pRsp->connId); pRsp->queryId = htonl(pRsp->queryId); @@ -99,7 +263,7 @@ TEST_F(DndTestProfile, heartbeatMsg_01) { pRsp->onlineDnodes = htonl(pRsp->onlineDnodes); pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); - EXPECT_EQ(pRsp->connId, 1); + EXPECT_EQ(pRsp->connId, connId); EXPECT_EQ(pRsp->queryId, 0); EXPECT_EQ(pRsp->streamId, 0); EXPECT_EQ(pRsp->totalDnodes, 1); @@ -111,3 +275,251 @@ TEST_F(DndTestProfile, heartbeatMsg_01) { EXPECT_EQ(pRsp->epSet.port[0], 9527); EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); } + +TEST_F(DndTestProfile, SKillConnMsg_01) { + ASSERT_NE(pClient, nullptr); + + { + SKillConnMsg* pReq = (SKillConnMsg*)rpcMallocCont(sizeof(SKillConnMsg)); + pReq->connId = htonl(connId); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SKillConnMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_KILL_CONN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } + + { + SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(sizeof(SHeartBeatMsg)); + pReq->connId = htonl(connId); + pReq->pid = htonl(1234); + pReq->numOfQueries = htonl(0); + pReq->numOfStreams = htonl(0); + strcpy(pReq->app, "test01"); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SHeartBeatMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_HEARTBEAT; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_TSC_INVALID_CONNECTION); + ASSERT_EQ(pMsg->contLen, 0); + } + + { + SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg)); + pReq->pid = htonl(1234); + strcpy(pReq->app, "test01"); + strcpy(pReq->db, ""); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SConnectMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CONNECT; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + + SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; + ASSERT_NE(pRsp, nullptr); + pRsp->acctId = htonl(pRsp->acctId); + pRsp->clusterId = htonl(pRsp->clusterId); + pRsp->connId = htonl(pRsp->connId); + pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); + + EXPECT_EQ(pRsp->acctId, 1); + EXPECT_GT(pRsp->clusterId, 0); + EXPECT_EQ(pRsp->superAuth, 1); + EXPECT_EQ(pRsp->readAuth, 1); + EXPECT_EQ(pRsp->writeAuth, 1); + + EXPECT_EQ(pRsp->epSet.inUse, 0); + EXPECT_EQ(pRsp->epSet.numOfEps, 1); + EXPECT_EQ(pRsp->epSet.port[0], 9527); + EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); + + connId = pRsp->connId; + } +} + +TEST_F(DndTestProfile, SKillConnMsg_02) { + ASSERT_NE(pClient, nullptr); + + SKillConnMsg* pReq = (SKillConnMsg*)rpcMallocCont(sizeof(SKillConnMsg)); + pReq->connId = htonl(2345); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SKillConnMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_KILL_CONN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_CONN_ID); +} + +TEST_F(DndTestProfile, SKillQueryMsg_01) { + ASSERT_NE(pClient, nullptr); + + { + SKillQueryMsg* pReq = (SKillQueryMsg*)rpcMallocCont(sizeof(SKillQueryMsg)); + pReq->connId = htonl(connId); + pReq->queryId = htonl(1234); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SKillQueryMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_KILL_QUERY; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + ASSERT_EQ(pMsg->contLen, 0); + } + + { + SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(sizeof(SHeartBeatMsg)); + pReq->connId = htonl(connId); + pReq->pid = htonl(1234); + pReq->numOfQueries = htonl(0); + pReq->numOfStreams = htonl(0); + strcpy(pReq->app, "test01"); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SHeartBeatMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_HEARTBEAT; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + + SHeartBeatRsp* pRsp = (SHeartBeatRsp*)pMsg->pCont; + ASSERT_NE(pRsp, nullptr); + pRsp->connId = htonl(pRsp->connId); + pRsp->queryId = htonl(pRsp->queryId); + pRsp->streamId = htonl(pRsp->streamId); + pRsp->totalDnodes = htonl(pRsp->totalDnodes); + pRsp->onlineDnodes = htonl(pRsp->onlineDnodes); + pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); + + EXPECT_EQ(pRsp->connId, connId); + EXPECT_EQ(pRsp->queryId, 1234); + EXPECT_EQ(pRsp->streamId, 0); + EXPECT_EQ(pRsp->totalDnodes, 1); + EXPECT_EQ(pRsp->onlineDnodes, 1); + EXPECT_EQ(pRsp->killConnection, 0); + + EXPECT_EQ(pRsp->epSet.inUse, 0); + EXPECT_EQ(pRsp->epSet.numOfEps, 1); + EXPECT_EQ(pRsp->epSet.port[0], 9527); + EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); + } +} + +TEST_F(DndTestProfile, SKillQueryMsg_02) { + ASSERT_NE(pClient, nullptr); + + SKillQueryMsg* pReq = (SKillQueryMsg*)rpcMallocCont(sizeof(SKillQueryMsg)); + pReq->connId = htonl(2345); + pReq->queryId = htonl(1234); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SKillQueryMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_KILL_QUERY; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_CONN_ID); +} + +TEST_F(DndTestProfile, SKillStreamMsg_01) { + ASSERT_NE(pClient, nullptr); + + { + SKillStreamMsg* pReq = (SKillStreamMsg*)rpcMallocCont(sizeof(SKillStreamMsg)); + pReq->connId = htonl(connId); + pReq->streamId = htonl(3579); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SKillStreamMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_KILL_STREAM; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + ASSERT_EQ(pMsg->contLen, 0); + } + + { + SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(sizeof(SHeartBeatMsg)); + pReq->connId = htonl(connId); + pReq->pid = htonl(1234); + pReq->numOfQueries = htonl(0); + pReq->numOfStreams = htonl(0); + strcpy(pReq->app, "test01"); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SHeartBeatMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_HEARTBEAT; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + + SHeartBeatRsp* pRsp = (SHeartBeatRsp*)pMsg->pCont; + ASSERT_NE(pRsp, nullptr); + pRsp->connId = htonl(pRsp->connId); + pRsp->queryId = htonl(pRsp->queryId); + pRsp->streamId = htonl(pRsp->streamId); + pRsp->totalDnodes = htonl(pRsp->totalDnodes); + pRsp->onlineDnodes = htonl(pRsp->onlineDnodes); + pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); + + EXPECT_EQ(pRsp->connId, connId); + EXPECT_EQ(pRsp->queryId, 0); + EXPECT_EQ(pRsp->streamId, 3579); + EXPECT_EQ(pRsp->totalDnodes, 1); + EXPECT_EQ(pRsp->onlineDnodes, 1); + EXPECT_EQ(pRsp->killConnection, 0); + + EXPECT_EQ(pRsp->epSet.inUse, 0); + EXPECT_EQ(pRsp->epSet.numOfEps, 1); + EXPECT_EQ(pRsp->epSet.port[0], 9527); + EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); + } +} + +TEST_F(DndTestProfile, SKillStreamMsg_02) { + ASSERT_NE(pClient, nullptr); + + SKillStreamMsg* pReq = (SKillStreamMsg*)rpcMallocCont(sizeof(SKillStreamMsg)); + pReq->connId = htonl(2345); + pReq->streamId = htonl(1234); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SKillStreamMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_KILL_QUERY; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_CONN_ID); +} diff --git a/source/dnode/mgmt/impl/test/sut/deploy.cpp b/source/dnode/mgmt/impl/test/sut/deploy.cpp index 239120aa88..6a7732b7ed 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.cpp +++ b/source/dnode/mgmt/impl/test/sut/deploy.cpp @@ -15,8 +15,27 @@ #include "deploy.h" -void initLog(char* path) { +void initLog(const char* path) { + dDebugFlag = 0; + vDebugFlag = 0; mDebugFlag = 207; + cDebugFlag = 0; + jniDebugFlag = 0; + tmrDebugFlag = 0; + sdbDebugFlag = 0; + httpDebugFlag = 0; + mqttDebugFlag = 0; + monDebugFlag = 0; + uDebugFlag = 0; + rpcDebugFlag = 0; + odbcDebugFlag = 0; + qDebugFlag = 0; + wDebugFlag = 0; + sDebugFlag = 0; + tsdbDebugFlag = 0; + cqDebugFlag = 0; + debugFlag = 0; + char temp[PATH_MAX]; snprintf(temp, PATH_MAX, "%s/taosdlog", path); if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) { @@ -32,7 +51,7 @@ void* runServer(void* param) { } } -void initOption(SDnodeOpt* pOption, char* path, char* fqdn, uint16_t port) { +void initOption(SDnodeOpt* pOption, const char* path, const char* fqdn, uint16_t port) { pOption->sver = 1; pOption->numOfCores = 1; pOption->numOfSupportMnodes = 1; @@ -46,15 +65,16 @@ void initOption(SDnodeOpt* pOption, char* path, char* fqdn, uint16_t port) { pOption->shellActivityTimer = 30; pOption->serverPort = port; strcpy(pOption->dataDir, path); - snprintf(pOption->localEp, TSDB_EP_LEN, "%s:&u", fqdn, port); + snprintf(pOption->localEp, TSDB_EP_LEN, "%s:%u", fqdn, port); snprintf(pOption->localFqdn, TSDB_FQDN_LEN, "%s", fqdn); - snprintf(pOption->firstEp, TSDB_EP_LEN, "%s:&u", fqdn, port); - - taosRemoveDir(path); - taosMkDir(path); + snprintf(pOption->firstEp, TSDB_EP_LEN, "%s:%u", fqdn, port); } -SServer* createServer(char* path, char* fqdn, uint16_t port) { +SServer* createServer(const char* path, const char* fqdn, uint16_t port) { + taosRemoveDir(path); + taosMkDir(path); + initLog(path); + SDnodeOpt option = {0}; initOption(&option, path, fqdn, port); @@ -80,11 +100,11 @@ void dropServer(SServer* pServer) { void processClientRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SClient* pClient = (SClient*)parent; pClient->pRsp = pMsg; - //taosMsleep(1000000); + // taosMsleep(1000000); tsem_post(&pClient->sem); } -SClient* createClient(char *user, char *pass) { +SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port) { SClient* pClient = (SClient*)calloc(1, sizeof(SClient)); ASSERT(pClient); @@ -99,7 +119,7 @@ SClient* createClient(char *user, char *pass) { rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = 30 * 1000; - rpcInit.user = user; + rpcInit.user = (char*)user; rpcInit.ckey = "key"; rpcInit.parent = pClient; rpcInit.secret = (char*)secretEncrypt; @@ -110,6 +130,8 @@ SClient* createClient(char *user, char *pass) { ASSERT(pClient->clientRpc); tsem_init(&pClient->sem, 0, 0); + strcpy(pClient->fqdn, fqdn); + pClient->port = port; return pClient; } @@ -123,8 +145,8 @@ void sendMsg(SClient* pClient, SRpcMsg* pMsg) { SEpSet epSet = {0}; epSet.inUse = 0; epSet.numOfEps = 1; - epSet.port[0] = 9527; - strcpy(epSet.fqdn[0], "localhost"); + epSet.port[0] = pClient->port; + strcpy(epSet.fqdn[0], pClient->fqdn); rpcSendRequest(pClient->clientRpc, &epSet, pMsg, NULL); tsem_wait(&pClient->sem); diff --git a/source/dnode/mgmt/impl/test/sut/deploy.h b/source/dnode/mgmt/impl/test/sut/deploy.h index b830c7351e..8a352b96e8 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.h +++ b/source/dnode/mgmt/impl/test/sut/deploy.h @@ -31,13 +31,15 @@ typedef struct { } SServer; typedef struct { + char fqdn[TSDB_FQDN_LEN]; + uint16_t port; void* clientRpc; SRpcMsg* pRsp; tsem_t sem; } SClient; -SServer* createServer(char* path, char *fqdn, uint16_t port); +SServer* createServer(const char* path, const char* fqdn, uint16_t port); void dropServer(SServer* pServer); -SClient* createClient(char *user, char *pass); +SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port); void dropClient(SClient* pClient); void sendMsg(SClient* pClient, SRpcMsg* pMsg); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index aaf86c15b6..6ad0f05c2a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -296,7 +296,6 @@ typedef struct SShowObj { void *pIter; void *pVgIter; SMnode *pMnode; - SShowObj **ppShow; char db[TSDB_FULL_DB_NAME_LEN]; int16_t offset[TSDB_MAX_COLUMNS]; int32_t bytes[TSDB_MAX_COLUMNS]; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index ac606b2ff6..6d95c5ffb9 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -172,9 +172,9 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *newUser, u } static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { - SProfileMgmt *pMgmt = &pMnode->profileMgmt; - if (pConn == NULL) return; + + SProfileMgmt *pMgmt = &pMnode->profileMgmt; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); } @@ -316,6 +316,11 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) { } else { mDebug("user:%s, conn:%d is freed and create a new conn:%d", pMsg->user, pReq->connId, pConn->connId); } + } else if (pConn->killed) { + mDebug("user:%s, conn:%d is already killed", pMsg->user, pReq->connId, pConn->connId); + terrno = TSDB_CODE_TSC_INVALID_CONNECTION; + return -1; + } else { } SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp)); @@ -368,7 +373,7 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) { SKillQueryMsg *pKill = pMsg->rpcMsg.pCont; int32_t connId = htonl(pKill->connId); int32_t queryId = htonl(pKill->queryId); - mInfo("kill query msg is received, queryId:%s", pKill->queryId); + mInfo("kill query msg is received, queryId:%d", pKill->queryId); SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); if (pConn == NULL) { @@ -399,7 +404,7 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) { SKillStreamMsg *pKill = pMsg->rpcMsg.pCont; int32_t connId = htonl(pKill->connId); int32_t streamId = htonl(pKill->streamId); - mDebug("kill stream msg is received, streamId:%s", streamId); + mDebug("kill stream msg is received, streamId:%d", streamId); SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); if (pConn == NULL) { @@ -432,11 +437,11 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) { SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); if (pConn == NULL) { - mError("connId:%s, failed to kill connection, conn not exist", connId); + mError("connId:%d, failed to kill connection, conn not exist", connId); terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%s, is killed by user:%s", connId, pMsg->user); + mInfo("connId:%d, is killed by user:%s", connId, pMsg->user); pConn->killed = 1; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index e652f94957..1fe6684a01 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -16,19 +16,19 @@ #define _DEFAULT_SOURCE #include "mndShow.h" -static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg); -static int32_t mndProcessRetrieveMsg( SMnodeMsg *pMsg); -static bool mndCheckRetrieveFinished(SShowObj *pShow); -static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow); -static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); -static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow); -static void mndFreeShowObj(void *ppShow); -static char *mndShowStr(int32_t showType); +static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg); +static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMsg); +static bool mndCheckRetrieveFinished(SShowObj *pShow); +static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg); +static void mndFreeShowObj(SShowObj *pShow); +static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId); +static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); +static char *mndShowStr(int32_t showType); int32_t mndInitShow(SMnode *pMnode) { SShowMgmt *pMgmt = &pMnode->showMgmt; - pMgmt->cache = taosCacheInit(TSDB_CACHE_PTR_KEY, 5, true, mndFreeShowObj, "show"); + pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, (__cache_free_fn_t)mndFreeShowObj, "show"); if (pMgmt->cache == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to alloc show cache since %s", terrstr()); @@ -48,47 +48,41 @@ void mndCleanupShow(SMnode *pMnode) { } } -static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow) { - TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE)pShow; - +static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg) { SShowMgmt *pMgmt = &pMnode->showMgmt; - SShowObj **ppShow = taosCacheAcquireByKey(pMgmt->cache, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE)); - if (ppShow) { - mTrace("show:%d, data:%p acquired from cache", pShow->id, ppShow); - return 0; - } - return -1; -} + int32_t showId = atomic_add_fetch_32(&pMgmt->showId, 1); + if (showId == 0) atomic_add_fetch_32(&pMgmt->showId, 1); -static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { - SMnode *pMnode = pShow->pMnode; - SShowMgmt *pMgmt = &pMnode->showMgmt; - SShowObj **ppShow = (SShowObj **)pShow->ppShow; - taosCacheRelease(pMgmt->cache, (void **)(&ppShow), forceRemove); - mDebug("show:%d, data:%p released from cache, force:%d", pShow->id, ppShow, forceRemove); -} - -static int32_t mndPutShowObj(SMnode *pMnode, SShowObj *pShow) { - SShowMgmt *pMgmt = &pMnode->showMgmt; - int32_t lifeSpan = pMnode->shellActivityTimer * 6 * 1000; - - TSDB_CACHE_PTR_TYPE val = (TSDB_CACHE_PTR_TYPE)pShow; - pShow->id = atomic_add_fetch_32(&pMgmt->showId, 1); - SShowObj **ppShow = - taosCachePut(pMgmt->cache, &val, sizeof(TSDB_CACHE_PTR_TYPE), &pShow, sizeof(TSDB_CACHE_PTR_TYPE), lifeSpan); - if (ppShow == NULL) { + int32_t size = sizeof(SShowObj) + pMsg->payloadLen; + SShowObj *pShow = calloc(1, size); + if (pShow != NULL) { + pShow->id = showId; + pShow->pMnode = pMnode; + pShow->type = pMsg->type; + pShow->payloadLen = pMsg->payloadLen; + memcpy(pShow->db, pMsg->db, TSDB_FULL_DB_NAME_LEN); + memcpy(pShow->payload, pMsg->payload, pMsg->payloadLen); + } else { terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("show:%d, failed to put into cache", pShow->id); - return -1; + mError("failed to process show-meta msg:%s since %s", mndShowStr(pMsg->type), terrstr()); + return NULL; } - mTrace("show:%d, data:%p put into cache", pShow->id, ppShow); - return 0; + int32_t keepTime = pMnode->shellActivityTimer * 6 * 1000; + SShowObj *pShowRet = taosCachePut(pMgmt->cache, &showId, sizeof(int32_t), pShow, size, keepTime); + free(pShow); + if (pShowRet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("show:%d, failed to put into cache since %s", showId, terrstr()); + return NULL; + } else { + mTrace("show:%d, data:%p created", showId, pShowRet); + return pShowRet; + } } -static void mndFreeShowObj(void *ppShow) { - SShowObj *pShow = *(SShowObj **)ppShow; +static void mndFreeShowObj(SShowObj *pShow) { SMnode *pMnode = pShow->pMnode; SShowMgmt *pMgmt = &pMnode->showMgmt; @@ -103,8 +97,29 @@ static void mndFreeShowObj(void *ppShow) { } } - mDebug("show:%d, data:%p destroyed", pShow->id, ppShow); - tfree(pShow); + mTrace("show:%d, data:%p destroyed", pShow->id, pShow); +} + +static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId) { + SShowMgmt *pMgmt = &pMnode->showMgmt; + + SShowObj *pShow = taosCacheAcquireByKey(pMgmt->cache, &showId, sizeof(int32_t)); + if (pShow == NULL) { + mError("show:%d, already destroyed", showId); + return NULL; + } + + mTrace("show:%d, data:%p acquired from cache", pShow->id, pShow); + return pShow; +} + +static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) { + if (pShow == NULL) return; + mTrace("show:%d, data:%p released from cache, force:%d", pShow->id, pShow, forceRemove); + + SMnode *pMnode = pShow->pMnode; + SShowMgmt *pMgmt = &pMnode->showMgmt; + taosCacheRelease(pMgmt->cache, (void **)(&pShow), forceRemove); } static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { @@ -112,7 +127,7 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMsg *pMsg = pMnodeMsg->rpcMsg.pCont; int8_t type = pMsg->type; - uint16_t payloadLen = htonl(pMsg->payloadLen); + int16_t payloadLen = htonl(pMsg->payloadLen); if (type <= TSDB_MGMT_TABLE_START || type >= TSDB_MGMT_TABLE_MAX) { terrno = TSDB_CODE_MND_INVALID_MSG_TYPE; @@ -127,27 +142,13 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { return -1; } - int32_t size = sizeof(SShowObj) + payloadLen; - SShowObj *pShow = calloc(1, size); - if (pShow != NULL) { - pShow->pMnode = pMnode; - pShow->type = type; - pShow->payloadLen = payloadLen; - memcpy(pShow->db, pMsg->db, TSDB_FULL_DB_NAME_LEN); - memcpy(pShow->payload, pMsg->payload, payloadLen); - } else { - terrno = TSDB_CODE_OUT_OF_MEMORY; + SShowObj *pShow = mndCreateShowObj(pMnode, pMsg); + if (pShow == NULL) { mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr()); return -1; } - if (mndPutShowObj(pMnode, pShow) == 0) { - mError("failed to process show-meta msg:%s since %s", mndShowStr(type), terrstr()); - free(pShow); - return -1; - } - - size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; + int32_t size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE; SShowRsp *pRsp = rpcMallocCont(size); if (pRsp == NULL) { mndReleaseShowObj(pShow, true); @@ -156,15 +157,14 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) { return -1; } - pRsp->qhandle = htobe64((uint64_t)pShow); - - int32_t code = (*metaFp)(pMnodeMsg,pShow, &pRsp->tableMeta); - mDebug("show:%d, type:%s, get meta finished, numOfRows:%d cols:%d result:%s", pShow->id, mndShowStr(type), - pShow->numOfRows, pShow->numOfColumns, tstrerror(code)); + int32_t code = (*metaFp)(pMnodeMsg, pShow, &pRsp->tableMeta); + mDebug("show:%d, data:%p get meta finished, numOfRows:%d cols:%d type:%s result:%s", pShow->id, pShow, + pShow->numOfRows, pShow->numOfColumns, mndShowStr(type), tstrerror(code)); if (code == TSDB_CODE_SUCCESS) { pMnodeMsg->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns; pMnodeMsg->pCont = pRsp; + pRsp->showId = htonl(pShow->id); mndReleaseShowObj(pShow, false); return TSDB_CODE_SUCCESS; } else { @@ -182,14 +182,10 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { int32_t rowsRead = 0; SRetrieveTableMsg *pRetrieve = pMnodeMsg->rpcMsg.pCont; - pRetrieve->qhandle = htobe64(pRetrieve->qhandle); - SShowObj *pShow = (SShowObj *)pRetrieve->qhandle; + int32_t showId = htonl(pRetrieve->showId); - /* - * in case of server restart, apps may hold qhandle created by server before - * restart, which is actually invalid, therefore, signature check is required. - */ - if (mndAcquireShowObj(pMnode, pShow) != 0) { + SShowObj *pShow = mndAcquireShowObj(pMnode, showId); + if (pShow == NULL) { terrno = TSDB_CODE_MND_INVALID_SHOWOBJ; mError("failed to process show-retrieve msg:%p since %s", pShow, terrstr()); return -1; @@ -199,15 +195,16 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { if (retrieveFp == NULL) { mndReleaseShowObj(pShow, false); terrno = TSDB_CODE_MSG_NOT_PROCESSED; - mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr()); + mError("show:%d, data:%p failed to retrieve data since %s", pShow->id, pShow, terrstr()); return -1; } - mDebug("show:%d, type:%s, start retrieve data, numOfReads:%d numOfRows:%d", pShow->id, mndShowStr(pShow->type), - pShow->numOfReads, pShow->numOfRows); + mDebug("show:%d, data:%p start retrieve data, numOfReads:%d numOfRows:%d type:%s", pShow->id, pShow, + pShow->numOfReads, pShow->numOfRows, mndShowStr(pShow->type)); if (mndCheckRetrieveFinished(pShow)) { - mDebug("show:%d, read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow->numOfReads, pShow->numOfRows); + mDebug("show:%d, data:%p read finished, numOfReads:%d numOfRows:%d", pShow->id, pShow, pShow->numOfReads, + pShow->numOfRows); pShow->numOfReads = pShow->numOfRows; } @@ -230,7 +227,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { if (pRsp == NULL) { mndReleaseShowObj(pShow, false); terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("show:%d, failed to retrieve data since %s", pShow->id, terrstr()); + mError("show:%d, data:%p failed to retrieve data since %s", pShow->id, pShow, terrstr()); return -1; } @@ -239,20 +236,20 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { rowsRead = (*retrieveFp)(pMnodeMsg, pShow, pRsp->data, rowsToRead); } - mDebug("show:%d, stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead); + mDebug("show:%d, data:%p stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, pShow, rowsRead, rowsToRead); pRsp->numOfRows = htonl(rowsRead); - pRsp->precision = (int16_t)htonl(TSDB_TIME_PRECISION_MILLI); // millisecond time precision + pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision pMnodeMsg->pCont = pRsp; pMnodeMsg->contLen = size; if (rowsToRead == 0 || (rowsRead == rowsToRead && pShow->numOfRows == pShow->numOfReads)) { pRsp->completed = 1; - mDebug("%p, retrieve completed", pShow); + mDebug("show:%d, data:%p retrieve completed", pShow->id, pShow); mndReleaseShowObj(pShow, true); } else { - mDebug("%p, retrieve not completed yet", pShow); + mDebug("show:%d, data:%p retrieve not completed yet", pShow->id, pShow); mndReleaseShowObj(pShow, false); } @@ -307,7 +304,7 @@ static char *mndShowStr(int32_t showType) { static bool mndCheckRetrieveFinished(SShowObj *pShow) { if (pShow->pIter == NULL && pShow->numOfReads != 0) { return true; - } + } return false; }