From d7e13ef739236875e63b368dfabd7afe826d4228 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 25 Mar 2022 18:22:59 +0800 Subject: [PATCH] shm --- include/util/tprocess.h | 1 + source/dnode/mgmt/container/src/dndExec.c | 25 ++++++++++++++--------- source/dnode/mgmt/container/src/dndObj.c | 2 +- source/util/src/tprocess.c | 10 +++++++-- 4 files changed, 25 insertions(+), 13 deletions(-) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 4ce536fd96..a0be38a3ad 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -51,6 +51,7 @@ void taosProcCleanup(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc); void taosProcStop(SProcObj *pProc); bool taosProcIsChild(SProcObj *pProc); +int32_t taosProcChildId(SProcObj *pProc); int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index 4dcf7e80ec..a6c9852546 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -20,7 +20,7 @@ static void dndResetLog(SMgmtWrapper *pMgmt) { char logname[24] = {0}; snprintf(logname, sizeof(logname), "%slog", pMgmt->name); - dInfo("node:%s, reset log to %s", pMgmt->name, logname); + dInfo("node:%s, reset log to %s in child process", pMgmt->name, logname); taosCloseLog(); taosInitLog(logname, 1); } @@ -51,6 +51,7 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) { void dndCloseNode(SMgmtWrapper *pWrapper) { dDebug("node:%s, start to close", pWrapper->name); + pWrapper->required = false; taosWLockLatch(&pWrapper->latch); if (pWrapper->deployed) { (*pWrapper->fp.closeFp)(pWrapper); @@ -199,7 +200,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { dInfo("node:%s, will be initialized in child process", pWrapper->name); dndOpenNode(pWrapper); } else { - dInfo("node:%s, will not start in parent process", pWrapper->name); + dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc)); pWrapper->procType = PROC_PARENT; } @@ -209,16 +210,20 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { } } -#if 0 - SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); - if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) { - dndReleaseWrapper(pWrapper); - dError("failed to start dnode worker since %s", terrstr()); - return -1; + dndSetStatus(pDnode, DND_STAT_RUNNING); + + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (!pWrapper->required) continue; + if (pWrapper->fp.startFp == NULL) continue; + if (pWrapper->procType == PROC_PARENT && n != DNODE) continue; + if (pWrapper->procType == PROC_CHILD && n == DNODE) continue; + if ((*pWrapper->fp.startFp)(pWrapper) != 0) { + dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); + return -1; + } } - dndReleaseWrapper(pWrapper); -#endif return 0; } diff --git a/source/dnode/mgmt/container/src/dndObj.c b/source/dnode/mgmt/container/src/dndObj.c index d618f4e503..2123f5946b 100644 --- a/source/dnode/mgmt/container/src/dndObj.c +++ b/source/dnode/mgmt/container/src/dndObj.c @@ -175,7 +175,7 @@ int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) { int32_t code = 0; taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed) { + if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) { int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount); } else { diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 24f3f8a6e4..f32f578952 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -208,6 +208,11 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; + if (headLen <= 0 || bodyLen <= 0) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } + taosThreadMutexLock(pQueue->mutex); if (fullLen > pQueue->avail) { taosThreadMutexUnlock(pQueue->mutex); @@ -259,7 +264,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea taosThreadMutexUnlock(pQueue->mutex); tsem_post(&pQueue->sem); - uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, rawHeadLen, pBody, rawBodyLen, pQueue); + uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue); return 0; } @@ -376,7 +381,6 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { pProc->pid = fork(); if (pProc->pid == 0) { pProc->isChild = 1; - uInfo("this is child process, pid:%d", pProc->pid); } else { pProc->isChild = 0; uInfo("this is parent process, child pid:%d", pProc->pid); @@ -439,6 +443,8 @@ void taosProcStop(SProcObj *pProc) { bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; } +int32_t taosProcChildId(SProcObj *pProc) { return pProc->pid; } + void taosProcCleanup(SProcObj *pProc) { if (pProc != NULL) { uDebug("proc:%s, clean up", pProc->name);