test for dqnode
This commit is contained in:
parent
268c3343d5
commit
c5decfec28
|
@ -27,7 +27,7 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) {
|
||||||
int32_t refCount = 0;
|
int32_t refCount = 0;
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
taosRLockLatch(&pMgmt->latch);
|
||||||
if (pMgmt->deployed && !pMgmt->dropped) {
|
if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pBnode != NULL) {
|
||||||
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
|
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
|
||||||
pBnode = pMgmt->pBnode;
|
pBnode = pMgmt->pBnode;
|
||||||
} else {
|
} else {
|
||||||
|
@ -170,7 +170,7 @@ static void dndStopBnodeWorker(SDnode *pDnode) {
|
||||||
pMgmt->deployed = 0;
|
pMgmt->deployed = 0;
|
||||||
taosWUnLockLatch(&pMgmt->latch);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
while (pMgmt->refCount > 1) {
|
while (pMgmt->refCount > 0) {
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,10 +189,18 @@ static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) {
|
||||||
|
|
||||||
static int32_t dndOpenBnode(SDnode *pDnode) {
|
static int32_t dndOpenBnode(SDnode *pDnode) {
|
||||||
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
SBnodeMgmt *pMgmt = &pDnode->bmgmt;
|
||||||
SBnodeOpt option = {0};
|
SBnode *pBnode = dndAcquireBnode(pDnode);
|
||||||
|
if (pBnode != NULL) {
|
||||||
|
dndReleaseBnode(pDnode, pBnode);
|
||||||
|
terrno = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED;
|
||||||
|
dError("failed to create bnode since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SBnodeOpt option = {0};
|
||||||
dndBuildBnodeOption(pDnode, &option);
|
dndBuildBnodeOption(pDnode, &option);
|
||||||
|
|
||||||
SBnode *pBnode = bndOpen(pDnode->dir.bnode, &option);
|
pBnode = bndOpen(pDnode->dir.bnode, &option);
|
||||||
if (pBnode == NULL) {
|
if (pBnode == NULL) {
|
||||||
dError("failed to open bnode since %s", terrstr());
|
dError("failed to open bnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -261,6 +269,7 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
terrno = TSDB_CODE_DND_BNODE_ID_INVALID;
|
terrno = TSDB_CODE_DND_BNODE_ID_INVALID;
|
||||||
|
dError("failed to create bnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return dndOpenBnode(pDnode);
|
return dndOpenBnode(pDnode);
|
||||||
|
@ -273,6 +282,7 @@ int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
terrno = TSDB_CODE_DND_BNODE_ID_INVALID;
|
terrno = TSDB_CODE_DND_BNODE_ID_INVALID;
|
||||||
|
dError("failed to drop bnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return dndDropBnode(pDnode);
|
return dndDropBnode(pDnode);
|
||||||
|
|
|
@ -27,7 +27,7 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) {
|
||||||
int32_t refCount = 0;
|
int32_t refCount = 0;
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
taosRLockLatch(&pMgmt->latch);
|
||||||
if (pMgmt->deployed && !pMgmt->dropped) {
|
if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pQnode != NULL) {
|
||||||
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
|
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
|
||||||
pQnode = pMgmt->pQnode;
|
pQnode = pMgmt->pQnode;
|
||||||
} else {
|
} else {
|
||||||
|
@ -175,7 +175,7 @@ static void dndStopQnodeWorker(SDnode *pDnode) {
|
||||||
pMgmt->deployed = 0;
|
pMgmt->deployed = 0;
|
||||||
taosWUnLockLatch(&pMgmt->latch);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
while (pMgmt->refCount > 1) {
|
while (pMgmt->refCount > 0) {
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,10 +195,19 @@ static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
|
||||||
|
|
||||||
static int32_t dndOpenQnode(SDnode *pDnode) {
|
static int32_t dndOpenQnode(SDnode *pDnode) {
|
||||||
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
|
||||||
SQnodeOpt option = {0};
|
|
||||||
|
SQnode *pQnode = dndAcquireQnode(pDnode);
|
||||||
|
if (pQnode != NULL) {
|
||||||
|
dndReleaseQnode(pDnode, pQnode);
|
||||||
|
terrno = TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED;
|
||||||
|
dError("failed to create qnode since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SQnodeOpt option = {0};
|
||||||
dndBuildQnodeOption(pDnode, &option);
|
dndBuildQnodeOption(pDnode, &option);
|
||||||
|
|
||||||
SQnode *pQnode = qndOpen(&option);
|
pQnode = qndOpen(&option);
|
||||||
if (pQnode == NULL) {
|
if (pQnode == NULL) {
|
||||||
dError("failed to open qnode since %s", terrstr());
|
dError("failed to open qnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -266,6 +275,7 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
terrno = TSDB_CODE_DND_QNODE_ID_INVALID;
|
terrno = TSDB_CODE_DND_QNODE_ID_INVALID;
|
||||||
|
dError("failed to create qnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return dndOpenQnode(pDnode);
|
return dndOpenQnode(pDnode);
|
||||||
|
@ -278,6 +288,7 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
terrno = TSDB_CODE_DND_QNODE_ID_INVALID;
|
terrno = TSDB_CODE_DND_QNODE_ID_INVALID;
|
||||||
|
dError("failed to drop qnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return dndDropQnode(pDnode);
|
return dndDropQnode(pDnode);
|
||||||
|
@ -293,6 +304,7 @@ static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
if (pQnode != NULL) {
|
if (pQnode != NULL) {
|
||||||
code = qndProcessMsg(pQnode, pMsg, &pRsp);
|
code = qndProcessMsg(pQnode, pMsg, &pRsp);
|
||||||
}
|
}
|
||||||
|
dndReleaseQnode(pDnode, pQnode);
|
||||||
|
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
pRsp->ahandle = pMsg->ahandle;
|
pRsp->ahandle = pMsg->ahandle;
|
||||||
|
|
|
@ -27,7 +27,7 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) {
|
||||||
int32_t refCount = 0;
|
int32_t refCount = 0;
|
||||||
|
|
||||||
taosRLockLatch(&pMgmt->latch);
|
taosRLockLatch(&pMgmt->latch);
|
||||||
if (pMgmt->deployed && !pMgmt->dropped) {
|
if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pSnode != NULL) {
|
||||||
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
|
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
|
||||||
pSnode = pMgmt->pSnode;
|
pSnode = pMgmt->pSnode;
|
||||||
} else {
|
} else {
|
||||||
|
@ -170,9 +170,9 @@ static void dndStopSnodeWorker(SDnode *pDnode) {
|
||||||
pMgmt->deployed = 0;
|
pMgmt->deployed = 0;
|
||||||
taosWUnLockLatch(&pMgmt->latch);
|
taosWUnLockLatch(&pMgmt->latch);
|
||||||
|
|
||||||
while (pMgmt->refCount > 1) {
|
while (pMgmt->refCount > 0) {
|
||||||
taosMsleep(10);
|
taosMsleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
dndCleanupWorker(&pMgmt->writeWorker);
|
dndCleanupWorker(&pMgmt->writeWorker);
|
||||||
}
|
}
|
||||||
|
@ -189,10 +189,18 @@ static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) {
|
||||||
|
|
||||||
static int32_t dndOpenSnode(SDnode *pDnode) {
|
static int32_t dndOpenSnode(SDnode *pDnode) {
|
||||||
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
SSnodeMgmt *pMgmt = &pDnode->smgmt;
|
||||||
SSnodeOpt option = {0};
|
SSnode *pSnode = dndAcquireSnode(pDnode);
|
||||||
|
if (pSnode != NULL) {
|
||||||
|
dndReleaseSnode(pDnode, pSnode);
|
||||||
|
terrno = TSDB_CODE_DND_SNODE_ALREADY_DEPLOYED;
|
||||||
|
dError("failed to create snode since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSnodeOpt option = {0};
|
||||||
dndBuildSnodeOption(pDnode, &option);
|
dndBuildSnodeOption(pDnode, &option);
|
||||||
|
|
||||||
SSnode *pSnode = sndOpen(pDnode->dir.snode, &option);
|
pSnode = sndOpen(pDnode->dir.snode, &option);
|
||||||
if (pSnode == NULL) {
|
if (pSnode == NULL) {
|
||||||
dError("failed to open snode since %s", terrstr());
|
dError("failed to open snode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -261,6 +269,7 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
terrno = TSDB_CODE_DND_SNODE_ID_INVALID;
|
terrno = TSDB_CODE_DND_SNODE_ID_INVALID;
|
||||||
|
dError("failed to create snode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return dndOpenSnode(pDnode);
|
return dndOpenSnode(pDnode);
|
||||||
|
@ -273,6 +282,7 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
|
||||||
|
|
||||||
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
|
||||||
terrno = TSDB_CODE_DND_SNODE_ID_INVALID;
|
terrno = TSDB_CODE_DND_SNODE_ID_INVALID;
|
||||||
|
dError("failed to drop snode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return dndDropSnode(pDnode);
|
return dndDropSnode(pDnode);
|
||||||
|
@ -288,6 +298,7 @@ static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
if (pSnode != NULL) {
|
if (pSnode != NULL) {
|
||||||
code = sndProcessMsg(pSnode, pMsg, &pRsp);
|
code = sndProcessMsg(pSnode, pMsg, &pRsp);
|
||||||
}
|
}
|
||||||
|
dndReleaseSnode(pDnode, pSnode);
|
||||||
|
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
pRsp->ahandle = pMsg->ahandle;
|
pRsp->ahandle = pMsg->ahandle;
|
||||||
|
|
|
@ -14,8 +14,11 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "dndBnode.h"
|
||||||
#include "dndDnode.h"
|
#include "dndDnode.h"
|
||||||
#include "dndMnode.h"
|
#include "dndMnode.h"
|
||||||
|
#include "dndQnode.h"
|
||||||
|
#include "dndSnode.h"
|
||||||
#include "dndTransport.h"
|
#include "dndTransport.h"
|
||||||
#include "dndVnodes.h"
|
#include "dndVnodes.h"
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
|
@ -200,6 +203,24 @@ SDnode *dndInit(SDnodeOpt *pOption) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (dndInitQnode(pDnode) != 0) {
|
||||||
|
dError("failed to init qnode");
|
||||||
|
dndCleanup(pDnode);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dndInitSnode(pDnode) != 0) {
|
||||||
|
dError("failed to init snode");
|
||||||
|
dndCleanup(pDnode);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dndInitBnode(pDnode) != 0) {
|
||||||
|
dError("failed to init bnode");
|
||||||
|
dndCleanup(pDnode);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (dndInitMnode(pDnode) != 0) {
|
if (dndInitMnode(pDnode) != 0) {
|
||||||
dError("failed to init mnode");
|
dError("failed to init mnode");
|
||||||
dndCleanup(pDnode);
|
dndCleanup(pDnode);
|
||||||
|
@ -232,6 +253,9 @@ void dndCleanup(SDnode *pDnode) {
|
||||||
dndSetStat(pDnode, DND_STAT_STOPPED);
|
dndSetStat(pDnode, DND_STAT_STOPPED);
|
||||||
dndCleanupTrans(pDnode);
|
dndCleanupTrans(pDnode);
|
||||||
dndCleanupMnode(pDnode);
|
dndCleanupMnode(pDnode);
|
||||||
|
dndCleanupBnode(pDnode);
|
||||||
|
dndCleanupSnode(pDnode);
|
||||||
|
dndCleanupQnode(pDnode);
|
||||||
dndCleanupVnodes(pDnode);
|
dndCleanupVnodes(pDnode);
|
||||||
dndCleanupDnode(pDnode);
|
dndCleanupDnode(pDnode);
|
||||||
vnodeClear();
|
vnodeClear();
|
||||||
|
|
|
@ -25,43 +25,50 @@ class DndTestQnode : public ::testing::Test {
|
||||||
|
|
||||||
Testbase DndTestQnode::test;
|
Testbase DndTestQnode::test;
|
||||||
|
|
||||||
TEST_F(DndTestQnode, 04_Drop_User) {
|
TEST_F(DndTestQnode, 01_Create_Qnode_Invalid) {
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(SDropUserReq);
|
int32_t contLen = sizeof(SDCreateQnodeReq);
|
||||||
|
|
||||||
SDropUserReq* pReq = (SDropUserReq*)rpcMallocCont(contLen);
|
SDCreateQnodeReq* pReq = (SDCreateQnodeReq*)rpcMallocCont(contLen);
|
||||||
strcpy(pReq->user, "");
|
pReq->dnodeId = htonl(2);
|
||||||
|
|
||||||
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_USER, pReq, contLen);
|
SRpcMsg* pMsg = test.SendMsg(TDMT_DND_CREATE_QNODE, pReq, contLen);
|
||||||
ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_INVALID_USER_FORMAT);
|
ASSERT_EQ(pMsg->code, TSDB_CODE_DND_QNODE_ID_INVALID);
|
||||||
}
|
}
|
||||||
|
|
||||||
// {
|
{
|
||||||
// int32_t contLen = sizeof(SDropUserReq);
|
int32_t contLen = sizeof(SDCreateQnodeReq);
|
||||||
|
|
||||||
// SDropUserReq* pReq = (SDropUserReq*)rpcMallocCont(contLen);
|
SDCreateQnodeReq* pReq = (SDCreateQnodeReq*)rpcMallocCont(contLen);
|
||||||
// strcpy(pReq->user, "u4");
|
pReq->dnodeId = htonl(1);
|
||||||
|
|
||||||
// SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_USER, pReq, contLen);
|
SRpcMsg* pMsg = test.SendMsg(TDMT_DND_CREATE_QNODE, pReq, contLen);
|
||||||
// ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
// ASSERT_EQ(pMsg->code, TSDB_CODE_MND_USER_NOT_EXIST);
|
ASSERT_EQ(pMsg->code, 0);
|
||||||
// }
|
}
|
||||||
|
|
||||||
// {
|
{
|
||||||
// int32_t contLen = sizeof(SDropUserReq);
|
int32_t contLen = sizeof(SDCreateQnodeReq);
|
||||||
|
|
||||||
// SDropUserReq* pReq = (SDropUserReq*)rpcMallocCont(contLen);
|
SDCreateQnodeReq* pReq = (SDCreateQnodeReq*)rpcMallocCont(contLen);
|
||||||
// strcpy(pReq->user, "u1");
|
pReq->dnodeId = htonl(1);
|
||||||
|
|
||||||
// SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_USER, pReq, contLen);
|
SRpcMsg* pMsg = test.SendMsg(TDMT_DND_CREATE_QNODE, pReq, contLen);
|
||||||
// ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
// ASSERT_EQ(pMsg->code, 0);
|
ASSERT_EQ(pMsg->code, TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED);
|
||||||
// }
|
}
|
||||||
|
|
||||||
// test.SendShowMetaMsg(TSDB_MGMT_TABLE_USER, "");
|
test.Restart();
|
||||||
// CHECK_META("show users", 4);
|
|
||||||
|
|
||||||
// test.SendShowRetrieveMsg();
|
{
|
||||||
// EXPECT_EQ(test.GetShowRows(), 1);
|
int32_t contLen = sizeof(SDCreateQnodeReq);
|
||||||
|
|
||||||
|
SDCreateQnodeReq* pReq = (SDCreateQnodeReq*)rpcMallocCont(contLen);
|
||||||
|
pReq->dnodeId = htonl(1);
|
||||||
|
|
||||||
|
SRpcMsg* pMsg = test.SendMsg(TDMT_DND_CREATE_QNODE, pReq, contLen);
|
||||||
|
ASSERT_NE(pMsg, nullptr);
|
||||||
|
ASSERT_EQ(pMsg->code, TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue