From 4c2ce0b407ed816e39519a5f8a1fba80f4716e09 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 31 Mar 2022 11:15:20 +0800 Subject: [PATCH] shm --- include/os/osProc.h | 9 +- include/os/osSignal.h | 2 +- source/dnode/mgmt/main/exe/dndMain.c | 12 +- source/dnode/mgmt/main/src/dndExec.c | 213 +++++++++++++++------------ source/dnode/mgmt/main/src/dndObj.c | 4 +- source/os/src/osProc.c | 35 ++++- source/os/src/osSignal.c | 2 +- 7 files changed, 163 insertions(+), 114 deletions(-) diff --git a/include/os/osProc.h b/include/os/osProc.h index e76e22a54e..0b22105e5e 100644 --- a/include/os/osProc.h +++ b/include/os/osProc.h @@ -20,11 +20,10 @@ extern "C" { #endif -// start a copy of itself -int32_t taosNewProc(const char *args); - -// the length of the new name must be less than the original name to take effect -void taosSetProcName(char **argv, const char *name); +int32_t taosNewProc(char **args); +void taosSetProcName(int32_t argc, char **argv, const char *name); +void taosSetProcPath(int32_t argc, char **argv); +bool taosProcExists(int32_t pid); #ifdef __cplusplus } diff --git a/include/os/osSignal.h b/include/os/osSignal.h index e9fb13e870..e22c43684c 100644 --- a/include/os/osSignal.h +++ b/include/os/osSignal.h @@ -49,7 +49,7 @@ void taosSetSignal(int32_t signum, FSignalHandler sigfp); void taosIgnSignal(int32_t signum); void taosDflSignal(int32_t signum); -void taosKillChildOnSelfStopped(); +void taosKillChildOnParentStopped(); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/main/exe/dndMain.c b/source/dnode/mgmt/main/exe/dndMain.c index 1dcb312724..1bfa23c022 100644 --- a/source/dnode/mgmt/main/exe/dndMain.c +++ b/source/dnode/mgmt/main/exe/dndMain.c @@ -50,13 +50,10 @@ static void dndSetSignalHandle() { taosSetSignal(SIGBREAK, dndStopDnode); if (!tsMultiProcess) { - // Set the single process signal } else if (global.ntype == DNODE) { - // When the child process exits, the parent process receives a signal taosSetSignal(SIGCHLD, dndHandleChild); } else { - // When the parent process exits, the child process will receive the SIGKILL signal - taosKillChildOnSelfStopped(); + taosKillChildOnParentStopped(); } } @@ -140,10 +137,11 @@ static int32_t dndInitLog() { return taosCreateLog(logName, 1, configDir, global.envFile, global.apolloUrl, global.pArgs, 0); } -static void dndSetProcName(char **argv) { +static void dndSetProcInfo(int32_t argc, char **argv) { + taosSetProcPath(argc, argv); if (global.ntype != DNODE) { const char *name = dndNodeProcStr(global.ntype); - taosSetProcName(argv, name); + taosSetProcName(argc, argv, name); } } @@ -186,6 +184,7 @@ int main(int argc, char const *argv[]) { return -1; } + dndSetProcInfo(argc, (char **)argv); if (global.generateGrant) { dndGenerateGrant(); return 0; @@ -213,6 +212,5 @@ int main(int argc, char const *argv[]) { return 0; } - dndSetProcName((char **)argv); return dndRunDnode(); } diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index d74adf45d1..7b8975da88 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -17,8 +17,8 @@ #include "dndInt.h" static bool dndRequireNode(SMgmtWrapper *pWrapper) { - bool required = false; - int32_t code =(*pWrapper->fp.requiredFp)(pWrapper, &required); + bool required = false; + int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required); if (!required) { dDebug("node:%s, no need to start", pWrapper->name); } else { @@ -65,36 +65,6 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { dDebug("node:%s, has been closed", pWrapper->name); } -static int32_t dndRunInSingleProcess(SDnode *pDnode) { - dInfo("dnode start to run in single process"); - - for (ENodeType n = DNODE; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - pWrapper->required = dndRequireNode(pWrapper); - if (!pWrapper->required) continue; - - if (dndOpenNode(pWrapper) != 0) { - dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); - return -1; - } - } - - dndSetStatus(pDnode, DND_STAT_RUNNING); - - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - if (!pWrapper->required) continue; - if (pWrapper->fp.startFp == NULL) continue; - if ((*pWrapper->fp.startFp)(pWrapper) != 0) { - dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); - return -1; - } - } - - dInfo("dnode running in single process"); - return 0; -} - static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, ProcFuncType ftype) { SRpcMsg *pRpc = &pMsg->rpcMsg; @@ -140,6 +110,84 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t taosMemoryFree(pMsg); } +static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { + char tstr[8] = {0}; + char *args[6] = {0}; + snprintf(tstr, sizeof(tstr), "%d", n); + args[1] = "-c"; + args[2] = configDir; + args[3] = "-n"; + args[4] = tstr; + args[5] = NULL; + + int32_t pid = taosNewProc(args); + if (pid <= 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr()); + return -1; + } + + pWrapper->procId = pid; + dInfo("node:%s, run in new process, pid:%d", pWrapper->name, pid); + return 0; +} + +static SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = pWrapper->shm, + .pParent = pWrapper, + .name = pWrapper->name}; + return cfg; +} + +static int32_t dndRunInSingleProcess(SDnode *pDnode) { + dInfo("dnode start to run in single process"); + + for (ENodeType n = DNODE; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + pWrapper->required = dndRequireNode(pWrapper); + if (!pWrapper->required) continue; + + if (dndOpenNode(pWrapper) != 0) { + dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); + return -1; + } + } + + dndSetStatus(pDnode, DND_STAT_RUNNING); + + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (!pWrapper->required) continue; + if (pWrapper->fp.startFp == NULL) continue; + if ((*pWrapper->fp.startFp)(pWrapper) != 0) { + dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); + return -1; + } + } + + dInfo("TDengine initialized successfully"); + dndReportStartup(pDnode, "TDengine", "initialized successfully"); + while (1) { + if (pDnode->event == DND_EVENT_STOP) { + dInfo("dnode is about to stop"); + break; + } + taosMsleep(100); + } + + return 0; +} + static int32_t dndRunInParentProcess(SDnode *pDnode) { dInfo("dnode start to run in parent process"); SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE]; @@ -160,21 +208,8 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { return -1; } - SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, - .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, - .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, - .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .shm = pWrapper->shm, - .pParent = pWrapper, - .isChild = false, - .name = pWrapper->name}; - + SProcCfg cfg = dndGenProcCfg(pWrapper); + cfg.isChild = false; pWrapper->procType = PROC_PARENT; pWrapper->pProc = taosProcInit(&cfg); if (pWrapper->pProc == NULL) { @@ -195,15 +230,9 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { if (pDnode->ntype == NODE_MAX) { dInfo("node:%s, should be started manually", pWrapper->name); } else { - char args[PATH_MAX]; - int32_t pid = taosNewProc(args); - if (pid <= 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr()); + if (dndNewProc(pWrapper, n) != 0) { return -1; } - pWrapper->procId = pid; - dInfo("node:%s, run in new process, pid:%d", pWrapper->name, pid); } if (taosProcRun(pWrapper->pProc) != 0) { @@ -219,7 +248,29 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { return -1; } - dInfo("dnode running in parent process"); + dInfo("TDengine initialized successfully"); + dndReportStartup(pDnode, "TDengine", "initialized successfully"); + + while (1) { + if (pDnode->event == DND_EVENT_STOP) { + dInfo("dnode is about to stop"); + break; + } + + for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (!pWrapper->required) continue; + if (pDnode->ntype == NODE_MAX) continue; + + if (pWrapper->procId != 0 && !taosProcExists(pWrapper->procId)) { + dInfo("node:%s, process not exist, pid:%d", pWrapper->name, pWrapper->procId); + dndNewProc(pWrapper, n); + } + + taosMsleep(100); + } + } + return 0; } @@ -236,21 +287,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { return -1; } - SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, - .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, - .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, - .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .shm = pWrapper->shm, - .pParent = pWrapper, - .isChild = true, - .name = pWrapper->name}; - + SProcCfg cfg = dndGenProcCfg(pWrapper); + cfg.isChild = true; pWrapper->pProc = taosProcInit(&cfg); if (pWrapper->pProc == NULL) { dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); @@ -269,31 +307,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { return -1; } - dInfo("dnode running in child process"); - return 0; -} - -int32_t dndRun(SDnode * pDnode) { - if (!tsMultiProcess) { - if (dndRunInSingleProcess(pDnode) != 0) { - dError("failed to run dnode since %s", terrstr()); - return -1; - } - } else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) { - if (dndRunInParentProcess(pDnode) != 0) { - dError("failed to run dnode in parent process since %s", terrstr()); - return -1; - } - } else { - if (dndRunInChildProcess(pDnode) != 0) { - dError("failed to run dnode in child process since %s", terrstr()); - return -1; - } - } - - dndReportStartup(pDnode, "TDengine", "initialized successfully"); dInfo("TDengine initialized successfully"); - + dndReportStartup(pDnode, "TDengine", "initialized successfully"); while (1) { if (pDnode->event == DND_EVENT_STOP) { dInfo("dnode is about to stop"); @@ -301,6 +316,16 @@ int32_t dndRun(SDnode * pDnode) { } taosMsleep(100); } +} + +int32_t dndRun(SDnode *pDnode) { + if (!tsMultiProcess) { + return dndRunInSingleProcess(pDnode); + } else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) { + return dndRunInParentProcess(pDnode); + } else { + return dndRunInChildProcess(pDnode); + } return 0; } diff --git a/source/dnode/mgmt/main/src/dndObj.c b/source/dnode/mgmt/main/src/dndObj.c index 44013deed8..ed2b669587 100644 --- a/source/dnode/mgmt/main/src/dndObj.c +++ b/source/dnode/mgmt/main/src/dndObj.c @@ -152,7 +152,9 @@ void dndClose(SDnode *pDnode) { void dndHandleEvent(SDnode *pDnode, EDndEvent event) { dInfo("dnode object receive event %d, data:%p", event, pDnode); - pDnode->event = event; + if (event == DND_EVENT_STOP) { + pDnode->event = event; + } } SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) { diff --git a/source/os/src/osProc.c b/source/os/src/osProc.c index 6c58e71003..7da25ae8c6 100644 --- a/source/os/src/osProc.c +++ b/source/os/src/osProc.c @@ -17,11 +17,36 @@ #define _DEFAULT_SOURCE #include "os.h" -int32_t taosNewProc(const char *args) { - return 0; +static char *tsProcPath = NULL; + +int32_t taosNewProc(char **args) { + int32_t pid = fork(); + if (pid == 0) { + args[0] = tsProcPath; + return execvp(tsProcPath, args); + } else { + return pid; + } } -void taosSetProcName(char **argv, const char *name) { +// the length of the new name must be less than the original name to take effect +void taosSetProcName(int32_t argc, char **argv, const char *name) { prctl(PR_SET_NAME, name); - strcpy(argv[0], name); -} \ No newline at end of file + + for (int32_t i = 0; i < argc; ++i) { + int32_t len = strlen(argv[i]); + for (int32_t j = 0; j < len; ++j) { + argv[i][j] = 0; + } + if (i == 0) { + tstrncpy(argv[0], name, len); + } + } +} + +void taosSetProcPath(int32_t argc, char **argv) { tsProcPath = argv[0]; } + +bool taosProcExists(int32_t pid) { + int32_t p = getpgid(pid); + return p == 0; +} diff --git a/source/os/src/osSignal.c b/source/os/src/osSignal.c index ce029cdfe5..d4e6cb3318 100644 --- a/source/os/src/osSignal.c +++ b/source/os/src/osSignal.c @@ -71,6 +71,6 @@ void taosIgnSignal(int32_t signum) { signal(signum, SIG_IGN); } void taosDflSignal(int32_t signum) { signal(signum, SIG_DFL); } -void taosKillChildOnSelfStopped() { prctl(PR_SET_PDEATHSIG, SIGKILL); } +void taosKillChildOnParentStopped() { prctl(PR_SET_PDEATHSIG, SIGKILL); } #endif