diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 2cdebab6bf..f880085351 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -281,9 +281,8 @@ PRASE_DNODE_OVER: if (pMgmt->dnodeEps == NULL) { pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); pMgmt->dnodeEps->num = 1; - pMgmt->dnodeEps->eps[0].isMnode = 1; - pMgmt->dnodeEps->eps[0].port = pDnode->opt.serverPort; - tstrncpy(pMgmt->dnodeEps->eps[0].fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); + pMgmt->dnodeEps->eps[0].isMnode = 1; + taosGetFqdnPortFromEp(pDnode->opt.firstEp, pMgmt->dnodeEps->eps[0].fqdn, &pMgmt->dnodeEps->eps[0].port); } dndResetDnodes(pDnode, pMgmt->dnodeEps); @@ -371,6 +370,8 @@ void dndSendStatusMsg(SDnode *pDnode) { SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TSDB_MSG_TYPE_STATUS}; pMgmt->statusSent = 1; + + dTrace("pDnode:%p, send status msg to mnode", pDnode); dndSendMsgToMnode(pDnode, &rpcMsg); } @@ -447,7 +448,7 @@ static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { a static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { - dDebug("config msg is received"); + dError("config msg is received, but not supported yet"); SCfgDnodeMsg *pCfg = pMsg->pCont; int32_t code = TSDB_CODE_OPS_NOT_SUPPORT; @@ -476,11 +477,11 @@ static void *dnodeThreadRoutine(void *param) { while (true) { pthread_testcancel(); + taosMsleep(ms); if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent) { dndSendStatusMsg(pDnode); } - taosMsleep(ms); } } @@ -584,4 +585,5 @@ void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { default: dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, taosMsg[pMsg->msgType]); } + rpcFreeCont(pMsg->pCont); } diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 7eb6ee980a..3cf08e619e 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -559,9 +559,12 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { break; } - SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; - rpcSendResponse(&rsp); + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; + rpcSendResponse(&rsp); + } rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; taosFreeQitem(pMsg); } @@ -645,9 +648,12 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; - rpcSendResponse(&rsp); + if (pRpcMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; + rpcSendResponse(&rsp); + } rpcFreeCont(pRpcMsg->pCont); + pRpcMsg->pCont = NULL; taosFreeQitem(pMsg); } } @@ -656,9 +662,12 @@ void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; - rpcSendResponse(&rsp); + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; + rpcSendResponse(&rsp); + } rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; } dndReleaseMnode(pDnode, pMnode); @@ -668,9 +677,12 @@ void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; - rpcSendResponse(&rsp); + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; + rpcSendResponse(&rsp); + } rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; } dndReleaseMnode(pDnode, pMnode); @@ -680,9 +692,12 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; - rpcSendResponse(&rsp); + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; + rpcSendResponse(&rsp); + } rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; } dndReleaseMnode(pDnode, pMnode); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 245a1e41f6..2ec02ac203 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -141,8 +141,8 @@ static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { dTrace("RPC %p, rsp:%s is processed, code:0x%0X", pMsg->handle, taosMsg[msgType], pMsg->code & 0XFFFF); } else { dError("RPC %p, rsp:%s not processed", pMsg->handle, taosMsg[msgType]); + rpcFreeCont(pMsg->pCont); } - rpcFreeCont(pMsg->pCont); } static int32_t dndInitClient(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index a5b118e67b..7f51979246 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -196,7 +196,7 @@ SDnode *dndInit(SDnodeOpt *pOption) { dndSetStat(pDnode, DND_STAT_RUNNING); dndSendStatusMsg(pDnode); dndReportStartup(pDnode, "TDengine", "initialized successfully"); - dInfo("TDengine is initialized successfully"); + dInfo("TDengine is initialized successfully, pDnode:%p", pDnode); return pDnode; } diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index dd8ba40b4f..6484e83846 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -24,15 +24,15 @@ class DndTestDnode : public ::testing::Test { } static void SetUpTestSuite() { - initLog("/tmp/dndTestDnode1"); + initLog("/tmp/dndTestDnode"); const char* fqdn = "localhost"; const char* firstEp = "localhost:9521"; pServer1 = CreateServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp); pServer2 = CreateServer("/tmp/dndTestDnode2", fqdn, 9522, firstEp); - // pServer3 = CreateServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp); - // pServer4 = CreateServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp); - // pServer5 = CreateServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp); + pServer3 = CreateServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp); + pServer4 = CreateServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp); + pServer5 = CreateServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp); pClient = createClient("root", "taosdata", fqdn, 9521); taosMsleep(300); } @@ -58,7 +58,6 @@ class DndTestDnode : public ::testing::Test { void TearDown() override {} void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns) { - //--- meta --- SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); pShow->type = showType; strcpy(pShow->db, ""); @@ -108,7 +107,7 @@ class DndTestDnode : public ::testing::Test { EXPECT_STREQ(pSchema->name, name); } - void SendThenCheckShowRetrieveMsg(int32_t rows) { + void SendThenCheckShowRetrieveMsg(int32_t rows, int32_t completed) { SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); pRetrieve->showId = htonl(showId); pRetrieve->free = 0; @@ -134,7 +133,7 @@ class DndTestDnode : public ::testing::Test { EXPECT_EQ(pRetrieveRsp->numOfRows, rows); EXPECT_EQ(pRetrieveRsp->offset, 0); EXPECT_EQ(pRetrieveRsp->useconds, 0); - EXPECT_EQ(pRetrieveRsp->completed, 1); + EXPECT_EQ(pRetrieveRsp->completed, completed); EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); EXPECT_EQ(pRetrieveRsp->compressed, 0); EXPECT_EQ(pRetrieveRsp->reserved, 0); @@ -193,7 +192,7 @@ TEST_F(DndTestDnode, ShowDnode) { CheckSchema(5, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time"); CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "offline reason"); - SendThenCheckShowRetrieveMsg(1); + SendThenCheckShowRetrieveMsg(1, 1); CheckInt16(1); CheckBinary("localhost:9521", TSDB_EP_LEN); CheckInt16(0); @@ -203,6 +202,22 @@ TEST_F(DndTestDnode, ShowDnode) { CheckBinary("", 24); } +TEST_F(DndTestDnode, ConfigDnode_01) { + SCfgDnodeMsg* pReq = (SCfgDnodeMsg*)rpcMallocCont(sizeof(SCfgDnodeMsg)); + pReq->dnodeId = htonl(1); + strcpy(pReq->config, "ddebugflag 131"); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCfgDnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CONFIG_DNODE; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); +} + TEST_F(DndTestDnode, CreateDnode_01) { SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg)); strcpy(pReq->ep, "localhost:9522"); @@ -219,7 +234,7 @@ TEST_F(DndTestDnode, CreateDnode_01) { taosMsleep(1300); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); - SendThenCheckShowRetrieveMsg(2); + SendThenCheckShowRetrieveMsg(2, 1); CheckInt16(1); CheckInt16(2); CheckBinary("localhost:9521", TSDB_EP_LEN); @@ -236,135 +251,107 @@ TEST_F(DndTestDnode, CreateDnode_01) { CheckBinary("", 24); } -#if 0 -TEST_F(DndTestDnode, AlterUser_01) { - ASSERT_NE(pClient, nullptr); - - //--- drop user --- - SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg)); - strcpy(pReq->user, "u1"); - strcpy(pReq->pass, "p2"); +TEST_F(DndTestDnode, DropDnode_01) { + SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(sizeof(SDropDnodeMsg)); + pReq->dnodeId = htonl(2); SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SAlterUserMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_USER; + rpcMsg.contLen = sizeof(SDropDnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_DROP_DNODE; sendMsg(pClient, &rpcMsg); SRpcMsg* pMsg = pClient->pRsp; ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - //--- meta --- - SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); - pShow->type = TSDB_MGMT_TABLE_USER; - SRpcMsg showRpcMsg = {0}; - showRpcMsg.pCont = pShow; - showRpcMsg.contLen = sizeof(SShowMsg); - showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; - - sendMsg(pClient, &showRpcMsg); - SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; - STableMetaMsg* pMeta = &pShowRsp->tableMeta; - pMeta->numOfColumns = htons(pMeta->numOfColumns); - EXPECT_EQ(pMeta->numOfColumns, 4); - - //--- retrieve --- - SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); - pRetrieve->showId = pShowRsp->showId; - SRpcMsg retrieveRpcMsg = {0}; - retrieveRpcMsg.pCont = pRetrieve; - retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); - retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; - - sendMsg(pClient, &retrieveRpcMsg); - SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; - pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); - EXPECT_EQ(pRetrieveRsp->numOfRows, 3); - - char* pData = pRetrieveRsp->data; - int32_t pos = 0; - char* strVal = NULL; - - //--- name --- - { - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "u1"); - - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "root"); - - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "_root"); - } + taosMsleep(1300); + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); + SendThenCheckShowRetrieveMsg(1, 0); + CheckInt16(1); + CheckBinary("localhost:9521", TSDB_EP_LEN); + CheckInt16(0); + CheckInt16(1); + CheckBinary("ready", 10); + CheckTimestamp(); + CheckBinary("", 24); } -TEST_F(DndTestDnode, DropUser_01) { - ASSERT_NE(pClient, nullptr); - - //--- drop user --- - SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg)); - strcpy(pReq->user, "u1"); - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SDropUserMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_DROP_USER; - - sendMsg(pClient, &rpcMsg); - SRpcMsg* pMsg = pClient->pRsp; - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - - //--- meta --- - SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); - pShow->type = TSDB_MGMT_TABLE_USER; - SRpcMsg showRpcMsg = {0}; - showRpcMsg.pCont = pShow; - showRpcMsg.contLen = sizeof(SShowMsg); - showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; - - sendMsg(pClient, &showRpcMsg); - SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; - STableMetaMsg* pMeta = &pShowRsp->tableMeta; - pMeta->numOfColumns = htons(pMeta->numOfColumns); - EXPECT_EQ(pMeta->numOfColumns, 4); - - //--- retrieve --- - SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); - pRetrieve->showId = pShowRsp->showId; - SRpcMsg retrieveRpcMsg = {0}; - retrieveRpcMsg.pCont = pRetrieve; - retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); - retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; - - sendMsg(pClient, &retrieveRpcMsg); - SRetrieveTableRsp* pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; - pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); - EXPECT_EQ(pRetrieveRsp->numOfRows, 2); - - char* pData = pRetrieveRsp->data; - int32_t pos = 0; - char* strVal = NULL; - - //--- name --- +TEST_F(DndTestDnode, CreateDnode_02) { { - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "root"); + SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg)); + strcpy(pReq->ep, "localhost:9523"); - pos += sizeof(VarDataLenT); - strVal = (char*)(pData + pos); - pos += TSDB_USER_LEN; - EXPECT_STREQ(strVal, "_root"); + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateDnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); } -} -#endif \ No newline at end of file + { + SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg)); + strcpy(pReq->ep, "localhost:9524"); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateDnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } + + { + SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg)); + strcpy(pReq->ep, "localhost:9525"); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateDnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } + + taosMsleep(1300); + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); + SendThenCheckShowRetrieveMsg(4, 0); + CheckInt16(1); + CheckInt16(3); + CheckInt16(4); + CheckInt16(5); + CheckBinary("localhost:9521", TSDB_EP_LEN); + CheckBinary("localhost:9523", TSDB_EP_LEN); + CheckBinary("localhost:9524", TSDB_EP_LEN); + CheckBinary("localhost:9525", TSDB_EP_LEN); + CheckInt16(0); + CheckInt16(0); + CheckInt16(0); + CheckInt16(0); + CheckInt16(1); + CheckInt16(1); + CheckInt16(1); + CheckInt16(1); + CheckBinary("ready", 10); + CheckBinary("ready", 10); + CheckBinary("ready", 10); + CheckBinary("ready", 10); + CheckTimestamp(); + CheckTimestamp(); + CheckTimestamp(); + CheckTimestamp(); + CheckBinary("", 24); + CheckBinary("", 24); + CheckBinary("", 24); + CheckBinary("", 24); +} diff --git a/source/dnode/mgmt/impl/test/sut/deploy.cpp b/source/dnode/mgmt/impl/test/sut/deploy.cpp index 0aa01ed0d0..13baab54fa 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.cpp +++ b/source/dnode/mgmt/impl/test/sut/deploy.cpp @@ -16,9 +16,9 @@ #include "deploy.h" void initLog(const char* path) { - dDebugFlag = 207; + dDebugFlag = 143; vDebugFlag = 0; - mDebugFlag = 207; + mDebugFlag = 143; cDebugFlag = 0; jniDebugFlag = 0; tmrDebugFlag = 0; @@ -35,6 +35,8 @@ void initLog(const char* path) { tsdbDebugFlag = 0; cqDebugFlag = 0; + taosMkDir(path); + char temp[PATH_MAX]; snprintf(temp, PATH_MAX, "%s/taosdlog", path); if (taosInitLog(temp, tsNumOfLogLines, 1) != 0) { diff --git a/source/dnode/mgmt/impl/test/sut/deploy.h b/source/dnode/mgmt/impl/test/sut/deploy.h index d3b911640d..d77bebc6f8 100644 --- a/source/dnode/mgmt/impl/test/sut/deploy.h +++ b/source/dnode/mgmt/impl/test/sut/deploy.h @@ -45,5 +45,3 @@ void dropServer(SServer* pServer); SClient* createClient(const char* user, const char* pass, const char* fqdn, uint16_t port); void dropClient(SClient* pClient); void sendMsg(SClient* pClient, SRpcMsg* pMsg); - -// class DndTest \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index a610eaeeb9..c2b2acfa0c 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "mndShow.h" +#define SHOW_STEP_SIZE 100 + static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowMsg *pMsg); static void mndFreeShowObj(SShowObj *pShow); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int32_t showId); @@ -211,7 +213,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { } /* return no more than 100 tables in one round trip */ - if (rowsToRead > 100) rowsToRead = 100; + if (rowsToRead > SHOW_STEP_SIZE) rowsToRead = SHOW_STEP_SIZE; /* * the actual number of table may be larger than the value of pShow->numOfRows, if a query is @@ -220,7 +222,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) { if (rowsToRead < 0) rowsToRead = 0; size = pShow->rowSize * rowsToRead; - size += 100; + size += SHOW_STEP_SIZE; SRetrieveTableRsp *pRsp = rpcMallocCont(size); if (pRsp == NULL) { mndReleaseShowObj(pShow, false); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index c83fd9124c..e543478608 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -336,8 +336,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { } SRpcConnInfo connInfo = {0}; - if (rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { - mndCleanupMsg(pMsg); + if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { + taosFreeQitem(pMsg); terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; mError("RPC:%p, app:%p failed to create msg since %s", pRpcMsg->handle, pRpcMsg->ahandle, terrstr()); return NULL; @@ -354,6 +354,8 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { void mndCleanupMsg(SMnodeMsg *pMsg) { mTrace("msg:%p, app:%p is destroyed, RPC:%p", pMsg, pMsg->rpcMsg.ahandle, pMsg->rpcMsg.handle); + rpcFreeCont(pMsg->rpcMsg.pCont); + pMsg->rpcMsg.pCont = NULL; taosFreeQitem(pMsg); } @@ -367,7 +369,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) { int32_t code = 0; int32_t msgType = pMsg->rpcMsg.msgType; void *ahandle = pMsg->rpcMsg.ahandle; - bool isReq = (msgType % 2 == 1); + bool isReq = (msgType & 1U); mTrace("msg:%p, app:%p type:%s will be processed", pMsg, ahandle, taosMsg[msgType]);