|
|
|
@ -27,46 +27,42 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) {
|
|
|
|
|
return required;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
|
|
|
|
|
if (taosMkDir(pWrapper->path) != 0) {
|
|
|
|
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
|
|
|
|
|
static int32_t dndInitNodeProc(SMgmtWrapper *pWrapper) {
|
|
|
|
|
int32_t shmsize = tsMnodeShmSize;
|
|
|
|
|
if (pWrapper->ntype == VNODES) {
|
|
|
|
|
shmsize = tsVnodeShmSize;
|
|
|
|
|
} else if (pWrapper->ntype == QNODE) {
|
|
|
|
|
shmsize = tsQnodeShmSize;
|
|
|
|
|
} else if (pWrapper->ntype == SNODE) {
|
|
|
|
|
shmsize = tsSnodeShmSize;
|
|
|
|
|
} else if (pWrapper->ntype == MNODE) {
|
|
|
|
|
shmsize = tsMnodeShmSize;
|
|
|
|
|
} else if (pWrapper->ntype == BNODE) {
|
|
|
|
|
shmsize = tsBnodeShmSize;
|
|
|
|
|
} else {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
|
|
|
|
|
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
|
|
|
|
if (taosCreateShm(&pWrapper->shm, pWrapper->ntype, shmsize) != 0) {
|
|
|
|
|
terrno = TAOS_SYSTEM_ERROR(terrno);
|
|
|
|
|
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);
|
|
|
|
|
|
|
|
|
|
SProcCfg cfg = dndGenProcCfg(pWrapper);
|
|
|
|
|
cfg.isChild = false;
|
|
|
|
|
pWrapper->procType = PROC_PARENT;
|
|
|
|
|
pWrapper->pProc = taosProcInit(&cfg);
|
|
|
|
|
if (pWrapper->pProc == NULL) {
|
|
|
|
|
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dDebug("node:%s, has been opened", pWrapper->name);
|
|
|
|
|
pWrapper->deployed = true;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void dndCloseNode(SMgmtWrapper *pWrapper) {
|
|
|
|
|
dDebug("node:%s, mgmt start to close", pWrapper->name);
|
|
|
|
|
pWrapper->required = false;
|
|
|
|
|
taosWLockLatch(&pWrapper->latch);
|
|
|
|
|
if (pWrapper->deployed) {
|
|
|
|
|
(*pWrapper->fp.closeFp)(pWrapper);
|
|
|
|
|
pWrapper->deployed = false;
|
|
|
|
|
}
|
|
|
|
|
taosWUnLockLatch(&pWrapper->latch);
|
|
|
|
|
|
|
|
|
|
while (pWrapper->refCount > 0) {
|
|
|
|
|
taosMsleep(10);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pWrapper->pProc) {
|
|
|
|
|
taosProcCleanup(pWrapper->pProc);
|
|
|
|
|
pWrapper->pProc = NULL;
|
|
|
|
|
}
|
|
|
|
|
dDebug("node:%s, mgmt has been closed", pWrapper->name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) {
|
|
|
|
|
static int32_t dndNewNodeProc(SMgmtWrapper *pWrapper, EDndType n) {
|
|
|
|
|
char tstr[8] = {0};
|
|
|
|
|
char *args[6] = {0};
|
|
|
|
|
snprintf(tstr, sizeof(tstr), "%d", n);
|
|
|
|
@ -88,6 +84,86 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t dndRunNodeProc(SMgmtWrapper *pWrapper) {
|
|
|
|
|
if (pWrapper->pDnode->ntype == NODE_MAX) {
|
|
|
|
|
dInfo("node:%s, should be started manually", pWrapper->name);
|
|
|
|
|
} else {
|
|
|
|
|
if (dndNewNodeProc(pWrapper, pWrapper->ntype) != 0) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (taosProcRun(pWrapper->pProc) != 0) {
|
|
|
|
|
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t dndOpenNodeImp(SMgmtWrapper *pWrapper) {
|
|
|
|
|
if (taosMkDir(pWrapper->path) != 0) {
|
|
|
|
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
|
|
|
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
|
|
|
|
|
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dDebug("node:%s, has been opened", pWrapper->name);
|
|
|
|
|
pWrapper->deployed = true;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
|
|
|
|
|
SDnode *pDnode = pWrapper->pDnode;
|
|
|
|
|
if (pDnode->procType == PROC_SINGLE) {
|
|
|
|
|
return dndOpenNodeImp(pWrapper);
|
|
|
|
|
} else if (pDnode->procType == PROC_PARENT) {
|
|
|
|
|
if (dndInitNodeProc(pWrapper) != 0) return -1;
|
|
|
|
|
if (dndWriteShmFile(pDnode) != 0) return -1;
|
|
|
|
|
if (dndRunNodeProc(pWrapper) != 0) return -1;
|
|
|
|
|
}
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void dndCloseNodeImp(SMgmtWrapper *pWrapper) {
|
|
|
|
|
dDebug("node:%s, mgmt start to close", pWrapper->name);
|
|
|
|
|
pWrapper->required = false;
|
|
|
|
|
taosWLockLatch(&pWrapper->latch);
|
|
|
|
|
if (pWrapper->deployed) {
|
|
|
|
|
(*pWrapper->fp.closeFp)(pWrapper);
|
|
|
|
|
pWrapper->deployed = false;
|
|
|
|
|
}
|
|
|
|
|
taosWUnLockLatch(&pWrapper->latch);
|
|
|
|
|
|
|
|
|
|
while (pWrapper->refCount > 0) {
|
|
|
|
|
taosMsleep(10);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pWrapper->pProc) {
|
|
|
|
|
taosProcCleanup(pWrapper->pProc);
|
|
|
|
|
pWrapper->pProc = NULL;
|
|
|
|
|
}
|
|
|
|
|
dDebug("node:%s, mgmt has been closed", pWrapper->name);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void dndCloseNode(SMgmtWrapper *pWrapper) {
|
|
|
|
|
if (pWrapper->pDnode->procType == PROC_PARENT) {
|
|
|
|
|
if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
|
|
|
|
|
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
|
|
|
|
|
taosKillProc(pWrapper->procId);
|
|
|
|
|
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
|
|
|
|
|
taosWaitProc(pWrapper->procId);
|
|
|
|
|
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
dndCloseNodeImp(pWrapper);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void dndProcessProcHandle(void *handle) {
|
|
|
|
|
dWarn("handle:%p, the child process dies and send an offline rsp", handle);
|
|
|
|
|
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
|
|
|
|
@ -96,13 +172,14 @@ static void dndProcessProcHandle(void *handle) {
|
|
|
|
|
|
|
|
|
|
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
|
|
|
|
dInfo("dnode run in single process");
|
|
|
|
|
pDnode->procType = PROC_SINGLE;
|
|
|
|
|
|
|
|
|
|
for (EDndType n = DNODE; n < NODE_MAX; ++n) {
|
|
|
|
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
|
|
|
|
pWrapper->required = dndRequireNode(pWrapper);
|
|
|
|
|
if (!pWrapper->required) continue;
|
|
|
|
|
|
|
|
|
|
if (dndOpenNode(pWrapper) != 0) {
|
|
|
|
|
if (dndOpenNodeImp(pWrapper) != 0) {
|
|
|
|
|
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
@ -136,8 +213,10 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
|
|
|
|
|
|
|
|
|
static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
|
|
|
|
dInfo("dnode run in parent process");
|
|
|
|
|
pDnode->procType = PROC_PARENT;
|
|
|
|
|
|
|
|
|
|
SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
|
|
|
|
|
if (dndOpenNode(pDWrapper) != 0) {
|
|
|
|
|
if (dndOpenNodeImp(pDWrapper) != 0) {
|
|
|
|
|
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
@ -146,36 +225,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
|
|
|
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
|
|
|
|
pWrapper->required = dndRequireNode(pWrapper);
|
|
|
|
|
if (!pWrapper->required) continue;
|
|
|
|
|
|
|
|
|
|
int32_t shmsize = tsMnodeShmSize;
|
|
|
|
|
if (n == VNODES) {
|
|
|
|
|
shmsize = tsVnodeShmSize;
|
|
|
|
|
} else if (n == QNODE) {
|
|
|
|
|
shmsize = tsQnodeShmSize;
|
|
|
|
|
} else if (n == SNODE) {
|
|
|
|
|
shmsize = tsSnodeShmSize;
|
|
|
|
|
} else if (n == MNODE) {
|
|
|
|
|
shmsize = tsMnodeShmSize;
|
|
|
|
|
} else if (n == BNODE) {
|
|
|
|
|
shmsize = tsBnodeShmSize;
|
|
|
|
|
} else {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) {
|
|
|
|
|
terrno = TAOS_SYSTEM_ERROR(terrno);
|
|
|
|
|
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);
|
|
|
|
|
|
|
|
|
|
SProcCfg cfg = dndGenProcCfg(pWrapper);
|
|
|
|
|
cfg.isChild = false;
|
|
|
|
|
pWrapper->procType = PROC_PARENT;
|
|
|
|
|
pWrapper->pProc = taosProcInit(&cfg);
|
|
|
|
|
if (pWrapper->pProc == NULL) {
|
|
|
|
|
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
if (dndInitNodeProc(pWrapper) != 0) return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (dndWriteShmFile(pDnode) != 0) {
|
|
|
|
@ -186,19 +236,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
|
|
|
|
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
|
|
|
|
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
|
|
|
|
if (!pWrapper->required) continue;
|
|
|
|
|
|
|
|
|
|
if (pDnode->ntype == NODE_MAX) {
|
|
|
|
|
dInfo("node:%s, should be started manually", pWrapper->name);
|
|
|
|
|
} else {
|
|
|
|
|
if (dndNewProc(pWrapper, n) != 0) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (taosProcRun(pWrapper->pProc) != 0) {
|
|
|
|
|
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
if (dndRunNodeProc(pWrapper) != 0) return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dndSetStatus(pDnode, DND_STAT_RUNNING);
|
|
|
|
@ -239,7 +277,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
|
|
|
|
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
|
|
|
|
|
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
|
|
|
|
|
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
|
|
|
|
|
dndNewProc(pWrapper, n);
|
|
|
|
|
dndNewNodeProc(pWrapper, n);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -253,6 +291,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
|
|
|
|
static int32_t dndRunInChildProcess(SDnode *pDnode) {
|
|
|
|
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
|
|
|
|
|
dInfo("%s run in child process", pWrapper->name);
|
|
|
|
|
pDnode->procType = PROC_CHILD;
|
|
|
|
|
|
|
|
|
|
pWrapper->required = dndRequireNode(pWrapper);
|
|
|
|
|
if (!pWrapper->required) {
|
|
|
|
@ -264,7 +303,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
|
|
|
|
|
tmsgSetDefaultMsgCb(&msgCb);
|
|
|
|
|
pWrapper->procType = PROC_CHILD;
|
|
|
|
|
|
|
|
|
|
if (dndOpenNode(pWrapper) != 0) {
|
|
|
|
|
if (dndOpenNodeImp(pWrapper) != 0) {
|
|
|
|
|
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|