diff --git a/source/dnode/mgmt/main/exe/dndMain.c b/source/dnode/mgmt/main/exe/dndMain.c index 61b480990d..0d2ddbfbbc 100644 --- a/source/dnode/mgmt/main/exe/dndMain.c +++ b/source/dnode/mgmt/main/exe/dndMain.c @@ -37,13 +37,6 @@ static void dndStopDnode(int signum, void *info, void *ctx) { } } -static void dndHandleChild(int signum, void *info, void *ctx) { - dInfo("sigchild received"); - if (global.pDnode != NULL) { - dndHandleEvent(global.pDnode, DND_EVENT_CHILD); - } -} - static void dndSetSignalHandle() { taosSetSignal(SIGTERM, dndStopDnode); taosSetSignal(SIGHUP, dndStopDnode); @@ -53,7 +46,7 @@ static void dndSetSignalHandle() { if (!tsMultiProcess) { } else if (global.ntype == DNODE || global.ntype == NODE_MAX) { - taosSetSignal(SIGCHLD, dndHandleChild); + taosIgnSignal(SIGCHLD); } else { taosKillChildOnParentStopped(); } diff --git a/source/dnode/mgmt/main/src/dndEnv.c b/source/dnode/mgmt/main/src/dndEnv.c new file mode 100644 index 0000000000..8792147822 --- /dev/null +++ b/source/dnode/mgmt/main/src/dndEnv.c @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndInt.h" +#include "wal.h" + +static int8_t once = DND_ENV_INIT; + +int32_t dndInit() { + dDebug("start to init dnode env"); + if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { + terrno = TSDB_CODE_REPEAT_INIT; + dError("failed to init dnode env since %s", terrstr()); + return -1; + } + + taosIgnSIGPIPE(); + taosBlockSIGPIPE(); + taosResolveCRC(); + + SMonCfg monCfg = {0}; + monCfg.maxLogs = tsMonitorMaxLogs; + monCfg.port = tsMonitorPort; + monCfg.server = tsMonitorFqdn; + monCfg.comp = tsMonitorComp; + if (monInit(&monCfg) != 0) { + dError("failed to init monitor since %s", terrstr()); + return -1; + } + + dInfo("dnode env is initialized"); + return 0; +} + +void dndCleanup() { + dDebug("start to cleanup dnode env"); + if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { + dError("dnode env is already cleaned up"); + return; + } + + monCleanup(); + walCleanUp(); + taosStopCacheRefreshWorker(); + dInfo("dnode env is cleaned up"); +} + +void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { + pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; + pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; +} + +EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } + +void dndSetStatus(SDnode *pDnode, EDndStatus status) { + if (pDnode->status != status) { + dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status)); + pDnode->status = status; + } +} + +void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { + SStartupReq *pStartup = &pDnode->startup; + tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); + tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); + pStartup->finished = 0; +} + +void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { + memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq)); + pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); +} + +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { + dDebug("startup req is received"); + SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); + dndGetStartup(pDnode, pStartup); + + dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); + SRpcMsg rpcRsp = { + .handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle}; + rpcSendResponse(&rpcRsp); +} diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index 2999a30d67..fdc6125fb0 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -128,7 +128,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) { } pWrapper->procId = pid; - dInfo("node:%s, run in new process, pid:%d", pWrapper->name, pid); + dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid); return 0; } @@ -263,21 +263,21 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { if (!pWrapper->required) continue; if (pDnode->ntype == NODE_MAX) continue; - if (pWrapper->procId != 0 && !taosProcExists(pWrapper->procId)) { - dInfo("node:%s, process not exist, pid:%d", pWrapper->name, pWrapper->procId); + if (pWrapper->procId <= 0 || !taosProcExists(pWrapper->procId)) { + dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId); dndNewProc(pWrapper, n); } - - taosMsleep(100); } + + taosMsleep(100); } return 0; } static int32_t dndRunInChildProcess(SDnode *pDnode) { - dInfo("dnode run in child process"); SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; + dInfo("%s run in child process", pWrapper->name); SMsgCb msgCb = dndCreateMsgcb(pWrapper); tmsgSetDefaultMsgCb(&msgCb); diff --git a/source/dnode/mgmt/main/src/dndInt.c b/source/dnode/mgmt/main/src/dndInt.c index 8792147822..602ebc6b3c 100644 --- a/source/dnode/mgmt/main/src/dndInt.c +++ b/source/dnode/mgmt/main/src/dndInt.c @@ -15,82 +15,186 @@ #define _DEFAULT_SOURCE #include "dndInt.h" -#include "wal.h" -static int8_t once = DND_ENV_INIT; +static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { + pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes; + pDnode->serverPort = pOption->serverPort; + pDnode->dataDir = strdup(pOption->dataDir); + pDnode->localEp = strdup(pOption->localEp); + pDnode->localFqdn = strdup(pOption->localFqdn); + pDnode->firstEp = strdup(pOption->firstEp); + pDnode->secondEp = strdup(pOption->secondEp); + pDnode->disks = pOption->disks; + pDnode->numOfDisks = pOption->numOfDisks; + pDnode->ntype = pOption->ntype; + pDnode->rebootTime = taosGetTimestampMs(); -int32_t dndInit() { - dDebug("start to init dnode env"); - if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { - terrno = TSDB_CODE_REPEAT_INIT; - dError("failed to init dnode env since %s", terrstr()); + if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL || + pDnode->secondEp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - taosIgnSIGPIPE(); - taosBlockSIGPIPE(); - taosResolveCRC(); - - SMonCfg monCfg = {0}; - monCfg.maxLogs = tsMonitorMaxLogs; - monCfg.port = tsMonitorPort; - monCfg.server = tsMonitorFqdn; - monCfg.comp = tsMonitorComp; - if (monInit(&monCfg) != 0) { - dError("failed to init monitor since %s", terrstr()); - return -1; + if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) { + pDnode->lockfile = dndCheckRunning(pDnode->dataDir); + if (pDnode->lockfile == NULL) { + return -1; + } } - dInfo("dnode env is initialized"); return 0; } -void dndCleanup() { - dDebug("start to cleanup dnode env"); - if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { - dError("dnode env is already cleaned up"); +static void dndClearVars(SDnode *pDnode) { + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; + taosMemoryFreeClear(pMgmt->path); + } + if (pDnode->lockfile != NULL) { + taosUnLockFile(pDnode->lockfile); + taosCloseFile(&pDnode->lockfile); + pDnode->lockfile = NULL; + } + taosMemoryFreeClear(pDnode->localEp); + taosMemoryFreeClear(pDnode->localFqdn); + taosMemoryFreeClear(pDnode->firstEp); + taosMemoryFreeClear(pDnode->secondEp); + taosMemoryFreeClear(pDnode->dataDir); + taosMemoryFree(pDnode); + dDebug("dnode memory is cleared, data:%p", pDnode); +} + +SDnode *dndCreate(const SDnodeOpt *pOption) { + dDebug("start to create dnode object"); + int32_t code = -1; + char path[PATH_MAX] = {0}; + SDnode *pDnode = NULL; + + pDnode = taosMemoryCalloc(1, sizeof(SDnode)); + if (pDnode == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + if (dndInitVars(pDnode, pOption) != 0) { + dError("failed to init variables since %s", terrstr()); + goto _OVER; + } + + dndSetStatus(pDnode, DND_STAT_INIT); + dmGetMgmtFp(&pDnode->wrappers[DNODE]); + mmGetMgmtFp(&pDnode->wrappers[MNODE]); + vmGetMgmtFp(&pDnode->wrappers[VNODES]); + qmGetMgmtFp(&pDnode->wrappers[QNODE]); + smGetMgmtFp(&pDnode->wrappers[SNODE]); + bmGetMgmtFp(&pDnode->wrappers[BNODE]); + + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); + pWrapper->path = strdup(path); + pWrapper->shm.id = -1; + pWrapper->pDnode = pDnode; + if (pWrapper->path == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + + pWrapper->procType = PROC_SINGLE; + taosInitRWLatch(&pWrapper->latch); + } + + if (dndInitMsgHandle(pDnode) != 0) { + dError("failed to msg handles since %s", terrstr()); + goto _OVER; + } + + if (dndReadShmFile(pDnode) != 0) { + dError("failed to read shm file since %s", terrstr()); + goto _OVER; + } + + SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]); + tmsgSetDefaultMsgCb(&msgCb); + + dInfo("dnode is created, data:%p", pDnode); + code = 0; + +_OVER: + if (code != 0 && pDnode) { + dndClearVars(pDnode); + pDnode = NULL; + dError("failed to create dnode since %s", terrstr()); + } + + return pDnode; +} + +void dndClose(SDnode *pDnode) { + if (pDnode == NULL) return; + + if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { + dError("dnode is shutting down, data:%p", pDnode); return; } - monCleanup(); - walCleanUp(); - taosStopCacheRefreshWorker(); - dInfo("dnode env is cleaned up"); + dInfo("start to close dnode, data:%p", pDnode); + dndSetStatus(pDnode, DND_STAT_STOPPED); + + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + dndCloseNode(pWrapper); + } + + dndClearVars(pDnode); + dInfo("dnode is closed, data:%p", pDnode); } -void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { - pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; - pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; -} - -EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } - -void dndSetStatus(SDnode *pDnode, EDndStatus status) { - if (pDnode->status != status) { - dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status)); - pDnode->status = status; +void dndHandleEvent(SDnode *pDnode, EDndEvent event) { + dInfo("dnode receive %s event, data:%p", dndEventStr(event), pDnode); + if (event == DND_EVENT_STOP) { + pDnode->event = event; } } -void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { - SStartupReq *pStartup = &pDnode->startup; - tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); - tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); - pStartup->finished = 0; +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; + SMgmtWrapper *pRetWrapper = pWrapper; + + taosRLockLatch(&pWrapper->latch); + if (pWrapper->deployed) { + int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); + dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount); + } else { + terrno = TSDB_CODE_NODE_NOT_DEPLOYED; + pRetWrapper = NULL; + } + taosRUnLockLatch(&pWrapper->latch); + + return pRetWrapper; } -void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { - memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq)); - pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); +int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) { + int32_t code = 0; + + taosRLockLatch(&pWrapper->latch); + if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) { + int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); + dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount); + } else { + terrno = TSDB_CODE_NODE_NOT_DEPLOYED; + code = -1; + } + taosRUnLockLatch(&pWrapper->latch); + + return code; } -void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { - dDebug("startup req is received"); - SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); - dndGetStartup(pDnode, pStartup); +void dndReleaseWrapper(SMgmtWrapper *pWrapper) { + if (pWrapper == NULL) return; - dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished); - SRpcMsg rpcRsp = { - .handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle}; - rpcSendResponse(&rpcRsp); -} + taosRLockLatch(&pWrapper->latch); + int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); + taosRUnLockLatch(&pWrapper->latch); + dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount); +} \ No newline at end of file diff --git a/source/dnode/mgmt/main/src/dndObj.c b/source/dnode/mgmt/main/src/dndObj.c deleted file mode 100644 index 602ebc6b3c..0000000000 --- a/source/dnode/mgmt/main/src/dndObj.c +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dndInt.h" - -static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { - pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes; - pDnode->serverPort = pOption->serverPort; - pDnode->dataDir = strdup(pOption->dataDir); - pDnode->localEp = strdup(pOption->localEp); - pDnode->localFqdn = strdup(pOption->localFqdn); - pDnode->firstEp = strdup(pOption->firstEp); - pDnode->secondEp = strdup(pOption->secondEp); - pDnode->disks = pOption->disks; - pDnode->numOfDisks = pOption->numOfDisks; - pDnode->ntype = pOption->ntype; - pDnode->rebootTime = taosGetTimestampMs(); - - if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL || - pDnode->secondEp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) { - pDnode->lockfile = dndCheckRunning(pDnode->dataDir); - if (pDnode->lockfile == NULL) { - return -1; - } - } - - return 0; -} - -static void dndClearVars(SDnode *pDnode) { - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; - taosMemoryFreeClear(pMgmt->path); - } - if (pDnode->lockfile != NULL) { - taosUnLockFile(pDnode->lockfile); - taosCloseFile(&pDnode->lockfile); - pDnode->lockfile = NULL; - } - taosMemoryFreeClear(pDnode->localEp); - taosMemoryFreeClear(pDnode->localFqdn); - taosMemoryFreeClear(pDnode->firstEp); - taosMemoryFreeClear(pDnode->secondEp); - taosMemoryFreeClear(pDnode->dataDir); - taosMemoryFree(pDnode); - dDebug("dnode memory is cleared, data:%p", pDnode); -} - -SDnode *dndCreate(const SDnodeOpt *pOption) { - dDebug("start to create dnode object"); - int32_t code = -1; - char path[PATH_MAX] = {0}; - SDnode *pDnode = NULL; - - pDnode = taosMemoryCalloc(1, sizeof(SDnode)); - if (pDnode == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - - if (dndInitVars(pDnode, pOption) != 0) { - dError("failed to init variables since %s", terrstr()); - goto _OVER; - } - - dndSetStatus(pDnode, DND_STAT_INIT); - dmGetMgmtFp(&pDnode->wrappers[DNODE]); - mmGetMgmtFp(&pDnode->wrappers[MNODE]); - vmGetMgmtFp(&pDnode->wrappers[VNODES]); - qmGetMgmtFp(&pDnode->wrappers[QNODE]); - smGetMgmtFp(&pDnode->wrappers[SNODE]); - bmGetMgmtFp(&pDnode->wrappers[BNODE]); - - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name); - pWrapper->path = strdup(path); - pWrapper->shm.id = -1; - pWrapper->pDnode = pDnode; - if (pWrapper->path == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - - pWrapper->procType = PROC_SINGLE; - taosInitRWLatch(&pWrapper->latch); - } - - if (dndInitMsgHandle(pDnode) != 0) { - dError("failed to msg handles since %s", terrstr()); - goto _OVER; - } - - if (dndReadShmFile(pDnode) != 0) { - dError("failed to read shm file since %s", terrstr()); - goto _OVER; - } - - SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]); - tmsgSetDefaultMsgCb(&msgCb); - - dInfo("dnode is created, data:%p", pDnode); - code = 0; - -_OVER: - if (code != 0 && pDnode) { - dndClearVars(pDnode); - pDnode = NULL; - dError("failed to create dnode since %s", terrstr()); - } - - return pDnode; -} - -void dndClose(SDnode *pDnode) { - if (pDnode == NULL) return; - - if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { - dError("dnode is shutting down, data:%p", pDnode); - return; - } - - dInfo("start to close dnode, data:%p", pDnode); - dndSetStatus(pDnode, DND_STAT_STOPPED); - - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - dndCloseNode(pWrapper); - } - - dndClearVars(pDnode); - dInfo("dnode is closed, data:%p", pDnode); -} - -void dndHandleEvent(SDnode *pDnode, EDndEvent event) { - dInfo("dnode receive %s event, data:%p", dndEventStr(event), pDnode); - if (event == DND_EVENT_STOP) { - pDnode->event = event; - } -} - -SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SMgmtWrapper *pRetWrapper = pWrapper; - - taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed) { - int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); - dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount); - } else { - terrno = TSDB_CODE_NODE_NOT_DEPLOYED; - pRetWrapper = NULL; - } - taosRUnLockLatch(&pWrapper->latch); - - return pRetWrapper; -} - -int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) { - int32_t code = 0; - - taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) { - int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); - dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount); - } else { - terrno = TSDB_CODE_NODE_NOT_DEPLOYED; - code = -1; - } - taosRUnLockLatch(&pWrapper->latch); - - return code; -} - -void dndReleaseWrapper(SMgmtWrapper *pWrapper) { - if (pWrapper == NULL) return; - - taosRLockLatch(&pWrapper->latch); - int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); - taosRUnLockLatch(&pWrapper->latch); - dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount); -} \ No newline at end of file diff --git a/source/os/src/osProc.c b/source/os/src/osProc.c index 00ed650cbb..2d2174a4c8 100644 --- a/source/os/src/osProc.c +++ b/source/os/src/osProc.c @@ -23,6 +23,9 @@ int32_t taosNewProc(char **args) { int32_t pid = fork(); if (pid == 0) { args[0] = tsProcPath; + close(STDIN_FILENO); + close(STDOUT_FILENO); + close(STDERR_FILENO); return execvp(tsProcPath, args); } else { return pid; diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 04061cbaf1..9101d3c7c8 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -590,12 +590,12 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) { } int32_t cfgLoadFromEnvVar(SConfig *pConfig) { - uInfo("load from global env variables success"); + uInfo("load from global env variables not implemented yet"); return 0; } int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *filepath) { - uInfo("load from env file [%s] success", filepath); + uInfo("load from env file not implemented yet"); return 0; } @@ -649,11 +649,11 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) { taosCloseFile(&pFile); if (line != NULL) taosMemoryFreeClear(line); - uInfo("load from cfg file [%s] success", filepath); + uInfo("load from cfg file %s success", filepath); return 0; } int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) { - uInfo("load from apoll url [%s] success", url); + uInfo("load from apoll url not implemented yet"); return 0; } diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index f18e7d5777..1d41bd4a48 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -140,14 +140,16 @@ static void taosProcDestroySem(SProcQueue *pQueue) { pQueue->sem = NULL; } } +#endif static void taosProcCleanupQueue(SProcQueue *pQueue) { +#if 0 if (pQueue != NULL) { taosProcDestroyMutex(pQueue); taosProcDestroySem(pQueue); } -} #endif +} static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, int32_t rawBodyLen, ProcFuncType ftype) { @@ -222,7 +224,6 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea taosThreadMutexLock(&pQueue->mutex); if (pQueue->total - pQueue->avail <= 0) { taosThreadMutexUnlock(&pQueue->mutex); - tsem_post(&pQueue->sem); terrno = TSDB_CODE_OUT_OF_SHM_MEM; return 0; } @@ -317,7 +318,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize); pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize); if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { - // taosProcCleanupQueue(pProc->pChildQueue); + taosProcCleanupQueue(pProc->pChildQueue); taosMemoryFree(pProc); return NULL; } @@ -370,13 +371,13 @@ static void taosProcThreadLoop(SProcObj *pProc) { freeBodyFp = pProc->parentFreeBodyFp; } - uDebug("proc:%s, start to get msg from queue:%p, isChild:%d", pProc->name, pQueue, pProc->isChild); + uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue); while (1) { int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp, mallocBodyFp, freeBodyFp); if (numOfMsgs == 0) { - uInfo("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue); + uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue); break; } else if (numOfMsgs < 0) { uTrace("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr()); @@ -406,7 +407,7 @@ int32_t taosProcRun(SProcObj *pProc) { static void taosProcStop(SProcObj *pProc) { if (!taosCheckPthreadValid(pProc->thread)) return; - uDebug("proc:%s, start to join thread:%" PRId64 ", isChild:%d", pProc->name, pProc->thread, pProc->isChild); + uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread); SProcQueue *pQueue; if (pProc->isChild) { pQueue = pProc->pChildQueue; @@ -421,9 +422,9 @@ void taosProcCleanup(SProcObj *pProc) { if (pProc != NULL) { uDebug("proc:%s, start to clean up", pProc->name); taosProcStop(pProc); + taosProcCleanupQueue(pProc->pChildQueue); + taosProcCleanupQueue(pProc->pParentQueue); uDebug("proc:%s, is cleaned up", pProc->name); - // taosProcCleanupQueue(pProc->pChildQueue); - // taosProcCleanupQueue(pProc->pParentQueue); taosMemoryFree(pProc); } }