From 5bd37b75dc2230807d59e6250a42f7c73269667e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Mar 2022 15:57:12 +0800 Subject: [PATCH 1/4] shm --- source/dnode/mgmt/dm/src/dmFile.c | 2 +- source/dnode/mgmt/dm/src/dmInt.c | 14 +++++++++++ source/dnode/mgmt/main/inc/dnd.h | 6 ++++- source/dnode/mgmt/main/inc/dndInt.h | 4 --- source/dnode/mgmt/main/src/dndExec.c | 22 +++++------------ source/dnode/mgmt/main/src/dndInt.c | 4 +-- source/dnode/mgmt/main/src/dndObj.c | 37 +++++++++------------------- 7 files changed, 40 insertions(+), 49 deletions(-) diff --git a/source/dnode/mgmt/dm/src/dmFile.c b/source/dnode/mgmt/dm/src/dmFile.c index d44b1222a3..444f18e6e0 100644 --- a/source/dnode/mgmt/dm/src/dmFile.c +++ b/source/dnode/mgmt/dm/src/dmFile.c @@ -130,7 +130,7 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) { } code = 0; - dInfo("succcessed to read file %s", file); + dDebug("succcessed to read file %s", file); dmPrintDnodes(pMgmt); PRASE_DNODE_OVER: diff --git a/source/dnode/mgmt/dm/src/dmInt.c b/source/dnode/mgmt/dm/src/dmInt.c index b729888a72..3c5f394d5e 100644 --- a/source/dnode/mgmt/dm/src/dmInt.c +++ b/source/dnode/mgmt/dm/src/dmInt.c @@ -112,6 +112,16 @@ int32_t dmInit(SMgmtWrapper *pWrapper) { return -1; } + if (dndInitServer(pDnode) != 0) { + dError("failed to init trans server since %s", terrstr()); + return -1; + } + + if (dndInitClient(pDnode) != 0) { + dError("failed to init trans client since %s", terrstr()); + return -1; + } + pWrapper->pMgmt = pMgmt; dInfo("dnode-mgmt is initialized"); return 0; @@ -122,6 +132,7 @@ void dmCleanup(SMgmtWrapper *pWrapper) { if (pMgmt == NULL) return; dInfo("dnode-mgmt start to clean up"); + SDnode *pDnode = pMgmt->pDnode; dmStopWorker(pMgmt); taosWLockLatch(&pMgmt->latch); @@ -140,6 +151,9 @@ void dmCleanup(SMgmtWrapper *pWrapper) { taosMemoryFree(pMgmt); pWrapper->pMgmt = NULL; + dndCleanupServer(pDnode); + dndCleanupClient(pDnode); + dInfo("dnode-mgmt is cleaned up"); } diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index b416ee4f7a..d228194237 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -141,13 +141,17 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp no void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dndSendMonitorReport(SDnode *pDnode); +int32_t dndInitServer(SDnode *pDnode); +void dndCleanupServer(SDnode *pDnode); +int32_t dndInitClient(SDnode *pDnode); +void dndCleanupClient(SDnode *pDnode); +int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); -int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed); diff --git a/source/dnode/mgmt/main/inc/dndInt.h b/source/dnode/mgmt/main/inc/dndInt.h index 56782f872b..27f716f823 100644 --- a/source/dnode/mgmt/main/inc/dndInt.h +++ b/source/dnode/mgmt/main/inc/dndInt.h @@ -50,10 +50,6 @@ void dndClose(SDnode *pDnode); void dndHandleEvent(SDnode *pDnode, EDndEvent event); // dndTransport.c -int32_t dndInitServer(SDnode *pDnode); -void dndCleanupServer(SDnode *pDnode); -int32_t dndInitClient(SDnode *pDnode); -void dndCleanupClient(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index c41d4f28e4..b289fe91c6 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -16,15 +16,6 @@ #define _DEFAULT_SOURCE #include "dndInt.h" -static void dndResetLog(SMgmtWrapper *pMgmt) { - char logname[24] = {0}; - snprintf(logname, sizeof(logname), "%slog", pMgmt->name); - - dInfo("node:%s, reset log to %s in child process", pMgmt->name, logname); - taosCloseLog(); - taosInitLog(logname, 1); -} - static bool dndRequireNode(SMgmtWrapper *pWrapper) { bool required = false; int32_t code =(*pWrapper->fp.requiredFp)(pWrapper, &required); @@ -71,22 +62,21 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { } static int32_t dndRunInSingleProcess(SDnode *pDnode) { - dInfo("dnode run in single process mode"); + dDebug("dnode run in single process mode"); + SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]); + tmsgSetDefaultMsgCb(&msgCb); - for (ENodeType n = 0; n < NODE_MAX; ++n) { + for (ENodeType n = DNODE; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; - SMsgCb msgCb = dndCreateMsgcb(pWrapper); - tmsgSetDefaultMsgCb(&msgCb); if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); + dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr()); return -1; } - dInfo("node:%s, will start in single process", pWrapper->name); pWrapper->procType = PROC_SINGLE; if (dndOpenNode(pWrapper) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); @@ -215,7 +205,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { if (taosProcIsChild(pProc)) { dInfo("node:%s, will start in child process", pWrapper->name); pWrapper->procType = PROC_CHILD; - dndResetLog(pWrapper); + // dndResetLog(pWrapper); dInfo("node:%s, clean up resources inherited from parent", pWrapper->name); dndClearNodesExecpt(pDnode, n); diff --git a/source/dnode/mgmt/main/src/dndInt.c b/source/dnode/mgmt/main/src/dndInt.c index 7dde3561fb..8792147822 100644 --- a/source/dnode/mgmt/main/src/dndInt.c +++ b/source/dnode/mgmt/main/src/dndInt.c @@ -41,7 +41,7 @@ int32_t dndInit() { return -1; } - dDebug("dnode env is initialized"); + dInfo("dnode env is initialized"); return 0; } @@ -55,7 +55,7 @@ void dndCleanup() { monCleanup(); walCleanUp(); taosStopCacheRefreshWorker(); - dDebug("dnode env is cleaned up"); + dInfo("dnode env is cleaned up"); } void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { diff --git a/source/dnode/mgmt/main/src/dndObj.c b/source/dnode/mgmt/main/src/dndObj.c index 99dc782a9b..6afe0c5b5f 100644 --- a/source/dnode/mgmt/main/src/dndObj.c +++ b/source/dnode/mgmt/main/src/dndObj.c @@ -53,7 +53,7 @@ static void dndClearVars(SDnode *pDnode) { } SDnode *dndCreate(const SDnodeOpt *pOption) { - dInfo("start to create dnode object"); + dDebug("start to create dnode object"); int32_t code = -1; char path[PATH_MAX] = {0}; SDnode *pDnode = NULL; @@ -77,25 +77,6 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { smGetMgmtFp(&pDnode->wrappers[SNODE]); bmGetMgmtFp(&pDnode->wrappers[BNODE]); - if (dndOpenRuntimeFile(pDnode) != 0) { - dError("failed to open runtime file since %s", terrstr()); - goto _OVER; - } - - if (dndInitServer(pDnode) != 0) { - dError("failed to init trans server since %s", terrstr()); - goto _OVER; - } - - if (dndInitClient(pDnode) != 0) { - dError("failed to init trans client since %s", terrstr()); - goto _OVER; - } - - if (dndInitMsgHandle(pDnode) != 0) { - goto _OVER; - } - for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); @@ -110,6 +91,17 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { taosInitRWLatch(&pWrapper->latch); } + if (dndInitMsgHandle(pDnode) != 0) { + dError("failed to msg handles since %s", terrstr()); + goto _OVER; + } + + if (dndOpenRuntimeFile(pDnode) != 0) { + dError("failed to open runtime file since %s", terrstr()); + goto _OVER; + } + + dInfo("dnode object is created, data:%p", pDnode); code = 0; _OVER: @@ -117,8 +109,6 @@ _OVER: dndClearVars(pDnode); pDnode = NULL; dError("failed to create dnode object since %s", terrstr()); - } else { - dInfo("dnode object is created, data:%p", pDnode); } return pDnode; @@ -135,9 +125,6 @@ void dndClose(SDnode *pDnode) { dInfo("start to close dnode, data:%p", pDnode); dndSetStatus(pDnode, DND_STAT_STOPPED); - dndCleanupServer(pDnode); - dndCleanupClient(pDnode); - for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; dndCloseNode(pWrapper); From 6ece0d9ac1cd7b0060d95a1609e63398bd00da2f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Mar 2022 16:03:21 +0800 Subject: [PATCH 2/4] shm --- include/util/tprocess.h | 4 +- source/dnode/mgmt/main/src/dndExec.c | 194 +++++++++++++-------------- source/dnode/mgmt/main/src/dndObj.c | 3 + source/util/src/tprocess.c | 4 +- 4 files changed, 102 insertions(+), 103 deletions(-) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 51ce0243b7..090762b340 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -32,19 +32,17 @@ typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void ProcFuncType ftype); typedef struct { - int32_t childQueueSize; ProcConsumeFp childConsumeFp; ProcMallocFp childMallocHeadFp; ProcFreeFp childFreeHeadFp; ProcMallocFp childMallocBodyFp; ProcFreeFp childFreeBodyFp; - int32_t parentQueueSize; ProcConsumeFp parentConsumeFp; ProcMallocFp parentdMallocHeadFp; ProcFreeFp parentFreeHeadFp; ProcMallocFp parentMallocBodyFp; ProcFreeFp parentFreeBodyFp; - bool testFlag; + SShm shm; void *pParent; const char *name; } SProcCfg; diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index b289fe91c6..bdce489f76 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -28,14 +28,18 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) { } int32_t dndOpenNode(SMgmtWrapper *pWrapper) { - int32_t code = (*pWrapper->fp.openFp)(pWrapper); - if (code != 0) { - dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); + if (taosMkDir(pWrapper->path) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr()); return -1; - } else { - dDebug("node:%s, has been opened", pWrapper->name); } + if ((*pWrapper->fp.openFp)(pWrapper) != 0) { + dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); + return -1; + } + + dDebug("node:%s, has been opened", pWrapper->name); pWrapper->deployed = true; return 0; } @@ -62,22 +66,13 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { } static int32_t dndRunInSingleProcess(SDnode *pDnode) { - dDebug("dnode run in single process mode"); - SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]); - tmsgSetDefaultMsgCb(&msgCb); + dInfo("dnode start to run in single process"); for (ENodeType n = DNODE; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; - if (taosMkDir(pWrapper->path) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr()); - return -1; - } - - pWrapper->procType = PROC_SINGLE; if (dndOpenNode(pWrapper) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); return -1; @@ -96,18 +91,10 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { } } + dInfo("dnode running in single process"); return 0; } -static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { - // dndCleanupServer(pDnode); - for (ENodeType n = 0; n < NODE_MAX; ++n) { - if (except == n) continue; - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - pWrapper->required = false; - } -} - static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, ProcFuncType ftype) { SRpcMsg *pRpc = &pMsg->rpcMsg; @@ -153,115 +140,126 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t taosMemoryFree(pMsg); } -static int32_t dndRunInMultiProcess(SDnode *pDnode) { - dInfo("dnode run in multi process mode"); +static int32_t dndRunInParentProcess(SDnode *pDnode) { + dInfo("dnode start to run in parent process"); + SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE]; + if (dndOpenNode(pDWrapper) != 0) { + dError("node:%s, failed to start since %s", pDWrapper->name, terrstr()); + return -1; + } - for (ENodeType n = 0; n < NODE_MAX; ++n) { + for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; - SMsgCb msgCb = dndCreateMsgcb(pWrapper); - tmsgSetDefaultMsgCb(&msgCb); - - if (taosMkDir(pWrapper->path) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); + int64_t shmsize = 1024 * 1024 * 2; // size will be a configuration item + if (taosCreateShm(&pWrapper->shm, shmsize) != 0) { + terrno = TAOS_SYSTEM_ERROR(terrno); + dError("node:%s, failed to create shm size:%" PRId64 " since %s", pWrapper->name, shmsize, terrstr()); return -1; } - if (n == DNODE) { - dInfo("node:%s, will start in parent process", pWrapper->name); - pWrapper->procType = PROC_SINGLE; - if (dndOpenNode(pWrapper) != 0) { - dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); - return -1; - } - continue; - } + SProcCfg cfg = {.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, + .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = pWrapper->shm, + .pParent = pWrapper, + .name = pWrapper->name}; - SProcCfg cfg = {.childQueueSize = 1024 * 1024 * 2, // size will be a configuration item - .childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, - .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, - .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .parentQueueSize = 1024 * 1024 * 2, // size will be a configuration item - .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, - .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .pParent = pWrapper, - .name = pWrapper->name}; - SProcObj *pProc = taosProcInit(&cfg); - if (pProc == NULL) { - dError("node:%s, failed to fork since %s", pWrapper->name, terrstr()); + pWrapper->pProc = taosProcInit(&cfg); + if (pWrapper->pProc == NULL) { + dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); return -1; } + } - pWrapper->pProc = pProc; + if (dndWriteRuntimeFile(pDnode) != 0) { + dError("failed to write runtime file since %s", terrstr()); + return -1; + } - if (taosProcIsChild(pProc)) { - dInfo("node:%s, will start in child process", pWrapper->name); - pWrapper->procType = PROC_CHILD; - // dndResetLog(pWrapper); + for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (!pWrapper->required) continue; - dInfo("node:%s, clean up resources inherited from parent", pWrapper->name); - dndClearNodesExecpt(pDnode, n); + dInfo("node:%s, will not start in parent process", pWrapper->name); + // exec new node - dInfo("node:%s, will be initialized in child process", pWrapper->name); - if (dndOpenNode(pWrapper) != 0) { - dInfo("node:%s, failed to init in child process since %s", pWrapper->name, terrstr()); - return -1; - } - - if (taosProcRun(pProc) != 0) { - dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); - return -1; - } - break; - } else { - dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc)); - pWrapper->procType = PROC_PARENT; - if (taosProcRun(pProc) != 0) { - dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); - return -1; - } + pWrapper->procType = PROC_PARENT; + if (taosProcRun(pWrapper->pProc) != 0) { + dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); + return -1; } } dndSetStatus(pDnode, DND_STAT_RUNNING); - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - if (!pWrapper->required) continue; - if (pWrapper->fp.startFp == NULL) continue; - if (pWrapper->procType == PROC_PARENT && n != DNODE) continue; - if (pWrapper->procType == PROC_CHILD && n == DNODE) continue; - if ((*pWrapper->fp.startFp)(pWrapper) != 0) { - dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); - return -1; - } + if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) { + dError("node:%s, failed to start since %s", pDWrapper->name, terrstr()); + return -1; } + dInfo("dnode running in parent process"); return 0; } -int32_t dndRun(SDnode *pDnode) { +static int32_t dndRunInChildProcess(SDnode *pDnode) { + dInfo("dnode start to run in child process"); + + SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; + if (dndOpenNode(pWrapper) != 0) { + dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); + return -1; + } + + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = pWrapper->shm, + .pParent = pWrapper, + .name = pWrapper->name}; + + pWrapper->pProc = taosProcInit(&cfg); + if (pWrapper->pProc == NULL) { + dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); + return -1; + } + + pWrapper->procType = PROC_CHILD; + if (taosProcRun(pWrapper->pProc) != 0) { + dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); + return -1; + } + + dInfo("dnode running in child process"); + return 0; +} + +int32_t dndRun(SDnode * pDnode) { if (!tsMultiProcess) { if (dndRunInSingleProcess(pDnode) != 0) { - dError("failed to run dnode in single process mode since %s", terrstr()); + dError("failed to run dnode since %s", terrstr()); + return -1; + } + } else if (pDnode->ntype == DNODE) { + if (dndRunInParentProcess(pDnode) != 0) { + dError("failed to run dnode in parent process since %s", terrstr()); return -1; } } else { - if (dndRunInMultiProcess(pDnode) != 0) { - dError("failed to run dnode in multi process mode since %s", terrstr()); + if (dndRunInChildProcess(pDnode) != 0) { + dError("failed to run dnode in child process since %s", terrstr()); return -1; } } dndReportStartup(pDnode, "TDengine", "initialized successfully"); + dInfo("TDengine initialized successfully"); while (1) { if (pDnode->event == DND_EVENT_STOP) { diff --git a/source/dnode/mgmt/main/src/dndObj.c b/source/dnode/mgmt/main/src/dndObj.c index 6afe0c5b5f..b7e91f3039 100644 --- a/source/dnode/mgmt/main/src/dndObj.c +++ b/source/dnode/mgmt/main/src/dndObj.c @@ -101,6 +101,9 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { goto _OVER; } + SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]); + tmsgSetDefaultMsgCb(&msgCb); + dInfo("dnode object is created, data:%p", pDnode); code = 0; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 86cec2d271..7afbe56587 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -359,8 +359,8 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { } pProc->name = pCfg->name; - pProc->pChildQueue = taosProcInitQueue(pCfg->childQueueSize); - pProc->pParentQueue = taosProcInitQueue(pCfg->parentQueueSize); + pProc->pChildQueue = taosProcInitQueue(pCfg->shm.size / 2); + pProc->pParentQueue = taosProcInitQueue(pCfg->shm.size / 2); if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { taosProcCleanupQueue(pProc->pChildQueue); taosMemoryFree(pProc); From ae182cd75a5a10f9e142720c86c24c3b02cda755 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Mar 2022 16:52:14 +0800 Subject: [PATCH 3/4] shm --- include/os/osShm.h | 4 +- source/dnode/mgmt/main/inc/dnd.h | 2 +- source/dnode/mgmt/main/inc/dndInt.h | 6 +-- source/dnode/mgmt/main/src/dndExec.c | 14 ++++- source/dnode/mgmt/main/src/dndFile.c | 79 +++++++++++++++------------- source/dnode/mgmt/main/src/dndObj.c | 16 ++++-- source/os/src/osShm.c | 4 +- 7 files changed, 76 insertions(+), 49 deletions(-) diff --git a/include/os/osShm.h b/include/os/osShm.h index 82ee2339f2..a5d6716d0d 100644 --- a/include/os/osShm.h +++ b/include/os/osShm.h @@ -22,11 +22,11 @@ extern "C" { typedef struct { int32_t id; - int32_t size; + int64_t size; void* ptr; } SShm; -int32_t taosCreateShm(SShm *pShm, int32_t shmsize) ; +int32_t taosCreateShm(SShm *pShm, int64_t shmsize) ; void taosDropShm(SShm *pShm); int32_t taosAttachShm(SShm *pShm); void taosDetachShm(SShm *pShm); diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index d228194237..294413a54d 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -128,7 +128,7 @@ typedef struct SDnode { EDndStatus status; EDndEvent event; SStartupReq startup; - TdFilePtr runtimeFile; + TdFilePtr lockfile; STransMgmt trans; SMgmtWrapper wrappers[NODE_MAX]; } SDnode; diff --git a/source/dnode/mgmt/main/inc/dndInt.h b/source/dnode/mgmt/main/inc/dndInt.h index 27f716f823..612d35d513 100644 --- a/source/dnode/mgmt/main/inc/dndInt.h +++ b/source/dnode/mgmt/main/inc/dndInt.h @@ -54,9 +54,9 @@ int32_t dndInitMsgHandle(SDnode *pDnode); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); // dndFile.c -int32_t dndOpenRuntimeFile(SDnode *pDnode); -int32_t dndWriteRuntimeFile(SDnode *pDnode); -void dndCloseRuntimeFile(SDnode *pDnode); +TdFilePtr dndCheckRunning(const char *dataDir); +int32_t dndReadShmFile(SDnode *pDnode); +int32_t dndWriteShmFile(SDnode *pDnode); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index bdce489f76..d4dfae2d69 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -160,7 +160,12 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { return -1; } - SProcCfg cfg = {.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, @@ -176,7 +181,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { } } - if (dndWriteRuntimeFile(pDnode) != 0) { + if (dndWriteShmFile(pDnode) != 0) { dError("failed to write runtime file since %s", terrstr()); return -1; } @@ -220,6 +225,11 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, + .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = pWrapper->shm, .pParent = pWrapper, .name = pWrapper->name}; diff --git a/source/dnode/mgmt/main/src/dndFile.c b/source/dnode/mgmt/main/src/dndFile.c index 51d4ff3902..bcfb90af13 100644 --- a/source/dnode/mgmt/main/src/dndFile.c +++ b/source/dnode/mgmt/main/src/dndFile.c @@ -117,7 +117,30 @@ _OVER: return code; } -int32_t dndOpenRuntimeFile(SDnode *pDnode) { +TdFilePtr dndCheckRunning(const char *dataDir) { + char filepath[PATH_MAX] = {0}; + snprintf(filepath, sizeof(filepath), "%s%s.running", dataDir, TD_DIRSEP); + + TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to lock file:%s since %s", filepath, terrstr()); + return NULL; + } + + int32_t ret = taosLockFile(pFile); + if (ret != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to lock file:%s since %s", filepath, terrstr()); + taosCloseFile(&pFile); + return NULL; + } + + dDebug("file:%s is locked", filepath); + return pFile; +} + +int32_t dndReadShmFile(SDnode *pDnode) { int32_t code = -1; char itemName[24] = {0}; char content[MAXLEN + 1] = {0}; @@ -125,17 +148,11 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { cJSON *root = NULL; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%s.running", pDnode->dataDir, TD_DIRSEP); - pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + snprintf(file, sizeof(file), "%s%s.shmfile", pDnode->dataDir, TD_DIRSEP); + pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to open file:%s since %s", file, terrstr()); - goto _OVER; - } - - if (taosLockFile(pFile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to lock file:%s since %s", file, terrstr()); + dDebug("file %s not exist", file); + code = 0; goto _OVER; } @@ -150,14 +167,14 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype)); cJSON *shmid = cJSON_GetObjectItem(root, itemName); - if (shmid && shmid->type == cJSON_Number) { - pDnode->wrappers[ntype].shm.id = shmid->valueint; + if (shmid && shmid->type == cJSON_String) { + pDnode->wrappers[ntype].shm.id = atoi(shmid->valuestring); } snprintf(itemName, sizeof(itemName), "%s_shmsize", dndNodeProcStr(ntype)); cJSON *shmsize = cJSON_GetObjectItem(root, itemName); - if (shmsize && shmsize->type == cJSON_Number) { - pDnode->wrappers[ntype].shm.size = shmsize->valueint; + if (shmsize && shmsize->type == cJSON_String) { + pDnode->wrappers[ntype].shm.size = atoll(shmsize->valuestring); } } } @@ -166,7 +183,7 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; if (pWrapper->shm.id > 0) { - dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size); + dDebug("shmid:%d, is closed, size:%" PRId64, pWrapper->shm.id, pWrapper->shm.size); taosDropShm(&pWrapper->shm); } } @@ -177,7 +194,7 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { dError("shmid:%d, failed to attach since %s", pWrapper->shm.id, terrstr()); goto _OVER; } - dDebug("shmid:%d, is attached, size:%d", pWrapper->shm.id, pWrapper->shm.size); + dDebug("shmid:%d, is attached, size:%" PRId64, pWrapper->shm.id, pWrapper->shm.size); } dDebug("successed to open %s", file); @@ -185,16 +202,12 @@ int32_t dndOpenRuntimeFile(SDnode *pDnode) { _OVER: if (root != NULL) cJSON_Delete(root); - if (code != 0) { - if (pFile != NULL) taosCloseFile(&pFile); - } else { - pDnode->runtimeFile = pFile; - } + if (pFile != NULL) taosCloseFile(&pFile); return code; } -int32_t dndWriteRuntimeFile(SDnode *pDnode) { +int32_t dndWriteShmFile(SDnode *pDnode) { int32_t code = -1; int32_t len = 0; char content[MAXLEN + 1] = {0}; @@ -202,8 +215,8 @@ int32_t dndWriteRuntimeFile(SDnode *pDnode) { char realfile[PATH_MAX] = {0}; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%s.running.bak", pDnode->dataDir, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%s.running", pDnode->dataDir, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%s.shmfile.bak", pDnode->dataDir, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%s.shmfile", pDnode->dataDir, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -215,11 +228,13 @@ int32_t dndWriteRuntimeFile(SDnode *pDnode) { len += snprintf(content + len, MAXLEN - len, "{\n"); for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; - len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\": %d,\n", dndNodeProcStr(ntype), pWrapper->shm.id); + len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\": \"%d\",\n", dndNodeProcStr(ntype), pWrapper->shm.id); if (ntype == NODE_MAX - 1) { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": %d\n", dndNodeProcStr(ntype), pWrapper->shm.size); + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": \"%" PRId64 "\"\n", dndNodeProcStr(ntype), + pWrapper->shm.size); } else { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": %d,\n", dndNodeProcStr(ntype), pWrapper->shm.size); + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": \"%" PRId64 "\",\n", dndNodeProcStr(ntype), + pWrapper->shm.size); } } len += snprintf(content + len, MAXLEN - len, "}\n"); @@ -254,11 +269,3 @@ _OVER: return code; } - -void dndCloseRuntimeFile(SDnode *pDnode) { - if (pDnode->runtimeFile) { - taosUnLockFile(pDnode->runtimeFile); - taosCloseFile(&pDnode->runtimeFile); - pDnode->runtimeFile = NULL; - } -} \ No newline at end of file diff --git a/source/dnode/mgmt/main/src/dndObj.c b/source/dnode/mgmt/main/src/dndObj.c index b7e91f3039..91f2cb233b 100644 --- a/source/dnode/mgmt/main/src/dndObj.c +++ b/source/dnode/mgmt/main/src/dndObj.c @@ -34,6 +34,12 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + + pDnode->lockfile = dndCheckRunning(pDnode->dataDir); + if (pDnode->lockfile == NULL) { + return -1; + } + return 0; } @@ -42,7 +48,11 @@ static void dndClearVars(SDnode *pDnode) { SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; taosMemoryFreeClear(pMgmt->path); } - dndCloseRuntimeFile(pDnode); + if (pDnode->lockfile != NULL) { + taosUnLockFile(pDnode->lockfile); + taosCloseFile(&pDnode->lockfile); + pDnode->lockfile = NULL; + } taosMemoryFreeClear(pDnode->localEp); taosMemoryFreeClear(pDnode->localFqdn); taosMemoryFreeClear(pDnode->firstEp); @@ -96,8 +106,8 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { goto _OVER; } - if (dndOpenRuntimeFile(pDnode) != 0) { - dError("failed to open runtime file since %s", terrstr()); + if (dndReadShmFile(pDnode) != 0) { + dError("failed to read shm file since %s", terrstr()); goto _OVER; } diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index e7a22c3da1..cb80aeb5f3 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -17,8 +17,8 @@ #define _DEFAULT_SOURCE #include "os.h" -int32_t taosCreateShm(SShm* pShm, int32_t shmsize) { - int32_t shmid = shmget(IPC_PRIVATE, shmsize, IPC_CREAT | 0600); +int32_t taosCreateShm(SShm* pShm, int64_t shmsize) { + int32_t shmid = shmget(IPC_PRIVATE, (size_t)shmsize, IPC_CREAT | 0600); if (shmid < 0) { return -1; } From 79936567c5cc20dda71f43babf7b5c8975237b89 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Mar 2022 19:54:41 +0800 Subject: [PATCH 4/4] shm --- include/os/osShm.h | 5 +- include/util/tprocess.h | 6 +- source/dnode/mgmt/main/exe/dndMain.c | 4 + source/dnode/mgmt/main/src/dndExec.c | 10 +- source/dnode/mgmt/main/src/dndFile.c | 30 +-- source/dnode/mgmt/main/src/dndObj.c | 1 + source/os/src/osShm.c | 25 +- source/util/src/tprocess.c | 372 ++++++++++++--------------- 8 files changed, 204 insertions(+), 249 deletions(-) diff --git a/include/os/osShm.h b/include/os/osShm.h index a5d6716d0d..d26a99e277 100644 --- a/include/os/osShm.h +++ b/include/os/osShm.h @@ -22,14 +22,13 @@ extern "C" { typedef struct { int32_t id; - int64_t size; + int32_t size; void* ptr; } SShm; -int32_t taosCreateShm(SShm *pShm, int64_t shmsize) ; +int32_t taosCreateShm(SShm *pShm, int32_t shmsize) ; void taosDropShm(SShm *pShm); int32_t taosAttachShm(SShm *pShm); -void taosDetachShm(SShm *pShm); #ifdef __cplusplus } diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 090762b340..3a47450eec 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -38,21 +38,19 @@ typedef struct { ProcMallocFp childMallocBodyFp; ProcFreeFp childFreeBodyFp; ProcConsumeFp parentConsumeFp; - ProcMallocFp parentdMallocHeadFp; + ProcMallocFp parentMallocHeadFp; ProcFreeFp parentFreeHeadFp; ProcMallocFp parentMallocBodyFp; ProcFreeFp parentFreeBodyFp; SShm shm; void *pParent; const char *name; + bool isChild; } SProcCfg; SProcObj *taosProcInit(const SProcCfg *pCfg); void taosProcCleanup(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc); -void taosProcStop(SProcObj *pProc); -bool taosProcIsChild(SProcObj *pProc); -int32_t taosProcChildId(SProcObj *pProc); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, diff --git a/source/dnode/mgmt/main/exe/dndMain.c b/source/dnode/mgmt/main/exe/dndMain.c index 525b26d967..5acab06216 100644 --- a/source/dnode/mgmt/main/exe/dndMain.c +++ b/source/dnode/mgmt/main/exe/dndMain.c @@ -83,6 +83,10 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) { global.generateGrant = true; } else if (strcmp(argv[i], "-n") == 0) { global.ntype = atoi(argv[++i]); + if (global.ntype <= DNODE || global.ntype > NODE_MAX) { + printf("'-n' range is [1-5], default is 0\n"); + return -1; + } } else if (strcmp(argv[i], "-C") == 0) { global.dumpConfig = true; } else if (strcmp(argv[i], "-V") == 0) { diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index d4dfae2d69..d2a203107a 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -166,14 +166,16 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = pWrapper->shm, .pParent = pWrapper, + .isChild = false, .name = pWrapper->name}; + pWrapper->procType = PROC_PARENT; pWrapper->pProc = taosProcInit(&cfg); if (pWrapper->pProc == NULL) { dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); @@ -193,7 +195,6 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { dInfo("node:%s, will not start in parent process", pWrapper->name); // exec new node - pWrapper->procType = PROC_PARENT; if (taosProcRun(pWrapper->pProc) != 0) { dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; @@ -226,21 +227,22 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .shm = pWrapper->shm, .pParent = pWrapper, + .isChild = true, .name = pWrapper->name}; + pWrapper->procType = PROC_CHILD; pWrapper->pProc = taosProcInit(&cfg); if (pWrapper->pProc == NULL) { dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); return -1; } - pWrapper->procType = PROC_CHILD; if (taosProcRun(pWrapper->pProc) != 0) { dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; diff --git a/source/dnode/mgmt/main/src/dndFile.c b/source/dnode/mgmt/main/src/dndFile.c index bcfb90af13..bbd1cd3b92 100644 --- a/source/dnode/mgmt/main/src/dndFile.c +++ b/source/dnode/mgmt/main/src/dndFile.c @@ -167,23 +167,23 @@ int32_t dndReadShmFile(SDnode *pDnode) { for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype)); cJSON *shmid = cJSON_GetObjectItem(root, itemName); - if (shmid && shmid->type == cJSON_String) { - pDnode->wrappers[ntype].shm.id = atoi(shmid->valuestring); + if (shmid && shmid->type == cJSON_Number) { + pDnode->wrappers[ntype].shm.id = shmid->valueint; } snprintf(itemName, sizeof(itemName), "%s_shmsize", dndNodeProcStr(ntype)); cJSON *shmsize = cJSON_GetObjectItem(root, itemName); - if (shmsize && shmsize->type == cJSON_String) { - pDnode->wrappers[ntype].shm.size = atoll(shmsize->valuestring); + if (shmsize && shmsize->type == cJSON_Number) { + pDnode->wrappers[ntype].shm.size = shmsize->valueint; } } } - if (tsMultiProcess || pDnode->ntype == DNODE) { + if (!tsMultiProcess || pDnode->ntype == DNODE) { for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; - if (pWrapper->shm.id > 0) { - dDebug("shmid:%d, is closed, size:%" PRId64, pWrapper->shm.id, pWrapper->shm.size); + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; + if (pWrapper->shm.id >= 0) { + dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size); taosDropShm(&pWrapper->shm); } } @@ -194,7 +194,7 @@ int32_t dndReadShmFile(SDnode *pDnode) { dError("shmid:%d, failed to attach since %s", pWrapper->shm.id, terrstr()); goto _OVER; } - dDebug("shmid:%d, is attached, size:%" PRId64, pWrapper->shm.id, pWrapper->shm.size); + dDebug("shmid:%d, is attached, size:%d", pWrapper->shm.id, pWrapper->shm.size); } dDebug("successed to open %s", file); @@ -227,14 +227,12 @@ int32_t dndWriteShmFile(SDnode *pDnode) { len += snprintf(content + len, MAXLEN - len, "{\n"); for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; - len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\": \"%d\",\n", dndNodeProcStr(ntype), pWrapper->shm.id); + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; + len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.id); if (ntype == NODE_MAX - 1) { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": \"%" PRId64 "\"\n", dndNodeProcStr(ntype), - pWrapper->shm.size); + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d\n", dndNodeProcStr(ntype), pWrapper->shm.size); } else { - len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": \"%" PRId64 "\",\n", dndNodeProcStr(ntype), - pWrapper->shm.size); + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\":%d,\n", dndNodeProcStr(ntype), pWrapper->shm.size); } } len += snprintf(content + len, MAXLEN - len, "}\n"); @@ -259,7 +257,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) { return -1; } - dDebug("successed to write %s", realfile); + dInfo("successed to write %s", realfile); code = 0; _OVER: diff --git a/source/dnode/mgmt/main/src/dndObj.c b/source/dnode/mgmt/main/src/dndObj.c index 91f2cb233b..387efca846 100644 --- a/source/dnode/mgmt/main/src/dndObj.c +++ b/source/dnode/mgmt/main/src/dndObj.c @@ -91,6 +91,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); pWrapper->path = strdup(path); + pWrapper->shm.id = -1; pWrapper->pDnode = pDnode; if (pWrapper->path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index cb80aeb5f3..74717878a0 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -17,8 +17,10 @@ #define _DEFAULT_SOURCE #include "os.h" -int32_t taosCreateShm(SShm* pShm, int64_t shmsize) { - int32_t shmid = shmget(IPC_PRIVATE, (size_t)shmsize, IPC_CREAT | 0600); +int32_t taosCreateShm(SShm* pShm, int32_t shmsize) { + pShm->id = -1; + + int32_t shmid = shmget(0X95279527, shmsize, IPC_CREAT | 0600); if (shmid < 0) { return -1; } @@ -35,19 +37,19 @@ int32_t taosCreateShm(SShm* pShm, int64_t shmsize) { } void taosDropShm(SShm* pShm) { - if (pShm->id > 0) { + if (pShm->id >= 0) { if (pShm->ptr != NULL) { shmdt(pShm->ptr); } shmctl(pShm->id, IPC_RMID, NULL); } - pShm->id = 0; + pShm->id = -1; pShm->size = 0; pShm->ptr = NULL; } int32_t taosAttachShm(SShm* pShm) { - if (pShm->id > 0 && pShm->size > 0) { + if (pShm->id >= 0) { pShm->ptr = shmat(pShm->id, NULL, 0); if (pShm->ptr != NULL) { return 0; @@ -56,16 +58,3 @@ int32_t taosAttachShm(SShm* pShm) { return -1; } - -void taosDetachShm(SShm* pShm) { - if (pShm->id > 0) { - if (pShm->ptr != NULL) { - shmdt(pShm->ptr); - pShm->ptr = NULL; - } - } - - pShm->id = 0; - pShm->size = 0; - pShm->ptr = NULL; -} diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 7afbe56587..1d565083e1 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -23,34 +23,36 @@ typedef void *(*ProcThreadFp)(void *param); typedef struct SProcQueue { - int32_t head; - int32_t tail; - int32_t total; - int32_t avail; - int32_t items; - char *pBuffer; - ProcMallocFp mallocHeadFp; - ProcFreeFp freeHeadFp; - ProcMallocFp mallocBodyFp; - ProcFreeFp freeBodyFp; - ProcConsumeFp consumeFp; - void *pParent; - tsem_t sem; - TdThreadMutex *mutex; - int32_t mutexShmid; - int32_t bufferShmid; - const char *name; + int32_t head; + int32_t tail; + int32_t total; + int32_t avail; + int32_t items; + char name[8]; + TdThreadMutex mutex; + tsem_t sem; + char pBuffer[]; } SProcQueue; typedef struct SProcObj { - TdThread childThread; - SProcQueue *pChildQueue; - TdThread parentThread; - SProcQueue *pParentQueue; - const char *name; - int32_t pid; - bool isChild; - bool stopFlag; + TdThread thread; + SProcQueue *pChildQueue; + SProcQueue *pParentQueue; + ProcConsumeFp childConsumeFp; + ProcMallocFp childMallocHeadFp; + ProcFreeFp childFreeHeadFp; + ProcMallocFp childMallocBodyFp; + ProcFreeFp childFreeBodyFp; + ProcConsumeFp parentConsumeFp; + ProcMallocFp parentMallocHeadFp; + ProcFreeFp parentFreeHeadFp; + ProcMallocFp parentMallocBodyFp; + ProcFreeFp parentFreeBodyFp; + void *pParent; + const char *name; + int32_t pid; + bool isChild; + bool stopFlag; } SProcObj; static inline int32_t CEIL8(int32_t v) { @@ -58,150 +60,95 @@ static inline int32_t CEIL8(int32_t v) { return c < 8 ? 8 : c; } -static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { - TdThreadMutex *pMutex = NULL; +static int32_t taosProcInitMutex(SProcQueue *pQueue) { TdThreadMutexAttr mattr = {0}; - int32_t shmid = -1; - int32_t code = -1; if (pthread_mutexattr_init(&mattr) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to init mutex while init attr since %s", terrstr()); - goto _OVER; + return -1; } if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) { + pthread_mutexattr_destroy(&mattr); terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to init mutex while set shared since %s", terrstr()); - goto _OVER; + return -1; } - shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), IPC_CREAT | 0600); - if (shmid <= 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to init mutex while shmget since %s", terrstr()); - goto _OVER; - } - - pMutex = (TdThreadMutex *)shmat(shmid, NULL, 0); - if (pMutex == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to init mutex while shmat since %s", terrstr()); - goto _OVER; - } - - if (taosThreadMutexInit(pMutex, &mattr) != 0) { + if (taosThreadMutexInit(&pQueue->mutex, &mattr) != 0) { + pthread_mutexattr_destroy(&mattr); terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to init mutex since %s", terrstr()); - goto _OVER; - } - - code = 0; - -_OVER: - if (code != 0) { - if (pMutex != NULL) { - taosThreadMutexDestroy(pMutex); - shmdt(pMutex); - } - if (shmid >= 0) { - shmctl(shmid, IPC_RMID, NULL); - } - } else { - *ppMutex = pMutex; - *pShmid = shmid; + return -1; } pthread_mutexattr_destroy(&mattr); - return code; + return 0; } -static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t shmid) { - if (pMutex != NULL) { - taosThreadMutexDestroy(pMutex); - } - if (shmid >= 0) { - shmctl(shmid, IPC_RMID, NULL); - } -} - -static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) { - int32_t shmid = shmget(IPC_PRIVATE, size, IPC_CREAT | 0600); - if (shmid <= 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to init buffer while shmget since %s", terrstr()); - return -1; - } - - void *shmptr = shmat(shmid, NULL, 0); - if (shmptr == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to init buffer while shmat since %s", terrstr()); - shmctl(shmid, IPC_RMID, NULL); - return -1; - } - - *ppBuffer = shmptr; - return shmid; -} - -static void taosProcDestroyBuffer(void *pBuffer, int32_t shmid) { - if (shmid > 0) { - shmdt(pBuffer); - shmctl(shmid, IPC_RMID, NULL); - } -} - -static SProcQueue *taosProcInitQueue(int32_t size) { - if (size <= 0) size = SHM_DEFAULT_SIZE; - - int32_t bufSize = CEIL8(size); - int32_t headSize = CEIL8(sizeof(SProcQueue)); - - SProcQueue *pQueue = NULL; - int32_t shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize); - if (shmId < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - pQueue->bufferShmid = shmId; - - if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { - taosProcDestroyBuffer(pQueue, pQueue->bufferShmid); - return NULL; - } - +static int32_t taosProcInitSem(SProcQueue *pQueue) { if (tsem_init(&pQueue->sem, 1, 0) != 0) { - taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid); - taosProcDestroyBuffer(pQueue, pQueue->bufferShmid); + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to init sem"); + return -1; + } + + return 0; +} + +static SProcQueue *taosProcInitQueue(const char *name, bool isChild, char *ptr, int32_t size) { + int32_t bufSize = size - CEIL8(sizeof(SProcQueue)); + if (bufSize <= 1024) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) { - tsem_destroy(&pQueue->sem); - taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid); - taosProcDestroyBuffer(pQueue, pQueue->bufferShmid); - return NULL; + SProcQueue *pQueue = (SProcQueue *)(ptr); + + if (!isChild) { + if (taosProcInitMutex(pQueue) != 0) { + return NULL; + } + + if (taosProcInitSem(pQueue) != 0) { + return NULL; + } + + tstrncpy(pQueue->name, name, sizeof(pQueue->name)); + pQueue->head = 0; + pQueue->tail = 0; + pQueue->total = bufSize; + pQueue->avail = bufSize; + pQueue->items = 0; } - pQueue->head = 0; - pQueue->tail = 0; - pQueue->total = bufSize; - pQueue->avail = bufSize; - pQueue->items = 0; - pQueue->pBuffer = (char *)pQueue + headSize; return pQueue; } +#if 0 +static void taosProcDestroyMutex(SProcQueue *pQueue) { + if (pQueue->mutex != NULL) { + taosThreadMutexDestroy(pQueue->mutex); + pQueue->mutex = NULL; + } +} + +static void taosProcDestroySem(SProcQueue *pQueue) { + if (pQueue->sem != NULL) { + tsem_destroy(pQueue->sem); + pQueue->sem = NULL; + } + +} + static void taosProcCleanupQueue(SProcQueue *pQueue) { if (pQueue != NULL) { - uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue); - tsem_destroy(&pQueue->sem); - taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid); - taosProcDestroyBuffer(pQueue, pQueue->bufferShmid); + taosProcDestroyMutex(pQueue); + taosProcDestroySem(pQueue); } } +#endif static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, int32_t rawBodyLen, ProcFuncType ftype) { @@ -209,9 +156,9 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; - taosThreadMutexLock(pQueue->mutex); + taosThreadMutexLock(&pQueue->mutex); if (fullLen > pQueue->avail) { - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); terrno = TSDB_CODE_OUT_OF_SHM_MEM; return -1; } @@ -260,7 +207,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t pQueue->avail -= fullLen; pQueue->items++; - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); tsem_post(&pQueue->sem); uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, @@ -268,13 +215,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t return 0; } -static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, - int32_t *pBodyLen, ProcFuncType *pFuncType) { +static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen, + ProcFuncType *pFuncType, ProcMallocFp mallocHeadFp, ProcFreeFp freeHeadFp, + ProcMallocFp mallocBodyFp, ProcFreeFp freeBodyFp) { tsem_wait(&pQueue->sem); - taosThreadMutexLock(pQueue->mutex); + taosThreadMutexLock(&pQueue->mutex); if (pQueue->total - pQueue->avail <= 0) { - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); tsem_post(&pQueue->sem); terrno = TSDB_CODE_OUT_OF_SHM_MEM; return 0; @@ -293,13 +241,13 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea bodyLen = *(int32_t *)(pQueue->pBuffer + 4); } - void *pHead = (*pQueue->mallocHeadFp)(headLen); - void *pBody = (*pQueue->mallocBodyFp)(bodyLen); + void *pHead = (*mallocHeadFp)(headLen); + void *pBody = (*mallocBodyFp)(bodyLen); if (pHead == NULL || pBody == NULL) { - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); tsem_post(&pQueue->sem); - (*pQueue->freeHeadFp)(pHead); - (*pQueue->freeBodyFp)(pBody); + (*freeHeadFp)(pHead); + (*freeBodyFp)(pBody); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -338,7 +286,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea pQueue->avail = pQueue->avail + headLen + bodyLen + 8; pQueue->items--; - taosThreadMutexUnlock(pQueue->mutex); + taosThreadMutexUnlock(&pQueue->mutex); *ppHead = pHead; *ppBody = pBody; @@ -358,65 +306,85 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { return NULL; } + int32_t cstart = 0; + int32_t csize = CEIL8(pCfg->shm.size / 2); + int32_t pstart = csize; + int32_t psize = CEIL8(pCfg->shm.size - pstart); + if (pstart + psize > pCfg->shm.size) { + psize -= 8; + } + pProc->name = pCfg->name; - pProc->pChildQueue = taosProcInitQueue(pCfg->shm.size / 2); - pProc->pParentQueue = taosProcInitQueue(pCfg->shm.size / 2); + pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize); + pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize); if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { taosProcCleanupQueue(pProc->pChildQueue); taosMemoryFree(pProc); return NULL; } - pProc->pChildQueue->name = pCfg->name; - pProc->pChildQueue->pParent = pCfg->pParent; - pProc->pChildQueue->mallocHeadFp = pCfg->childMallocHeadFp; - pProc->pChildQueue->freeHeadFp = pCfg->childFreeHeadFp; - pProc->pChildQueue->mallocBodyFp = pCfg->childMallocBodyFp; - pProc->pChildQueue->freeBodyFp = pCfg->childFreeBodyFp; - pProc->pChildQueue->consumeFp = pCfg->childConsumeFp; - pProc->pParentQueue->name = pCfg->name; - pProc->pParentQueue->pParent = pCfg->pParent; - pProc->pParentQueue->mallocHeadFp = pCfg->parentdMallocHeadFp; - pProc->pParentQueue->freeHeadFp = pCfg->parentFreeHeadFp; - pProc->pParentQueue->mallocBodyFp = pCfg->parentMallocBodyFp; - pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp; - pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp; + pProc->name = pCfg->name; + pProc->pParent = pCfg->pParent; + pProc->childMallocHeadFp = pCfg->childMallocHeadFp; + pProc->childFreeHeadFp = pCfg->childFreeHeadFp; + pProc->childMallocBodyFp = pCfg->childMallocBodyFp; + pProc->childFreeBodyFp = pCfg->childFreeBodyFp; + pProc->childConsumeFp = pCfg->childConsumeFp; + pProc->parentMallocHeadFp = pCfg->parentMallocHeadFp; + pProc->parentFreeHeadFp = pCfg->parentFreeHeadFp; + pProc->parentMallocBodyFp = pCfg->parentMallocBodyFp; + pProc->parentFreeBodyFp = pCfg->parentFreeBodyFp; + pProc->parentConsumeFp = pCfg->parentConsumeFp; + pProc->isChild = pCfg->isChild; - uDebug("proc:%s, is initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue); - - pProc->pid = fork(); - if (pProc->pid == 0) { - pProc->isChild = 1; - prctl(PR_SET_NAME, pProc->name, NULL, NULL, NULL); - } else { - pProc->isChild = 0; - uInfo("this is parent process, child pid:%d", pProc->pid); - } + uDebug("proc:%s, is initialized, child:%d child queue:%p parent queue:%p", pProc->name, pProc->isChild, + pProc->pChildQueue, pProc->pParentQueue); return pProc; } -static void taosProcThreadLoop(SProcQueue *pQueue) { - ProcConsumeFp consumeFp = pQueue->consumeFp; - void *pParent = pQueue->pParent; +static void taosProcThreadLoop(SProcObj *pProc) { void *pHead, *pBody; int16_t headLen; ProcFuncType ftype; int32_t bodyLen; + SProcQueue *pQueue; + ProcConsumeFp consumeFp; + ProcMallocFp mallocHeadFp; + ProcFreeFp freeHeadFp; + ProcMallocFp mallocBodyFp; + ProcFreeFp freeBodyFp; - uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue); + if (pProc->isChild) { + pQueue = pProc->pChildQueue; + consumeFp = pProc->childConsumeFp; + mallocHeadFp = pProc->childMallocHeadFp; + freeHeadFp = pProc->childFreeHeadFp; + mallocBodyFp = pProc->childMallocBodyFp; + freeBodyFp = pProc->childFreeBodyFp; + } else { + pQueue = pProc->pParentQueue; + consumeFp = pProc->parentConsumeFp; + mallocHeadFp = pProc->parentMallocHeadFp; + freeHeadFp = pProc->parentFreeHeadFp; + mallocBodyFp = pProc->parentMallocBodyFp; + freeBodyFp = pProc->parentFreeBodyFp; + } + + uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue); while (1) { - int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype); + int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp, + mallocBodyFp, freeBodyFp); if (numOfMsgs == 0) { - uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue); + uInfo("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue); break; } else if (numOfMsgs < 0) { - uTrace("proc:%s, get no msg from queue:%p since %s", pQueue->name, pQueue, terrstr()); + uTrace("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr()); taosMsleep(1); continue; } else { - (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen, ftype); + (*consumeFp)(pProc->pParent, pHead, headLen, pBody, bodyLen, ftype); } } } @@ -426,34 +394,30 @@ int32_t taosProcRun(SProcObj *pProc) { taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - if (pProc->isChild) { - if (taosThreadCreate(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create thread since %s", terrstr()); - return -1; - } - uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue); - } else { - if (taosThreadCreate(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create thread since %s", terrstr()); - return -1; - } - uDebug("proc:%s, parent start to consume queue:%p", pProc->name, pProc->pParentQueue); + if (taosThreadCreate(&pProc->thread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to create thread since %s", terrstr()); + return -1; } + uDebug("proc:%s, start to consume queue:%p", pProc->name, pProc->pChildQueue); return 0; } -void taosProcStop(SProcObj *pProc) { - pProc->stopFlag = true; - // todo join +static void taosProcStop(SProcObj *pProc) { + if (!taosCheckPthreadValid(pProc->thread)) return; + + uDebug("proc:%s, start to join thread", pProc->name); + SProcQueue *pQueue; + if (pProc->isChild) { + pQueue = pProc->pParentQueue; + } else { + pQueue = pProc->pChildQueue; + } + tsem_post(&pQueue->sem); + taosThreadJoin(pProc->thread, NULL); } -bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; } - -int32_t taosProcChildId(SProcObj *pProc) { return pProc->pid; } - void taosProcCleanup(SProcObj *pProc) { if (pProc != NULL) { uDebug("proc:%s, clean up", pProc->name);