This commit is contained in:
Shengliang Guan 2022-03-17 14:33:25 +08:00
parent b16e686809
commit 43083fa3e2
6 changed files with 99 additions and 62 deletions

View File

@ -158,6 +158,9 @@ int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, cons
void dndCleanupWorker(SDnodeWorker *pWorker);
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen);
int32_t dndProcessCreateNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndProcessDropNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif

View File

@ -20,7 +20,6 @@
#include "bm.h"
#include "dm.h"
#include "dndInt.h"
#include "mm.h"
#include "qmInt.h"
#include "smInt.h"

View File

@ -0,0 +1,72 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndInt.h"
static SMgmtWrapper *dndGetWrapperFromMsg(SDnode *pDnode, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = NULL;
switch (pMsg->rpcMsg.msgType) {
case TDMT_DND_CREATE_MNODE:
return dndGetWrapper(pDnode, MNODE);
case TDMT_DND_CREATE_QNODE:
return dndGetWrapper(pDnode, QNODE);
case TDMT_DND_CREATE_SNODE:
return dndGetWrapper(pDnode, SNODE);
case TDMT_DND_CREATE_BNODE:
return dndGetWrapper(pDnode, BNODE);
default:
return NULL;
}
}
int32_t dndProcessCreateNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndGetWrapperFromMsg(pDnode, pMsg);
if (pWrapper->procType == PROC_SINGLE) {
switch (pMsg->rpcMsg.msgType) {
case TDMT_DND_CREATE_MNODE:
return mmProcessCreateReq(pWrapper->pMgmt, pMsg);
case TDMT_DND_CREATE_QNODE:
return qmProcessCreateReq(pWrapper->pMgmt, pMsg);
case TDMT_DND_CREATE_SNODE:
return smProcessCreateReq(pWrapper->pMgmt, pMsg);
case TDMT_DND_CREATE_BNODE:
return bmProcessCreateReq(pWrapper->pMgmt, pMsg);
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}
}
int32_t dndProcessDropNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndGetWrapperFromMsg(pDnode, pMsg);
switch (pMsg->rpcMsg.msgType) {
case TDMT_DND_DROP_MNODE:
return mmProcessDropReq(pWrapper->pMgmt, pMsg);
case TDMT_DND_DROP_QNODE:
return qmProcessDropReq(pWrapper->pMgmt, pMsg);
case TDMT_DND_DROP_SNODE:
return smProcessDropReq(pWrapper->pMgmt, pMsg);
case TDMT_DND_DROP_BNODE:
return bmProcessDropReq(pWrapper->pMgmt, pMsg);
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}
}

View File

@ -53,62 +53,27 @@ static void *dmThreadRoutine(void *param) {
}
static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
SDnode *pDnode = pMgmt->pDnode;
SMgmtWrapper *pWrapper = NULL;
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
SDnode *pDnode = pMgmt->pDnode;
dTrace("msg:%p, will be processed", pMsg);
switch (msgType) {
case TDMT_DND_CREATE_MNODE:
pWrapper = dndGetWrapper(pDnode, MNODE);
code = mmProcessCreateReq(pWrapper->pMgmt, pMsg);
break;
case TDMT_DND_DROP_MNODE:
pWrapper = dndGetWrapper(pDnode, MNODE);
code = mmProcessDropReq(pWrapper->pMgmt, pMsg);
break;
case TDMT_DND_CREATE_QNODE:
pWrapper = dndGetWrapper(pDnode, QNODE);
code = qmProcessCreateReq(pWrapper->pMgmt, pMsg);
break;
case TDMT_DND_DROP_QNODE:
pWrapper = dndGetWrapper(pDnode, QNODE);
code = qmProcessDropReq(pWrapper->pMgmt, pMsg);
break;
case TDMT_DND_CREATE_SNODE:
pWrapper = dndGetWrapper(pDnode, SNODE);
code = smProcessCreateReq(pWrapper->pMgmt, pMsg);
break;
case TDMT_DND_DROP_SNODE:
pWrapper = dndGetWrapper(pDnode, SNODE);
code = smProcessDropReq(pWrapper->pMgmt, pMsg);
break;
case TDMT_DND_CREATE_BNODE:
pWrapper = dndGetWrapper(pDnode, BNODE);
code = bmProcessCreateReq(pWrapper->pMgmt, pMsg);
break;
code = dndProcessCreateNodeMsg(pMgmt->pDnode, pMsg);
case TDMT_DND_DROP_MNODE:
case TDMT_DND_DROP_QNODE:
case TDMT_DND_DROP_SNODE:
case TDMT_DND_DROP_BNODE:
pWrapper = dndGetWrapper(pDnode, BNODE);
code = bmProcessDropReq(pWrapper->pMgmt, pMsg);
break;
case TDMT_DND_CONFIG_DNODE:
code = dmProcessConfigReq(pMgmt, pMsg);
break;
case TDMT_MND_STATUS_RSP:
code = dmProcessStatusRsp(pMgmt, pMsg);
break;
case TDMT_MND_AUTH_RSP:
code = dmProcessAuthRsp(pMgmt, pMsg);
break;
case TDMT_MND_GRANT_RSP:
code = dmProcessGrantRsp(pMgmt, pMsg);
break;
code = dndProcessDropNodeMsg(pMgmt->pDnode, pMsg);
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
dError("RPC %p, dnode msg:%s not processed", pMsg->rpcMsg.handle, TMSG_INFO(msgType));
break;
}
if (msgType & 1u) {
@ -117,10 +82,9 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.pCont = NULL;
taosFreeQitem(pMsg);
dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {

View File

@ -28,7 +28,7 @@ class TestServer {
private:
SDnode* pDnode;
pthread_t* threadId;
pthread_t threadId;
char path[PATH_MAX];
char fqdn[TSDB_FQDN_LEN];
char firstEp[TSDB_EP_LEN];

View File

@ -16,10 +16,9 @@
#include "sut.h"
void* serverLoop(void* param) {
while (1) {
taosMsleep(100);
pthread_testcancel();
}
SDnode* pDnode = (SDnode*)param;
dndRun(pDnode);
return NULL;
}
SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) {
@ -38,14 +37,16 @@ bool TestServer::DoStart() {
taosMkDir(path);
pDnode = dndCreate(&option);
if (pDnode != NULL) {
if (pDnode == NULL) {
return false;
}
threadId = taosCreateThread(serverLoop, NULL);
if (threadId != NULL) {
return false;
}
pthread_attr_t thAttr;
pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
pthread_create(&threadId, &thAttr, serverLoop, pDnode);
pthread_attr_destroy(&thAttr);
taosMsleep(1000);
return true;
}
@ -67,10 +68,8 @@ bool TestServer::Start(const char* path, const char* fqdn, uint16_t port, const
}
void TestServer::Stop() {
if (threadId != NULL) {
taosDestoryThread(threadId);
threadId = NULL;
}
dndHandleEvent(pDnode, DND_EVENT_STOP);
pthread_join(threadId, NULL);
if (pDnode != NULL) {
dndClose(pDnode);