From 43083fa3e26a796cca3593c41d6449af2d8f9fc4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 17 Mar 2022 14:33:25 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/container/inc/dnd.h | 3 + source/dnode/mgmt/container/inc/dndInt.h | 1 - source/dnode/mgmt/container/src/dndMsg.c | 72 +++++++++++++++++++++++ source/dnode/mgmt/dnode/src/dmWorker.c | 58 ++++-------------- source/dnode/mgmt/test/sut/inc/server.h | 2 +- source/dnode/mgmt/test/sut/src/server.cpp | 25 ++++---- 6 files changed, 99 insertions(+), 62 deletions(-) create mode 100644 source/dnode/mgmt/container/src/dndMsg.c diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 27f0886e3a..cbcebb2bff 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -158,6 +158,9 @@ int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, cons void dndCleanupWorker(SDnodeWorker *pWorker); int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); +int32_t dndProcessCreateNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); +int32_t dndProcessDropNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index aa00774603..0d450e3625 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -20,7 +20,6 @@ #include "bm.h" #include "dm.h" -#include "dndInt.h" #include "mm.h" #include "qmInt.h" #include "smInt.h" diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c new file mode 100644 index 0000000000..1067301615 --- /dev/null +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndInt.h" + +static SMgmtWrapper *dndGetWrapperFromMsg(SDnode *pDnode, SNodeMsg *pMsg) { + SMgmtWrapper *pWrapper = NULL; + switch (pMsg->rpcMsg.msgType) { + case TDMT_DND_CREATE_MNODE: + return dndGetWrapper(pDnode, MNODE); + case TDMT_DND_CREATE_QNODE: + return dndGetWrapper(pDnode, QNODE); + case TDMT_DND_CREATE_SNODE: + return dndGetWrapper(pDnode, SNODE); + case TDMT_DND_CREATE_BNODE: + return dndGetWrapper(pDnode, BNODE); + default: + return NULL; + } +} + +int32_t dndProcessCreateNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { + SMgmtWrapper *pWrapper = dndGetWrapperFromMsg(pDnode, pMsg); + if (pWrapper->procType == PROC_SINGLE) { + switch (pMsg->rpcMsg.msgType) { + case TDMT_DND_CREATE_MNODE: + return mmProcessCreateReq(pWrapper->pMgmt, pMsg); + case TDMT_DND_CREATE_QNODE: + return qmProcessCreateReq(pWrapper->pMgmt, pMsg); + case TDMT_DND_CREATE_SNODE: + return smProcessCreateReq(pWrapper->pMgmt, pMsg); + case TDMT_DND_CREATE_BNODE: + return bmProcessCreateReq(pWrapper->pMgmt, pMsg); + default: + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + return -1; + } + } else { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + return -1; + } +} + +int32_t dndProcessDropNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { + SMgmtWrapper *pWrapper = dndGetWrapperFromMsg(pDnode, pMsg); + switch (pMsg->rpcMsg.msgType) { + case TDMT_DND_DROP_MNODE: + return mmProcessDropReq(pWrapper->pMgmt, pMsg); + case TDMT_DND_DROP_QNODE: + return qmProcessDropReq(pWrapper->pMgmt, pMsg); + case TDMT_DND_DROP_SNODE: + return smProcessDropReq(pWrapper->pMgmt, pMsg); + case TDMT_DND_DROP_BNODE: + return bmProcessDropReq(pWrapper->pMgmt, pMsg); + default: + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + return -1; + } +} \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 80a6b374aa..c135473d25 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -53,62 +53,27 @@ static void *dmThreadRoutine(void *param) { } static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; - SDnode *pDnode = pMgmt->pDnode; - SMgmtWrapper *pWrapper = NULL; + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; + SDnode *pDnode = pMgmt->pDnode; dTrace("msg:%p, will be processed", pMsg); switch (msgType) { case TDMT_DND_CREATE_MNODE: - pWrapper = dndGetWrapper(pDnode, MNODE); - code = mmProcessCreateReq(pWrapper->pMgmt, pMsg); - break; - case TDMT_DND_DROP_MNODE: - pWrapper = dndGetWrapper(pDnode, MNODE); - code = mmProcessDropReq(pWrapper->pMgmt, pMsg); - break; case TDMT_DND_CREATE_QNODE: - pWrapper = dndGetWrapper(pDnode, QNODE); - code = qmProcessCreateReq(pWrapper->pMgmt, pMsg); - break; - case TDMT_DND_DROP_QNODE: - pWrapper = dndGetWrapper(pDnode, QNODE); - code = qmProcessDropReq(pWrapper->pMgmt, pMsg); - break; case TDMT_DND_CREATE_SNODE: - pWrapper = dndGetWrapper(pDnode, SNODE); - code = smProcessCreateReq(pWrapper->pMgmt, pMsg); - break; - case TDMT_DND_DROP_SNODE: - pWrapper = dndGetWrapper(pDnode, SNODE); - code = smProcessDropReq(pWrapper->pMgmt, pMsg); - break; case TDMT_DND_CREATE_BNODE: - pWrapper = dndGetWrapper(pDnode, BNODE); - code = bmProcessCreateReq(pWrapper->pMgmt, pMsg); - break; + code = dndProcessCreateNodeMsg(pMgmt->pDnode, pMsg); + + case TDMT_DND_DROP_MNODE: + case TDMT_DND_DROP_QNODE: + case TDMT_DND_DROP_SNODE: case TDMT_DND_DROP_BNODE: - pWrapper = dndGetWrapper(pDnode, BNODE); - code = bmProcessDropReq(pWrapper->pMgmt, pMsg); - break; - case TDMT_DND_CONFIG_DNODE: - code = dmProcessConfigReq(pMgmt, pMsg); - break; - case TDMT_MND_STATUS_RSP: - code = dmProcessStatusRsp(pMgmt, pMsg); - break; - case TDMT_MND_AUTH_RSP: - code = dmProcessAuthRsp(pMgmt, pMsg); - break; - case TDMT_MND_GRANT_RSP: - code = dmProcessGrantRsp(pMgmt, pMsg); - break; + code = dndProcessDropNodeMsg(pMgmt->pDnode, pMsg); default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; code = -1; dError("RPC %p, dnode msg:%s not processed", pMsg->rpcMsg.handle, TMSG_INFO(msgType)); - break; } if (msgType & 1u) { @@ -117,10 +82,9 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { rpcSendResponse(&rsp); } - rpcFreeCont(pMsg->rpcMsg.pCont); - pMsg->rpcMsg.pCont = NULL; - taosFreeQitem(pMsg); dTrace("msg:%p, is freed", pMsg); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/test/sut/inc/server.h b/source/dnode/mgmt/test/sut/inc/server.h index 5f9e4846a7..0da880dee6 100644 --- a/source/dnode/mgmt/test/sut/inc/server.h +++ b/source/dnode/mgmt/test/sut/inc/server.h @@ -28,7 +28,7 @@ class TestServer { private: SDnode* pDnode; - pthread_t* threadId; + pthread_t threadId; char path[PATH_MAX]; char fqdn[TSDB_FQDN_LEN]; char firstEp[TSDB_EP_LEN]; diff --git a/source/dnode/mgmt/test/sut/src/server.cpp b/source/dnode/mgmt/test/sut/src/server.cpp index e8ccc700e2..a9f6a99fc8 100644 --- a/source/dnode/mgmt/test/sut/src/server.cpp +++ b/source/dnode/mgmt/test/sut/src/server.cpp @@ -16,10 +16,9 @@ #include "sut.h" void* serverLoop(void* param) { - while (1) { - taosMsleep(100); - pthread_testcancel(); - } + SDnode* pDnode = (SDnode*)param; + dndRun(pDnode); + return NULL; } SDnodeOpt TestServer::BuildOption(const char* path, const char* fqdn, uint16_t port, const char* firstEp) { @@ -38,14 +37,16 @@ bool TestServer::DoStart() { taosMkDir(path); pDnode = dndCreate(&option); - if (pDnode != NULL) { + if (pDnode == NULL) { return false; } - threadId = taosCreateThread(serverLoop, NULL); - if (threadId != NULL) { - return false; - } + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + pthread_create(&threadId, &thAttr, serverLoop, pDnode); + pthread_attr_destroy(&thAttr); + taosMsleep(1000); return true; } @@ -67,10 +68,8 @@ bool TestServer::Start(const char* path, const char* fqdn, uint16_t port, const } void TestServer::Stop() { - if (threadId != NULL) { - taosDestoryThread(threadId); - threadId = NULL; - } + dndHandleEvent(pDnode, DND_EVENT_STOP); + pthread_join(threadId, NULL); if (pDnode != NULL) { dndClose(pDnode);