commit
c649e345e7
|
@ -37,13 +37,6 @@ static void dndStopDnode(int signum, void *info, void *ctx) {
|
|||
}
|
||||
}
|
||||
|
||||
static void dndHandleChild(int signum, void *info, void *ctx) {
|
||||
dInfo("sigchild received");
|
||||
if (global.pDnode != NULL) {
|
||||
dndHandleEvent(global.pDnode, DND_EVENT_CHILD);
|
||||
}
|
||||
}
|
||||
|
||||
static void dndSetSignalHandle() {
|
||||
taosSetSignal(SIGTERM, dndStopDnode);
|
||||
taosSetSignal(SIGHUP, dndStopDnode);
|
||||
|
@ -53,7 +46,7 @@ static void dndSetSignalHandle() {
|
|||
|
||||
if (!tsMultiProcess) {
|
||||
} else if (global.ntype == DNODE || global.ntype == NODE_MAX) {
|
||||
taosSetSignal(SIGCHLD, dndHandleChild);
|
||||
taosIgnSignal(SIGCHLD);
|
||||
} else {
|
||||
taosKillChildOnParentStopped();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dndInt.h"
|
||||
#include "wal.h"
|
||||
|
||||
static int8_t once = DND_ENV_INIT;
|
||||
|
||||
int32_t dndInit() {
|
||||
dDebug("start to init dnode env");
|
||||
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
|
||||
terrno = TSDB_CODE_REPEAT_INIT;
|
||||
dError("failed to init dnode env since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosIgnSIGPIPE();
|
||||
taosBlockSIGPIPE();
|
||||
taosResolveCRC();
|
||||
|
||||
SMonCfg monCfg = {0};
|
||||
monCfg.maxLogs = tsMonitorMaxLogs;
|
||||
monCfg.port = tsMonitorPort;
|
||||
monCfg.server = tsMonitorFqdn;
|
||||
monCfg.comp = tsMonitorComp;
|
||||
if (monInit(&monCfg) != 0) {
|
||||
dError("failed to init monitor since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("dnode env is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dndCleanup() {
|
||||
dDebug("start to cleanup dnode env");
|
||||
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
|
||||
dError("dnode env is already cleaned up");
|
||||
return;
|
||||
}
|
||||
|
||||
monCleanup();
|
||||
walCleanUp();
|
||||
taosStopCacheRefreshWorker();
|
||||
dInfo("dnode env is cleaned up");
|
||||
}
|
||||
|
||||
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
|
||||
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
|
||||
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
|
||||
}
|
||||
|
||||
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
|
||||
|
||||
void dndSetStatus(SDnode *pDnode, EDndStatus status) {
|
||||
if (pDnode->status != status) {
|
||||
dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status));
|
||||
pDnode->status = status;
|
||||
}
|
||||
}
|
||||
|
||||
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
|
||||
SStartupReq *pStartup = &pDnode->startup;
|
||||
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
||||
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
||||
pStartup->finished = 0;
|
||||
}
|
||||
|
||||
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
|
||||
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
|
||||
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
|
||||
}
|
||||
|
||||
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||
dDebug("startup req is received");
|
||||
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
|
||||
dndGetStartup(pDnode, pStartup);
|
||||
|
||||
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
|
@ -128,7 +128,7 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
|
|||
}
|
||||
|
||||
pWrapper->procId = pid;
|
||||
dInfo("node:%s, run in new process, pid:%d", pWrapper->name, pid);
|
||||
dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -263,21 +263,21 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
|||
if (!pWrapper->required) continue;
|
||||
if (pDnode->ntype == NODE_MAX) continue;
|
||||
|
||||
if (pWrapper->procId != 0 && !taosProcExists(pWrapper->procId)) {
|
||||
dInfo("node:%s, process not exist, pid:%d", pWrapper->name, pWrapper->procId);
|
||||
if (pWrapper->procId <= 0 || !taosProcExists(pWrapper->procId)) {
|
||||
dInfo("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
|
||||
dndNewProc(pWrapper, n);
|
||||
}
|
||||
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t dndRunInChildProcess(SDnode *pDnode) {
|
||||
dInfo("dnode run in child process");
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
|
||||
dInfo("%s run in child process", pWrapper->name);
|
||||
|
||||
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
|
||||
tmsgSetDefaultMsgCb(&msgCb);
|
||||
|
|
|
@ -15,82 +15,186 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dndInt.h"
|
||||
#include "wal.h"
|
||||
|
||||
static int8_t once = DND_ENV_INIT;
|
||||
static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
|
||||
pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes;
|
||||
pDnode->serverPort = pOption->serverPort;
|
||||
pDnode->dataDir = strdup(pOption->dataDir);
|
||||
pDnode->localEp = strdup(pOption->localEp);
|
||||
pDnode->localFqdn = strdup(pOption->localFqdn);
|
||||
pDnode->firstEp = strdup(pOption->firstEp);
|
||||
pDnode->secondEp = strdup(pOption->secondEp);
|
||||
pDnode->disks = pOption->disks;
|
||||
pDnode->numOfDisks = pOption->numOfDisks;
|
||||
pDnode->ntype = pOption->ntype;
|
||||
pDnode->rebootTime = taosGetTimestampMs();
|
||||
|
||||
int32_t dndInit() {
|
||||
dDebug("start to init dnode env");
|
||||
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
|
||||
terrno = TSDB_CODE_REPEAT_INIT;
|
||||
dError("failed to init dnode env since %s", terrstr());
|
||||
if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL ||
|
||||
pDnode->secondEp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosIgnSIGPIPE();
|
||||
taosBlockSIGPIPE();
|
||||
taosResolveCRC();
|
||||
|
||||
SMonCfg monCfg = {0};
|
||||
monCfg.maxLogs = tsMonitorMaxLogs;
|
||||
monCfg.port = tsMonitorPort;
|
||||
monCfg.server = tsMonitorFqdn;
|
||||
monCfg.comp = tsMonitorComp;
|
||||
if (monInit(&monCfg) != 0) {
|
||||
dError("failed to init monitor since %s", terrstr());
|
||||
return -1;
|
||||
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
|
||||
pDnode->lockfile = dndCheckRunning(pDnode->dataDir);
|
||||
if (pDnode->lockfile == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
dInfo("dnode env is initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dndCleanup() {
|
||||
dDebug("start to cleanup dnode env");
|
||||
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
|
||||
dError("dnode env is already cleaned up");
|
||||
static void dndClearVars(SDnode *pDnode) {
|
||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
|
||||
taosMemoryFreeClear(pMgmt->path);
|
||||
}
|
||||
if (pDnode->lockfile != NULL) {
|
||||
taosUnLockFile(pDnode->lockfile);
|
||||
taosCloseFile(&pDnode->lockfile);
|
||||
pDnode->lockfile = NULL;
|
||||
}
|
||||
taosMemoryFreeClear(pDnode->localEp);
|
||||
taosMemoryFreeClear(pDnode->localFqdn);
|
||||
taosMemoryFreeClear(pDnode->firstEp);
|
||||
taosMemoryFreeClear(pDnode->secondEp);
|
||||
taosMemoryFreeClear(pDnode->dataDir);
|
||||
taosMemoryFree(pDnode);
|
||||
dDebug("dnode memory is cleared, data:%p", pDnode);
|
||||
}
|
||||
|
||||
SDnode *dndCreate(const SDnodeOpt *pOption) {
|
||||
dDebug("start to create dnode object");
|
||||
int32_t code = -1;
|
||||
char path[PATH_MAX] = {0};
|
||||
SDnode *pDnode = NULL;
|
||||
|
||||
pDnode = taosMemoryCalloc(1, sizeof(SDnode));
|
||||
if (pDnode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (dndInitVars(pDnode, pOption) != 0) {
|
||||
dError("failed to init variables since %s", terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
dndSetStatus(pDnode, DND_STAT_INIT);
|
||||
dmGetMgmtFp(&pDnode->wrappers[DNODE]);
|
||||
mmGetMgmtFp(&pDnode->wrappers[MNODE]);
|
||||
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
|
||||
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
|
||||
smGetMgmtFp(&pDnode->wrappers[SNODE]);
|
||||
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
|
||||
|
||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
|
||||
pWrapper->path = strdup(path);
|
||||
pWrapper->shm.id = -1;
|
||||
pWrapper->pDnode = pDnode;
|
||||
if (pWrapper->path == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pWrapper->procType = PROC_SINGLE;
|
||||
taosInitRWLatch(&pWrapper->latch);
|
||||
}
|
||||
|
||||
if (dndInitMsgHandle(pDnode) != 0) {
|
||||
dError("failed to msg handles since %s", terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (dndReadShmFile(pDnode) != 0) {
|
||||
dError("failed to read shm file since %s", terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]);
|
||||
tmsgSetDefaultMsgCb(&msgCb);
|
||||
|
||||
dInfo("dnode is created, data:%p", pDnode);
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
if (code != 0 && pDnode) {
|
||||
dndClearVars(pDnode);
|
||||
pDnode = NULL;
|
||||
dError("failed to create dnode since %s", terrstr());
|
||||
}
|
||||
|
||||
return pDnode;
|
||||
}
|
||||
|
||||
void dndClose(SDnode *pDnode) {
|
||||
if (pDnode == NULL) return;
|
||||
|
||||
if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
|
||||
dError("dnode is shutting down, data:%p", pDnode);
|
||||
return;
|
||||
}
|
||||
|
||||
monCleanup();
|
||||
walCleanUp();
|
||||
taosStopCacheRefreshWorker();
|
||||
dInfo("dnode env is cleaned up");
|
||||
dInfo("start to close dnode, data:%p", pDnode);
|
||||
dndSetStatus(pDnode, DND_STAT_STOPPED);
|
||||
|
||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||
dndCloseNode(pWrapper);
|
||||
}
|
||||
|
||||
dndClearVars(pDnode);
|
||||
dInfo("dnode is closed, data:%p", pDnode);
|
||||
}
|
||||
|
||||
void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
|
||||
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
|
||||
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
|
||||
}
|
||||
|
||||
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
|
||||
|
||||
void dndSetStatus(SDnode *pDnode, EDndStatus status) {
|
||||
if (pDnode->status != status) {
|
||||
dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status));
|
||||
pDnode->status = status;
|
||||
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
|
||||
dInfo("dnode receive %s event, data:%p", dndEventStr(event), pDnode);
|
||||
if (event == DND_EVENT_STOP) {
|
||||
pDnode->event = event;
|
||||
}
|
||||
}
|
||||
|
||||
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
|
||||
SStartupReq *pStartup = &pDnode->startup;
|
||||
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
||||
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
||||
pStartup->finished = 0;
|
||||
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||
SMgmtWrapper *pRetWrapper = pWrapper;
|
||||
|
||||
taosRLockLatch(&pWrapper->latch);
|
||||
if (pWrapper->deployed) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
|
||||
} else {
|
||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
pRetWrapper = NULL;
|
||||
}
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
|
||||
return pRetWrapper;
|
||||
}
|
||||
|
||||
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
|
||||
memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq));
|
||||
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
|
||||
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosRLockLatch(&pWrapper->latch);
|
||||
if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
|
||||
} else {
|
||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
code = -1;
|
||||
}
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||
dDebug("startup req is received");
|
||||
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
|
||||
dndGetStartup(pDnode, pStartup);
|
||||
void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
|
||||
if (pWrapper == NULL) return;
|
||||
|
||||
dDebug("startup req is sent, step:%s desc:%s finished:%d", pStartup->name, pStartup->desc, pStartup->finished);
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pReq->handle, .pCont = pStartup, .contLen = sizeof(SStartupReq), .ahandle = pReq->ahandle};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
}
|
||||
taosRLockLatch(&pWrapper->latch);
|
||||
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
|
||||
}
|
|
@ -1,200 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dndInt.h"
|
||||
|
||||
static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
|
||||
pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes;
|
||||
pDnode->serverPort = pOption->serverPort;
|
||||
pDnode->dataDir = strdup(pOption->dataDir);
|
||||
pDnode->localEp = strdup(pOption->localEp);
|
||||
pDnode->localFqdn = strdup(pOption->localFqdn);
|
||||
pDnode->firstEp = strdup(pOption->firstEp);
|
||||
pDnode->secondEp = strdup(pOption->secondEp);
|
||||
pDnode->disks = pOption->disks;
|
||||
pDnode->numOfDisks = pOption->numOfDisks;
|
||||
pDnode->ntype = pOption->ntype;
|
||||
pDnode->rebootTime = taosGetTimestampMs();
|
||||
|
||||
if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL ||
|
||||
pDnode->secondEp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
|
||||
pDnode->lockfile = dndCheckRunning(pDnode->dataDir);
|
||||
if (pDnode->lockfile == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void dndClearVars(SDnode *pDnode) {
|
||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||
SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
|
||||
taosMemoryFreeClear(pMgmt->path);
|
||||
}
|
||||
if (pDnode->lockfile != NULL) {
|
||||
taosUnLockFile(pDnode->lockfile);
|
||||
taosCloseFile(&pDnode->lockfile);
|
||||
pDnode->lockfile = NULL;
|
||||
}
|
||||
taosMemoryFreeClear(pDnode->localEp);
|
||||
taosMemoryFreeClear(pDnode->localFqdn);
|
||||
taosMemoryFreeClear(pDnode->firstEp);
|
||||
taosMemoryFreeClear(pDnode->secondEp);
|
||||
taosMemoryFreeClear(pDnode->dataDir);
|
||||
taosMemoryFree(pDnode);
|
||||
dDebug("dnode memory is cleared, data:%p", pDnode);
|
||||
}
|
||||
|
||||
SDnode *dndCreate(const SDnodeOpt *pOption) {
|
||||
dDebug("start to create dnode object");
|
||||
int32_t code = -1;
|
||||
char path[PATH_MAX] = {0};
|
||||
SDnode *pDnode = NULL;
|
||||
|
||||
pDnode = taosMemoryCalloc(1, sizeof(SDnode));
|
||||
if (pDnode == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (dndInitVars(pDnode, pOption) != 0) {
|
||||
dError("failed to init variables since %s", terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
dndSetStatus(pDnode, DND_STAT_INIT);
|
||||
dmGetMgmtFp(&pDnode->wrappers[DNODE]);
|
||||
mmGetMgmtFp(&pDnode->wrappers[MNODE]);
|
||||
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
|
||||
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
|
||||
smGetMgmtFp(&pDnode->wrappers[SNODE]);
|
||||
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
|
||||
|
||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||
snprintf(path, sizeof(path), "%s%s%s", pDnode->dataDir, TD_DIRSEP, pWrapper->name);
|
||||
pWrapper->path = strdup(path);
|
||||
pWrapper->shm.id = -1;
|
||||
pWrapper->pDnode = pDnode;
|
||||
if (pWrapper->path == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
pWrapper->procType = PROC_SINGLE;
|
||||
taosInitRWLatch(&pWrapper->latch);
|
||||
}
|
||||
|
||||
if (dndInitMsgHandle(pDnode) != 0) {
|
||||
dError("failed to msg handles since %s", terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (dndReadShmFile(pDnode) != 0) {
|
||||
dError("failed to read shm file since %s", terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]);
|
||||
tmsgSetDefaultMsgCb(&msgCb);
|
||||
|
||||
dInfo("dnode is created, data:%p", pDnode);
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
if (code != 0 && pDnode) {
|
||||
dndClearVars(pDnode);
|
||||
pDnode = NULL;
|
||||
dError("failed to create dnode since %s", terrstr());
|
||||
}
|
||||
|
||||
return pDnode;
|
||||
}
|
||||
|
||||
void dndClose(SDnode *pDnode) {
|
||||
if (pDnode == NULL) return;
|
||||
|
||||
if (dndGetStatus(pDnode) == DND_STAT_STOPPED) {
|
||||
dError("dnode is shutting down, data:%p", pDnode);
|
||||
return;
|
||||
}
|
||||
|
||||
dInfo("start to close dnode, data:%p", pDnode);
|
||||
dndSetStatus(pDnode, DND_STAT_STOPPED);
|
||||
|
||||
for (ENodeType n = 0; n < NODE_MAX; ++n) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||
dndCloseNode(pWrapper);
|
||||
}
|
||||
|
||||
dndClearVars(pDnode);
|
||||
dInfo("dnode is closed, data:%p", pDnode);
|
||||
}
|
||||
|
||||
void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
|
||||
dInfo("dnode receive %s event, data:%p", dndEventStr(event), pDnode);
|
||||
if (event == DND_EVENT_STOP) {
|
||||
pDnode->event = event;
|
||||
}
|
||||
}
|
||||
|
||||
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||
SMgmtWrapper *pRetWrapper = pWrapper;
|
||||
|
||||
taosRLockLatch(&pWrapper->latch);
|
||||
if (pWrapper->deployed) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
|
||||
} else {
|
||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
pRetWrapper = NULL;
|
||||
}
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
|
||||
return pRetWrapper;
|
||||
}
|
||||
|
||||
int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosRLockLatch(&pWrapper->latch);
|
||||
if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
|
||||
} else {
|
||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
code = -1;
|
||||
}
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void dndReleaseWrapper(SMgmtWrapper *pWrapper) {
|
||||
if (pWrapper == NULL) return;
|
||||
|
||||
taosRLockLatch(&pWrapper->latch);
|
||||
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
|
||||
}
|
|
@ -23,6 +23,9 @@ int32_t taosNewProc(char **args) {
|
|||
int32_t pid = fork();
|
||||
if (pid == 0) {
|
||||
args[0] = tsProcPath;
|
||||
close(STDIN_FILENO);
|
||||
close(STDOUT_FILENO);
|
||||
close(STDERR_FILENO);
|
||||
return execvp(tsProcPath, args);
|
||||
} else {
|
||||
return pid;
|
||||
|
|
|
@ -590,12 +590,12 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
|
|||
}
|
||||
|
||||
int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
||||
uInfo("load from global env variables success");
|
||||
uInfo("load from global env variables not implemented yet");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *filepath) {
|
||||
uInfo("load from env file [%s] success", filepath);
|
||||
uInfo("load from env file not implemented yet");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -649,11 +649,11 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
|||
taosCloseFile(&pFile);
|
||||
if (line != NULL) taosMemoryFreeClear(line);
|
||||
|
||||
uInfo("load from cfg file [%s] success", filepath);
|
||||
uInfo("load from cfg file %s success", filepath);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
|
||||
uInfo("load from apoll url [%s] success", url);
|
||||
uInfo("load from apoll url not implemented yet");
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -140,14 +140,16 @@ static void taosProcDestroySem(SProcQueue *pQueue) {
|
|||
pQueue->sem = NULL;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
static void taosProcCleanupQueue(SProcQueue *pQueue) {
|
||||
#if 0
|
||||
if (pQueue != NULL) {
|
||||
taosProcDestroyMutex(pQueue);
|
||||
taosProcDestroySem(pQueue);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
|
||||
int32_t rawBodyLen, ProcFuncType ftype) {
|
||||
|
@ -222,7 +224,6 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
|
|||
taosThreadMutexLock(&pQueue->mutex);
|
||||
if (pQueue->total - pQueue->avail <= 0) {
|
||||
taosThreadMutexUnlock(&pQueue->mutex);
|
||||
tsem_post(&pQueue->sem);
|
||||
terrno = TSDB_CODE_OUT_OF_SHM_MEM;
|
||||
return 0;
|
||||
}
|
||||
|
@ -317,7 +318,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
|
|||
pProc->pChildQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + cstart, csize);
|
||||
pProc->pParentQueue = taosProcInitQueue(pCfg->name, pCfg->isChild, (char *)pCfg->shm.ptr + pstart, psize);
|
||||
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
|
||||
// taosProcCleanupQueue(pProc->pChildQueue);
|
||||
taosProcCleanupQueue(pProc->pChildQueue);
|
||||
taosMemoryFree(pProc);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -370,13 +371,13 @@ static void taosProcThreadLoop(SProcObj *pProc) {
|
|||
freeBodyFp = pProc->parentFreeBodyFp;
|
||||
}
|
||||
|
||||
uDebug("proc:%s, start to get msg from queue:%p, isChild:%d", pProc->name, pQueue, pProc->isChild);
|
||||
uDebug("proc:%s, start to get msg from queue:%p", pProc->name, pQueue);
|
||||
|
||||
while (1) {
|
||||
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype, mallocHeadFp, freeHeadFp,
|
||||
mallocBodyFp, freeBodyFp);
|
||||
if (numOfMsgs == 0) {
|
||||
uInfo("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue);
|
||||
uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pProc->name, pQueue);
|
||||
break;
|
||||
} else if (numOfMsgs < 0) {
|
||||
uTrace("proc:%s, get no msg from queue:%p since %s", pProc->name, pQueue, terrstr());
|
||||
|
@ -406,7 +407,7 @@ int32_t taosProcRun(SProcObj *pProc) {
|
|||
static void taosProcStop(SProcObj *pProc) {
|
||||
if (!taosCheckPthreadValid(pProc->thread)) return;
|
||||
|
||||
uDebug("proc:%s, start to join thread:%" PRId64 ", isChild:%d", pProc->name, pProc->thread, pProc->isChild);
|
||||
uDebug("proc:%s, start to join thread:%" PRId64, pProc->name, pProc->thread);
|
||||
SProcQueue *pQueue;
|
||||
if (pProc->isChild) {
|
||||
pQueue = pProc->pChildQueue;
|
||||
|
@ -421,9 +422,9 @@ void taosProcCleanup(SProcObj *pProc) {
|
|||
if (pProc != NULL) {
|
||||
uDebug("proc:%s, start to clean up", pProc->name);
|
||||
taosProcStop(pProc);
|
||||
taosProcCleanupQueue(pProc->pChildQueue);
|
||||
taosProcCleanupQueue(pProc->pParentQueue);
|
||||
uDebug("proc:%s, is cleaned up", pProc->name);
|
||||
// taosProcCleanupQueue(pProc->pChildQueue);
|
||||
// taosProcCleanupQueue(pProc->pParentQueue);
|
||||
taosMemoryFree(pProc);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue