From ec1d86eecca5356828bbb355360653f480ffa5b5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 17 Mar 2022 15:33:16 +0800 Subject: [PATCH] shm --- include/dnode/mgmt/dnode.h | 2 +- source/dnode/mgmt/container/inc/dnd.h | 3 + source/dnode/mgmt/container/inc/dndInt.h | 11 +- source/dnode/mgmt/container/src/dndMsg.c | 71 ++++++++ source/dnode/mgmt/container/src/dndNode.c | 211 +--------------------- source/dnode/mgmt/container/src/dndObj.c | 138 ++++++++++++++ 6 files changed, 225 insertions(+), 211 deletions(-) create mode 100644 source/dnode/mgmt/container/src/dndObj.c diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index d2bf206bb2..8d19ce23df 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -41,7 +41,7 @@ void dndCleanup(); typedef struct { int32_t numOfSupportVnodes; uint16_t serverPort; - char dataDir[TSDB_FILENAME_LEN]; + char dataDir[PATH_MAX]; char localEp[TSDB_EP_LEN]; char localFqdn[TSDB_FQDN_LEN]; char firstEp[TSDB_EP_LEN]; diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index cbcebb2bff..8acefc22b0 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -104,6 +104,9 @@ typedef struct SMgmtFp { typedef struct SMgmtWrapper { const char *name; char *path; + int32_t refCount; + bool deployed; + bool dropped; bool required; EProcType procType; SProcObj *pProc; diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 0d450e3625..29b0ed613c 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -37,12 +37,19 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); TdFilePtr dndCheckRunning(char *dataDir); void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); +// dndMsg.c +void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); + // dndNode.c +bool dndRequireNode(SMgmtWrapper *pWrapper); +int32_t dndOpenNode(SMgmtWrapper *pWrapper); +void dndCloseNode(SMgmtWrapper *pWrapper); +int32_t dndRun(SDnode *pDnode); + +// dndObj.c SDnode *dndCreate(const SDnodeOpt *pOption); void dndClose(SDnode *pDnode); -int32_t dndRun(SDnode *pDnode); void dndHandleEvent(SDnode *pDnode, EDndEvent event); -void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet); // dndTransport.c int32_t dndInitServer(SDnode *pDnode); diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c index 1067301615..55f1ae2f80 100644 --- a/source/dnode/mgmt/container/src/dndMsg.c +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -16,6 +16,77 @@ #define _DEFAULT_SOURCE #include "dndInt.h" + + +static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { + SRpcConnInfo connInfo = {0}; + if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { + terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle); + return -1; + } + + memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); + pMsg->rpcMsg = *pRpc; + + return 0; +} + +void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { + if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { + dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet); + } + + int32_t code = -1; + SNodeMsg *pMsg = NULL; + + NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; + if (msgFp == NULL) { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + goto _OVER; + } + + pMsg = taosAllocateQitem(sizeof(SNodeMsg)); + if (pMsg == NULL) { + goto _OVER; + } + + if (dndBuildMsg(pMsg, pRpc, pEpSet) != 0) { + goto _OVER; + } + + dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user); + + if (pWrapper->procType == PROC_SINGLE) { + code = (*msgFp)(pWrapper->pMgmt, pMsg); + } else if (pWrapper->procType == PROC_PARENT) { + code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); + } else { + terrno = TSDB_CODE_MEMORY_CORRUPTED; + dError("msg:%p, won't be processed for it is child process", pMsg); + } + +_OVER: + + if (code == 0) { + if (pWrapper->procType == PROC_PARENT) { + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pRpc->pCont); + } + } else { + dError("msg:%p, failed to process since %s", pMsg, terrstr()); + bool isReq = (pRpc->msgType & 1U); + if (isReq) { + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; + dndSendRsp(pWrapper, &rsp); + } + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pRpc->pCont); + } +} + static SMgmtWrapper *dndGetWrapperFromMsg(SDnode *pDnode, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = NULL; switch (pMsg->rpcMsg.msgType) { diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index 6c972e96ba..b73a536fb4 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -25,7 +25,7 @@ static void dndResetLog(SMgmtWrapper *pMgmt) { taosInitLog(logname, 1); } -static bool dndRequireNode(SMgmtWrapper *pMgmt) { +bool dndRequireNode(SMgmtWrapper *pMgmt) { bool required = (*pMgmt->fp.requiredFp)(pMgmt); if (!required) { dDebug("node:%s, no need to start", pMgmt->name); @@ -35,9 +35,9 @@ static bool dndRequireNode(SMgmtWrapper *pMgmt) { return required; } -static int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); } +int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); } -static void dndCloseNode(SMgmtWrapper *pWrapper) { +void dndCloseNode(SMgmtWrapper *pWrapper) { if (pWrapper->required) { (*pWrapper->fp.closeFp)(pWrapper); pWrapper->required = false; @@ -48,137 +48,6 @@ static void dndCloseNode(SMgmtWrapper *pWrapper) { } } -static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { - pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes; - pDnode->serverPort = pOption->serverPort; - pDnode->dataDir = strdup(pOption->dataDir); - pDnode->localEp = strdup(pOption->localEp); - pDnode->localFqdn = strdup(pOption->localFqdn); - pDnode->firstEp = strdup(pOption->firstEp); - pDnode->secondEp = strdup(pOption->secondEp); - pDnode->pDisks = pOption->pDisks; - pDnode->numOfDisks = pOption->numOfDisks; - pDnode->rebootTime = taosGetTimestampMs(); - - if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL || - pDnode->secondEp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - return 0; -} - -static void dndClearMemory(SDnode *pDnode) { - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; - tfree(pMgmt->path); - } - if (pDnode->pLockFile != NULL) { - taosUnLockFile(pDnode->pLockFile); - taosCloseFile(&pDnode->pLockFile); - pDnode->pLockFile = NULL; - } - tfree(pDnode->localEp); - tfree(pDnode->localFqdn); - tfree(pDnode->firstEp); - tfree(pDnode->secondEp); - tfree(pDnode->dataDir); - free(pDnode); - dDebug("dnode object memory is cleared, data:%p", pDnode); -} - -SDnode *dndCreate(const SDnodeOpt *pOption) { - dInfo("start to create dnode object"); - int32_t code = -1; - char path[PATH_MAX + 100]; - SDnode *pDnode = NULL; - - pDnode = calloc(1, sizeof(SDnode)); - if (pDnode == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - - if (dndInitMemory(pDnode, pOption) != 0) { - goto _OVER; - } - - dndSetStatus(pDnode, DND_STAT_INIT); - pDnode->pLockFile = dndCheckRunning(pDnode->dataDir); - if (pDnode->pLockFile == NULL) { - goto _OVER; - } - - if (dndInitServer(pDnode) != 0) { - dError("failed to init trans server since %s", terrstr()); - goto _OVER; - } - - if (dndInitClient(pDnode) != 0) { - dError("failed to init trans client since %s", terrstr()); - goto _OVER; - } - - dmGetMgmtFp(&pDnode->wrappers[DNODE]); - mmGetMgmtFp(&pDnode->wrappers[MNODE]); - vmGetMgmtFp(&pDnode->wrappers[VNODES]); - qmGetMgmtFp(&pDnode->wrappers[QNODE]); - smGetMgmtFp(&pDnode->wrappers[SNODE]); - bmGetMgmtFp(&pDnode->wrappers[BNODE]); - - if (dndInitMsgHandle(pDnode) != 0) { - goto _OVER; - } - - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); - pWrapper->path = strdup(path); - pWrapper->pDnode = pDnode; - if (pWrapper->path == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - - pWrapper->procType = PROC_SINGLE; - } - - code = 0; - -_OVER: - if (code != 0 && pDnode) { - dndClearMemory(pDnode); - dError("failed to create dnode object since %s", terrstr()); - } else { - dInfo("dnode object is created, data:%p", pDnode); - } - - return pDnode; -} - -void dndClose(SDnode *pDnode) { - if (pDnode == NULL) return; - - if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { - dError("dnode is shutting down, data:%p", pDnode); - return; - } - - dInfo("start to close dnode, data:%p", pDnode); - dndSetStatus(pDnode, DND_STAT_STOPPED); - - dndCleanupServer(pDnode); - dndCleanupClient(pDnode); - - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - dndCloseNode(pWrapper); - } - - dndClearMemory(pDnode); - dInfo("dnode object is closed, data:%p", pDnode); -} - static int32_t dndRunInSingleProcess(SDnode *pDnode) { dInfo("dnode run in single process mode"); @@ -350,77 +219,3 @@ int32_t dndRun(SDnode *pDnode) { return 0; } - -void dndHandleEvent(SDnode *pDnode, EDndEvent event) { - dInfo("dnode object receive event %d, data:%p", event, pDnode); - pDnode->event = event; -} - -static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { - SRpcConnInfo connInfo = {0}; - if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { - terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle); - return -1; - } - - memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); - pMsg->rpcMsg = *pRpc; - - return 0; -} - -void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { - if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { - dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet); - } - - int32_t code = -1; - SNodeMsg *pMsg = NULL; - - NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; - if (msgFp == NULL) { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - goto _OVER; - } - - pMsg = taosAllocateQitem(sizeof(SNodeMsg)); - if (pMsg == NULL) { - goto _OVER; - } - - if (dndBuildMsg(pMsg, pRpc, pEpSet) != 0) { - goto _OVER; - } - - dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user); - - if (pWrapper->procType == PROC_SINGLE) { - code = (*msgFp)(pWrapper->pMgmt, pMsg); - } else if (pWrapper->procType == PROC_PARENT) { - code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); - } else { - terrno = TSDB_CODE_MEMORY_CORRUPTED; - dError("msg:%p, won't be processed for it is child process", pMsg); - } - -_OVER: - - if (code == 0) { - if (pWrapper->procType == PROC_PARENT) { - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pRpc->pCont); - } - } else { - dError("msg:%p, failed to process since %s", pMsg, terrstr()); - bool isReq = (pRpc->msgType & 1U); - if (isReq) { - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - dndSendRsp(pWrapper, &rsp); - } - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pRpc->pCont); - } -} diff --git a/source/dnode/mgmt/container/src/dndObj.c b/source/dnode/mgmt/container/src/dndObj.c new file mode 100644 index 0000000000..4cd0300e7e --- /dev/null +++ b/source/dnode/mgmt/container/src/dndObj.c @@ -0,0 +1,138 @@ +#define _DEFAULT_SOURCE +#include "dndInt.h" + +static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { + pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes; + pDnode->serverPort = pOption->serverPort; + pDnode->dataDir = strdup(pOption->dataDir); + pDnode->localEp = strdup(pOption->localEp); + pDnode->localFqdn = strdup(pOption->localFqdn); + pDnode->firstEp = strdup(pOption->firstEp); + pDnode->secondEp = strdup(pOption->secondEp); + pDnode->pDisks = pOption->pDisks; + pDnode->numOfDisks = pOption->numOfDisks; + pDnode->rebootTime = taosGetTimestampMs(); + + if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL || + pDnode->secondEp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; +} + +static void dndClearMemory(SDnode *pDnode) { + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; + tfree(pMgmt->path); + } + if (pDnode->pLockFile != NULL) { + taosUnLockFile(pDnode->pLockFile); + taosCloseFile(&pDnode->pLockFile); + pDnode->pLockFile = NULL; + } + tfree(pDnode->localEp); + tfree(pDnode->localFqdn); + tfree(pDnode->firstEp); + tfree(pDnode->secondEp); + tfree(pDnode->dataDir); + free(pDnode); + dDebug("dnode object memory is cleared, data:%p", pDnode); +} + +SDnode *dndCreate(const SDnodeOpt *pOption) { + dInfo("start to create dnode object"); + int32_t code = -1; + char path[PATH_MAX]; + SDnode *pDnode = NULL; + + pDnode = calloc(1, sizeof(SDnode)); + if (pDnode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + if (dndInitMemory(pDnode, pOption) != 0) { + goto _OVER; + } + + dndSetStatus(pDnode, DND_STAT_INIT); + pDnode->pLockFile = dndCheckRunning(pDnode->dataDir); + if (pDnode->pLockFile == NULL) { + goto _OVER; + } + + if (dndInitServer(pDnode) != 0) { + dError("failed to init trans server since %s", terrstr()); + goto _OVER; + } + + if (dndInitClient(pDnode) != 0) { + dError("failed to init trans client since %s", terrstr()); + goto _OVER; + } + + dmGetMgmtFp(&pDnode->wrappers[DNODE]); + mmGetMgmtFp(&pDnode->wrappers[MNODE]); + vmGetMgmtFp(&pDnode->wrappers[VNODES]); + qmGetMgmtFp(&pDnode->wrappers[QNODE]); + smGetMgmtFp(&pDnode->wrappers[SNODE]); + bmGetMgmtFp(&pDnode->wrappers[BNODE]); + + if (dndInitMsgHandle(pDnode) != 0) { + goto _OVER; + } + + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); + pWrapper->path = strdup(path); + pWrapper->pDnode = pDnode; + if (pWrapper->path == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + pWrapper->procType = PROC_SINGLE; + } + + code = 0; + +_OVER: + if (code != 0 && pDnode) { + dndClearMemory(pDnode); + dError("failed to create dnode object since %s", terrstr()); + } else { + dInfo("dnode object is created, data:%p", pDnode); + } + + return pDnode; +} + +void dndClose(SDnode *pDnode) { + if (pDnode == NULL) return; + + if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { + dError("dnode is shutting down, data:%p", pDnode); + return; + } + + dInfo("start to close dnode, data:%p", pDnode); + dndSetStatus(pDnode, DND_STAT_STOPPED); + + dndCleanupServer(pDnode); + dndCleanupClient(pDnode); + + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + dndCloseNode(pWrapper); + } + + dndClearMemory(pDnode); + dInfo("dnode object is closed, data:%p", pDnode); +} + +void dndHandleEvent(SDnode *pDnode, EDndEvent event) { + dInfo("dnode object receive event %d, data:%p", event, pDnode); + pDnode->event = event; +}