diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index da3295b5d7..1b2e56edfe 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -27,7 +27,7 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) { int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); - if (pMgmt->deployed && !pMgmt->dropped) { + if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pBnode != NULL) { refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); pBnode = pMgmt->pBnode; } else { @@ -170,7 +170,7 @@ static void dndStopBnodeWorker(SDnode *pDnode) { pMgmt->deployed = 0; taosWUnLockLatch(&pMgmt->latch); - while (pMgmt->refCount > 1) { + while (pMgmt->refCount > 0) { taosMsleep(10); } @@ -189,10 +189,18 @@ static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { static int32_t dndOpenBnode(SDnode *pDnode) { SBnodeMgmt *pMgmt = &pDnode->bmgmt; - SBnodeOpt option = {0}; + SBnode *pBnode = dndAcquireBnode(pDnode); + if (pBnode != NULL) { + dndReleaseBnode(pDnode, pBnode); + terrno = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED; + dError("failed to create bnode since %s", terrstr()); + return -1; + } + + SBnodeOpt option = {0}; dndBuildBnodeOption(pDnode, &option); - SBnode *pBnode = bndOpen(pDnode->dir.bnode, &option); + pBnode = bndOpen(pDnode->dir.bnode, &option); if (pBnode == NULL) { dError("failed to open bnode since %s", terrstr()); return -1; @@ -261,6 +269,7 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_BNODE_ID_INVALID; + dError("failed to create bnode since %s", terrstr()); return -1; } else { return dndOpenBnode(pDnode); @@ -273,6 +282,7 @@ int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_BNODE_ID_INVALID; + dError("failed to drop bnode since %s", terrstr()); return -1; } else { return dndDropBnode(pDnode); diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 642c17b661..7dd3e168ce 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -27,7 +27,7 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) { int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); - if (pMgmt->deployed && !pMgmt->dropped) { + if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pQnode != NULL) { refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); pQnode = pMgmt->pQnode; } else { @@ -175,7 +175,7 @@ static void dndStopQnodeWorker(SDnode *pDnode) { pMgmt->deployed = 0; taosWUnLockLatch(&pMgmt->latch); - while (pMgmt->refCount > 1) { + while (pMgmt->refCount > 0) { taosMsleep(10); } @@ -195,10 +195,19 @@ static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { static int32_t dndOpenQnode(SDnode *pDnode) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SQnodeOpt option = {0}; + + SQnode *pQnode = dndAcquireQnode(pDnode); + if (pQnode != NULL) { + dndReleaseQnode(pDnode, pQnode); + terrno = TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED; + dError("failed to create qnode since %s", terrstr()); + return -1; + } + + SQnodeOpt option = {0}; dndBuildQnodeOption(pDnode, &option); - SQnode *pQnode = qndOpen(&option); + pQnode = qndOpen(&option); if (pQnode == NULL) { dError("failed to open qnode since %s", terrstr()); return -1; @@ -266,6 +275,7 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_QNODE_ID_INVALID; + dError("failed to create qnode since %s", terrstr()); return -1; } else { return dndOpenQnode(pDnode); @@ -278,6 +288,7 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_QNODE_ID_INVALID; + dError("failed to drop qnode since %s", terrstr()); return -1; } else { return dndDropQnode(pDnode); @@ -293,6 +304,7 @@ static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { if (pQnode != NULL) { code = qndProcessMsg(pQnode, pMsg, &pRsp); } + dndReleaseQnode(pDnode, pQnode); if (pRsp != NULL) { pRsp->ahandle = pMsg->ahandle; diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index a295a2d481..567c7dcdb0 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -27,7 +27,7 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) { int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); - if (pMgmt->deployed && !pMgmt->dropped) { + if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pSnode != NULL) { refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); pSnode = pMgmt->pSnode; } else { @@ -170,9 +170,9 @@ static void dndStopSnodeWorker(SDnode *pDnode) { pMgmt->deployed = 0; taosWUnLockLatch(&pMgmt->latch); - while (pMgmt->refCount > 1) { + while (pMgmt->refCount > 0) { taosMsleep(10); - } + } dndCleanupWorker(&pMgmt->writeWorker); } @@ -189,10 +189,18 @@ static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { static int32_t dndOpenSnode(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - SSnodeOpt option = {0}; + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + dndReleaseSnode(pDnode, pSnode); + terrno = TSDB_CODE_DND_SNODE_ALREADY_DEPLOYED; + dError("failed to create snode since %s", terrstr()); + return -1; + } + + SSnodeOpt option = {0}; dndBuildSnodeOption(pDnode, &option); - SSnode *pSnode = sndOpen(pDnode->dir.snode, &option); + pSnode = sndOpen(pDnode->dir.snode, &option); if (pSnode == NULL) { dError("failed to open snode since %s", terrstr()); return -1; @@ -261,6 +269,7 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_SNODE_ID_INVALID; + dError("failed to create snode since %s", terrstr()); return -1; } else { return dndOpenSnode(pDnode); @@ -273,6 +282,7 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_SNODE_ID_INVALID; + dError("failed to drop snode since %s", terrstr()); return -1; } else { return dndDropSnode(pDnode); @@ -288,6 +298,7 @@ static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { if (pSnode != NULL) { code = sndProcessMsg(pSnode, pMsg, &pRsp); } + dndReleaseSnode(pDnode, pSnode); if (pRsp != NULL) { pRsp->ahandle = pMsg->ahandle; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 33cec5ff47..ef5c15743c 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -14,8 +14,11 @@ */ #define _DEFAULT_SOURCE +#include "dndBnode.h" #include "dndDnode.h" #include "dndMnode.h" +#include "dndQnode.h" +#include "dndSnode.h" #include "dndTransport.h" #include "dndVnodes.h" #include "sync.h" @@ -200,6 +203,24 @@ SDnode *dndInit(SDnodeOpt *pOption) { return NULL; } + if (dndInitQnode(pDnode) != 0) { + dError("failed to init qnode"); + dndCleanup(pDnode); + return NULL; + } + + if (dndInitSnode(pDnode) != 0) { + dError("failed to init snode"); + dndCleanup(pDnode); + return NULL; + } + + if (dndInitBnode(pDnode) != 0) { + dError("failed to init bnode"); + dndCleanup(pDnode); + return NULL; + } + if (dndInitMnode(pDnode) != 0) { dError("failed to init mnode"); dndCleanup(pDnode); @@ -232,6 +253,9 @@ void dndCleanup(SDnode *pDnode) { dndSetStat(pDnode, DND_STAT_STOPPED); dndCleanupTrans(pDnode); dndCleanupMnode(pDnode); + dndCleanupBnode(pDnode); + dndCleanupSnode(pDnode); + dndCleanupQnode(pDnode); dndCleanupVnodes(pDnode); dndCleanupDnode(pDnode); vnodeClear(); diff --git a/source/dnode/mgmt/impl/test/qnode/dqnode.cpp b/source/dnode/mgmt/impl/test/qnode/dqnode.cpp index fa9038107e..7c8b502a58 100644 --- a/source/dnode/mgmt/impl/test/qnode/dqnode.cpp +++ b/source/dnode/mgmt/impl/test/qnode/dqnode.cpp @@ -25,43 +25,50 @@ class DndTestQnode : public ::testing::Test { Testbase DndTestQnode::test; -TEST_F(DndTestQnode, 04_Drop_User) { +TEST_F(DndTestQnode, 01_Create_Qnode_Invalid) { { - int32_t contLen = sizeof(SDropUserReq); + int32_t contLen = sizeof(SDCreateQnodeReq); - SDropUserReq* pReq = (SDropUserReq*)rpcMallocCont(contLen); - strcpy(pReq->user, ""); + SDCreateQnodeReq* pReq = (SDCreateQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); - SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_USER, pReq, contLen); + SRpcMsg* pMsg = test.SendMsg(TDMT_DND_CREATE_QNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_USER_FORMAT); + ASSERT_EQ(pMsg->code, TSDB_CODE_DND_QNODE_ID_INVALID); } - // { - // int32_t contLen = sizeof(SDropUserReq); + { + int32_t contLen = sizeof(SDCreateQnodeReq); - // SDropUserReq* pReq = (SDropUserReq*)rpcMallocCont(contLen); - // strcpy(pReq->user, "u4"); + SDCreateQnodeReq* pReq = (SDCreateQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); - // SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_USER, pReq, contLen); - // ASSERT_NE(pMsg, nullptr); - // ASSERT_EQ(pMsg->code, TSDB_CODE_MND_USER_NOT_EXIST); - // } + SRpcMsg* pMsg = test.SendMsg(TDMT_DND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } - // { - // int32_t contLen = sizeof(SDropUserReq); + { + int32_t contLen = sizeof(SDCreateQnodeReq); - // SDropUserReq* pReq = (SDropUserReq*)rpcMallocCont(contLen); - // strcpy(pReq->user, "u1"); + SDCreateQnodeReq* pReq = (SDCreateQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); - // SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_USER, pReq, contLen); - // ASSERT_NE(pMsg, nullptr); - // ASSERT_EQ(pMsg->code, 0); - // } + SRpcMsg* pMsg = test.SendMsg(TDMT_DND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED); + } - // test.SendShowMetaMsg(TSDB_MGMT_TABLE_USER, ""); - // CHECK_META("show users", 4); + test.Restart(); - // test.SendShowRetrieveMsg(); - // EXPECT_EQ(test.GetShowRows(), 1); + { + int32_t contLen = sizeof(SDCreateQnodeReq); + + SDCreateQnodeReq* pReq = (SDCreateQnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pMsg = test.SendMsg(TDMT_DND_CREATE_QNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED); + } } \ No newline at end of file