Merge pull request #11418 from taosdata/feature/node
fix(cluster): get monitor info in multi-process mode
This commit is contained in:
commit
b9f5237d8d
|
@ -78,6 +78,9 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
float uptime; // day
|
float uptime; // day
|
||||||
int8_t has_mnode;
|
int8_t has_mnode;
|
||||||
|
int8_t has_qnode;
|
||||||
|
int8_t has_snode;
|
||||||
|
int8_t has_bnode;
|
||||||
SMonDiskDesc logdir;
|
SMonDiskDesc logdir;
|
||||||
SMonDiskDesc tempdir;
|
SMonDiskDesc tempdir;
|
||||||
} SMonDnodeInfo;
|
} SMonDnodeInfo;
|
||||||
|
@ -134,8 +137,8 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t expire_time;
|
int32_t expire_time;
|
||||||
int32_t timeseries_used;
|
int64_t timeseries_used;
|
||||||
int32_t timeseries_total;
|
int64_t timeseries_total;
|
||||||
} SMonGrantInfo;
|
} SMonGrantInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -39,7 +39,7 @@ int32_t taosGetEmail(char *email, int32_t maxLen);
|
||||||
int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen);
|
int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen);
|
||||||
int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores);
|
int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores);
|
||||||
int32_t taosGetCpuCores(float *numOfCores);
|
int32_t taosGetCpuCores(float *numOfCores);
|
||||||
int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine);
|
void taosGetCpuUsage(double *cpu_system, double *cpu_engine);
|
||||||
int32_t taosGetTotalMemory(int64_t *totalKB);
|
int32_t taosGetTotalMemory(int64_t *totalKB);
|
||||||
int32_t taosGetProcMemory(int64_t *usedKB);
|
int32_t taosGetProcMemory(int64_t *usedKB);
|
||||||
int32_t taosGetSysMemory(int64_t *usedKB);
|
int32_t taosGetSysMemory(int64_t *usedKB);
|
||||||
|
|
|
@ -58,7 +58,7 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->dnodeId);
|
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->dnodeId);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return bmOpen(pWrapper);
|
return dndOpenNode(pWrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,6 +77,7 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
dError("failed to drop bnode since %s", terrstr());
|
dError("failed to drop bnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
|
// dndCloseNode(pWrapper);
|
||||||
return bmDrop(pWrapper);
|
return bmDrop(pWrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,11 +25,10 @@ static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
|
||||||
|
|
||||||
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
|
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
|
||||||
pInfo->uptime = (taosGetTimestampMs() - pDnode->rebootTime) / (86400000.0f);
|
pInfo->uptime = (taosGetTimestampMs() - pDnode->rebootTime) / (86400000.0f);
|
||||||
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
|
pInfo->has_mnode = pDnode->wrappers[MNODE].required;
|
||||||
if (pWrapper != NULL) {
|
pInfo->has_qnode = pDnode->wrappers[QNODE].required;
|
||||||
pInfo->has_mnode = pWrapper->required;
|
pInfo->has_snode = pDnode->wrappers[SNODE].required;
|
||||||
dndReleaseWrapper(pWrapper);
|
pInfo->has_bnode = pDnode->wrappers[BNODE].required;
|
||||||
}
|
|
||||||
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
|
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
|
||||||
pInfo->logdir.size = tsLogSpace.size;
|
pInfo->logdir.size = tsLogSpace.size;
|
||||||
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
|
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
|
||||||
|
@ -65,7 +64,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
|
||||||
bool getFromAPI = !tsMultiProcess;
|
bool getFromAPI = !tsMultiProcess;
|
||||||
pWrapper = &pDnode->wrappers[MNODE];
|
pWrapper = &pDnode->wrappers[MNODE];
|
||||||
if (getFromAPI) {
|
if (getFromAPI) {
|
||||||
if (dndMarkWrapper(pWrapper) != 0) {
|
if (dndMarkWrapper(pWrapper) == 0) {
|
||||||
mmGetMonitorInfo(pWrapper, &mmInfo);
|
mmGetMonitorInfo(pWrapper, &mmInfo);
|
||||||
dndReleaseWrapper(pWrapper);
|
dndReleaseWrapper(pWrapper);
|
||||||
}
|
}
|
||||||
|
@ -82,7 +81,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
|
||||||
|
|
||||||
pWrapper = &pDnode->wrappers[VNODES];
|
pWrapper = &pDnode->wrappers[VNODES];
|
||||||
if (getFromAPI) {
|
if (getFromAPI) {
|
||||||
if (dndMarkWrapper(pWrapper) != 0) {
|
if (dndMarkWrapper(pWrapper) == 0) {
|
||||||
vmGetMonitorInfo(pWrapper, &vmInfo);
|
vmGetMonitorInfo(pWrapper, &vmInfo);
|
||||||
dndReleaseWrapper(pWrapper);
|
dndReleaseWrapper(pWrapper);
|
||||||
}
|
}
|
||||||
|
@ -99,7 +98,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
|
||||||
|
|
||||||
pWrapper = &pDnode->wrappers[QNODE];
|
pWrapper = &pDnode->wrappers[QNODE];
|
||||||
if (getFromAPI) {
|
if (getFromAPI) {
|
||||||
if (dndMarkWrapper(pWrapper) != 0) {
|
if (dndMarkWrapper(pWrapper) == 0) {
|
||||||
qmGetMonitorInfo(pWrapper, &qmInfo);
|
qmGetMonitorInfo(pWrapper, &qmInfo);
|
||||||
dndReleaseWrapper(pWrapper);
|
dndReleaseWrapper(pWrapper);
|
||||||
}
|
}
|
||||||
|
@ -116,7 +115,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
|
||||||
|
|
||||||
pWrapper = &pDnode->wrappers[SNODE];
|
pWrapper = &pDnode->wrappers[SNODE];
|
||||||
if (getFromAPI) {
|
if (getFromAPI) {
|
||||||
if (dndMarkWrapper(pWrapper) != 0) {
|
if (dndMarkWrapper(pWrapper) == 0) {
|
||||||
smGetMonitorInfo(pWrapper, &smInfo);
|
smGetMonitorInfo(pWrapper, &smInfo);
|
||||||
dndReleaseWrapper(pWrapper);
|
dndReleaseWrapper(pWrapper);
|
||||||
}
|
}
|
||||||
|
@ -133,7 +132,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
|
||||||
|
|
||||||
pWrapper = &pDnode->wrappers[BNODE];
|
pWrapper = &pDnode->wrappers[BNODE];
|
||||||
if (getFromAPI) {
|
if (getFromAPI) {
|
||||||
if (dndMarkWrapper(pWrapper) != 0) {
|
if (dndMarkWrapper(pWrapper) == 0) {
|
||||||
bmGetMonitorInfo(pWrapper, &bmInfo);
|
bmGetMonitorInfo(pWrapper, &bmInfo);
|
||||||
dndReleaseWrapper(pWrapper);
|
dndReleaseWrapper(pWrapper);
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,6 +126,7 @@ typedef struct SDnode {
|
||||||
int32_t numOfDisks;
|
int32_t numOfDisks;
|
||||||
uint16_t serverPort;
|
uint16_t serverPort;
|
||||||
bool dropped;
|
bool dropped;
|
||||||
|
EProcType procType;
|
||||||
EDndType ntype;
|
EDndType ntype;
|
||||||
EDndStatus status;
|
EDndStatus status;
|
||||||
EDndEvent event;
|
EDndEvent event;
|
||||||
|
|
|
@ -27,46 +27,42 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) {
|
||||||
return required;
|
return required;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
|
static int32_t dndInitNodeProc(SMgmtWrapper *pWrapper) {
|
||||||
if (taosMkDir(pWrapper->path) != 0) {
|
int32_t shmsize = tsMnodeShmSize;
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
if (pWrapper->ntype == VNODES) {
|
||||||
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
|
shmsize = tsVnodeShmSize;
|
||||||
|
} else if (pWrapper->ntype == QNODE) {
|
||||||
|
shmsize = tsQnodeShmSize;
|
||||||
|
} else if (pWrapper->ntype == SNODE) {
|
||||||
|
shmsize = tsSnodeShmSize;
|
||||||
|
} else if (pWrapper->ntype == MNODE) {
|
||||||
|
shmsize = tsMnodeShmSize;
|
||||||
|
} else if (pWrapper->ntype == BNODE) {
|
||||||
|
shmsize = tsBnodeShmSize;
|
||||||
|
} else {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
|
if (taosCreateShm(&pWrapper->shm, pWrapper->ntype, shmsize) != 0) {
|
||||||
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
terrno = TAOS_SYSTEM_ERROR(terrno);
|
||||||
|
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);
|
||||||
|
|
||||||
|
SProcCfg cfg = dndGenProcCfg(pWrapper);
|
||||||
|
cfg.isChild = false;
|
||||||
|
pWrapper->procType = PROC_PARENT;
|
||||||
|
pWrapper->pProc = taosProcInit(&cfg);
|
||||||
|
if (pWrapper->pProc == NULL) {
|
||||||
|
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dDebug("node:%s, has been opened", pWrapper->name);
|
|
||||||
pWrapper->deployed = true;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dndCloseNode(SMgmtWrapper *pWrapper) {
|
static int32_t dndNewNodeProc(SMgmtWrapper *pWrapper, EDndType n) {
|
||||||
dDebug("node:%s, mgmt start to close", pWrapper->name);
|
|
||||||
pWrapper->required = false;
|
|
||||||
taosWLockLatch(&pWrapper->latch);
|
|
||||||
if (pWrapper->deployed) {
|
|
||||||
(*pWrapper->fp.closeFp)(pWrapper);
|
|
||||||
pWrapper->deployed = false;
|
|
||||||
}
|
|
||||||
taosWUnLockLatch(&pWrapper->latch);
|
|
||||||
|
|
||||||
while (pWrapper->refCount > 0) {
|
|
||||||
taosMsleep(10);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pWrapper->pProc) {
|
|
||||||
taosProcCleanup(pWrapper->pProc);
|
|
||||||
pWrapper->pProc = NULL;
|
|
||||||
}
|
|
||||||
dDebug("node:%s, mgmt has been closed", pWrapper->name);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) {
|
|
||||||
char tstr[8] = {0};
|
char tstr[8] = {0};
|
||||||
char *args[6] = {0};
|
char *args[6] = {0};
|
||||||
snprintf(tstr, sizeof(tstr), "%d", n);
|
snprintf(tstr, sizeof(tstr), "%d", n);
|
||||||
|
@ -88,6 +84,86 @@ static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t dndRunNodeProc(SMgmtWrapper *pWrapper) {
|
||||||
|
if (pWrapper->pDnode->ntype == NODE_MAX) {
|
||||||
|
dInfo("node:%s, should be started manually", pWrapper->name);
|
||||||
|
} else {
|
||||||
|
if (dndNewNodeProc(pWrapper, pWrapper->ntype) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosProcRun(pWrapper->pProc) != 0) {
|
||||||
|
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t dndOpenNodeImp(SMgmtWrapper *pWrapper) {
|
||||||
|
if (taosMkDir(pWrapper->path) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
|
||||||
|
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dDebug("node:%s, has been opened", pWrapper->name);
|
||||||
|
pWrapper->deployed = true;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
|
||||||
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
|
if (pDnode->procType == PROC_SINGLE) {
|
||||||
|
return dndOpenNodeImp(pWrapper);
|
||||||
|
} else if (pDnode->procType == PROC_PARENT) {
|
||||||
|
if (dndInitNodeProc(pWrapper) != 0) return -1;
|
||||||
|
if (dndWriteShmFile(pDnode) != 0) return -1;
|
||||||
|
if (dndRunNodeProc(pWrapper) != 0) return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dndCloseNodeImp(SMgmtWrapper *pWrapper) {
|
||||||
|
dDebug("node:%s, mgmt start to close", pWrapper->name);
|
||||||
|
pWrapper->required = false;
|
||||||
|
taosWLockLatch(&pWrapper->latch);
|
||||||
|
if (pWrapper->deployed) {
|
||||||
|
(*pWrapper->fp.closeFp)(pWrapper);
|
||||||
|
pWrapper->deployed = false;
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pWrapper->latch);
|
||||||
|
|
||||||
|
while (pWrapper->refCount > 0) {
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pWrapper->pProc) {
|
||||||
|
taosProcCleanup(pWrapper->pProc);
|
||||||
|
pWrapper->pProc = NULL;
|
||||||
|
}
|
||||||
|
dDebug("node:%s, mgmt has been closed", pWrapper->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
void dndCloseNode(SMgmtWrapper *pWrapper) {
|
||||||
|
if (pWrapper->pDnode->procType == PROC_PARENT) {
|
||||||
|
if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
|
||||||
|
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
|
||||||
|
taosKillProc(pWrapper->procId);
|
||||||
|
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
|
||||||
|
taosWaitProc(pWrapper->procId);
|
||||||
|
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
dndCloseNodeImp(pWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
static void dndProcessProcHandle(void *handle) {
|
static void dndProcessProcHandle(void *handle) {
|
||||||
dWarn("handle:%p, the child process dies and send an offline rsp", handle);
|
dWarn("handle:%p, the child process dies and send an offline rsp", handle);
|
||||||
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
|
SRpcMsg rpcMsg = {.handle = handle, .code = TSDB_CODE_NODE_OFFLINE};
|
||||||
|
@ -96,13 +172,14 @@ static void dndProcessProcHandle(void *handle) {
|
||||||
|
|
||||||
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
||||||
dInfo("dnode run in single process");
|
dInfo("dnode run in single process");
|
||||||
|
pDnode->procType = PROC_SINGLE;
|
||||||
|
|
||||||
for (EDndType n = DNODE; n < NODE_MAX; ++n) {
|
for (EDndType n = DNODE; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
pWrapper->required = dndRequireNode(pWrapper);
|
pWrapper->required = dndRequireNode(pWrapper);
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
|
|
||||||
if (dndOpenNode(pWrapper) != 0) {
|
if (dndOpenNodeImp(pWrapper) != 0) {
|
||||||
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -136,8 +213,10 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
|
||||||
|
|
||||||
static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
||||||
dInfo("dnode run in parent process");
|
dInfo("dnode run in parent process");
|
||||||
|
pDnode->procType = PROC_PARENT;
|
||||||
|
|
||||||
SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
|
SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
|
||||||
if (dndOpenNode(pDWrapper) != 0) {
|
if (dndOpenNodeImp(pDWrapper) != 0) {
|
||||||
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
|
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -146,36 +225,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
pWrapper->required = dndRequireNode(pWrapper);
|
pWrapper->required = dndRequireNode(pWrapper);
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
|
if (dndInitNodeProc(pWrapper) != 0) return -1;
|
||||||
int32_t shmsize = tsMnodeShmSize;
|
|
||||||
if (n == VNODES) {
|
|
||||||
shmsize = tsVnodeShmSize;
|
|
||||||
} else if (n == QNODE) {
|
|
||||||
shmsize = tsQnodeShmSize;
|
|
||||||
} else if (n == SNODE) {
|
|
||||||
shmsize = tsSnodeShmSize;
|
|
||||||
} else if (n == MNODE) {
|
|
||||||
shmsize = tsMnodeShmSize;
|
|
||||||
} else if (n == BNODE) {
|
|
||||||
shmsize = tsBnodeShmSize;
|
|
||||||
} else {
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(terrno);
|
|
||||||
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);
|
|
||||||
|
|
||||||
SProcCfg cfg = dndGenProcCfg(pWrapper);
|
|
||||||
cfg.isChild = false;
|
|
||||||
pWrapper->procType = PROC_PARENT;
|
|
||||||
pWrapper->pProc = taosProcInit(&cfg);
|
|
||||||
if (pWrapper->pProc == NULL) {
|
|
||||||
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dndWriteShmFile(pDnode) != 0) {
|
if (dndWriteShmFile(pDnode) != 0) {
|
||||||
|
@ -186,19 +236,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
||||||
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
|
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
|
||||||
if (!pWrapper->required) continue;
|
if (!pWrapper->required) continue;
|
||||||
|
if (dndRunNodeProc(pWrapper) != 0) return -1;
|
||||||
if (pDnode->ntype == NODE_MAX) {
|
|
||||||
dInfo("node:%s, should be started manually", pWrapper->name);
|
|
||||||
} else {
|
|
||||||
if (dndNewProc(pWrapper, n) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosProcRun(pWrapper->pProc) != 0) {
|
|
||||||
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dndSetStatus(pDnode, DND_STAT_RUNNING);
|
dndSetStatus(pDnode, DND_STAT_RUNNING);
|
||||||
|
@ -239,7 +277,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
||||||
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
|
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
|
||||||
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
|
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
|
||||||
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
|
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
|
||||||
dndNewProc(pWrapper, n);
|
dndNewNodeProc(pWrapper, n);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -253,6 +291,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
|
||||||
static int32_t dndRunInChildProcess(SDnode *pDnode) {
|
static int32_t dndRunInChildProcess(SDnode *pDnode) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
|
||||||
dInfo("%s run in child process", pWrapper->name);
|
dInfo("%s run in child process", pWrapper->name);
|
||||||
|
pDnode->procType = PROC_CHILD;
|
||||||
|
|
||||||
pWrapper->required = dndRequireNode(pWrapper);
|
pWrapper->required = dndRequireNode(pWrapper);
|
||||||
if (!pWrapper->required) {
|
if (!pWrapper->required) {
|
||||||
|
@ -264,7 +303,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
|
||||||
tmsgSetDefaultMsgCb(&msgCb);
|
tmsgSetDefaultMsgCb(&msgCb);
|
||||||
pWrapper->procType = PROC_CHILD;
|
pWrapper->procType = PROC_CHILD;
|
||||||
|
|
||||||
if (dndOpenNode(pWrapper) != 0) {
|
if (dndOpenNodeImp(pWrapper) != 0) {
|
||||||
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
|
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,6 +80,7 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
dError("failed to drop mnode since %s", terrstr());
|
dError("failed to drop mnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
|
// dndCloseNode(pWrapper);
|
||||||
return mmDrop(pWrapper);
|
return mmDrop(pWrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,7 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
dError("failed to create qnode since %s", terrstr());
|
dError("failed to create qnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return qmOpen(pWrapper);
|
return dndOpenNode(pWrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -77,6 +77,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
dError("failed to drop qnode since %s", terrstr());
|
dError("failed to drop qnode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
|
// dndCloseNode(pWrapper);
|
||||||
return qmDrop(pWrapper);
|
return qmDrop(pWrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) {
|
if (pMsg->rpcMsg.msgType == TDMT_MON_QM_INFO) {
|
||||||
code = qmProcessGetMonQmInfoReq(pMgmt->pWrapper, pMsg);
|
code = qmProcessGetMonQmInfoReq(pMgmt->pWrapper, pMsg);
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
|
|
@ -58,7 +58,7 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
dError("failed to create snode since %s", terrstr());
|
dError("failed to create snode since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return smOpen(pWrapper);
|
return dndOpenNode(pWrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,6 +78,7 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
return smDrop(pWrapper);
|
return smDrop(pWrapper);
|
||||||
|
// return dndCloseNode(pWrapper);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -375,6 +375,9 @@ static void monGenDnodeJson(SMonInfo *pMonitor) {
|
||||||
tjsonAddDoubleToObject(pJson, "vnodes_num", pStat->totalVnodes);
|
tjsonAddDoubleToObject(pJson, "vnodes_num", pStat->totalVnodes);
|
||||||
tjsonAddDoubleToObject(pJson, "masters", pStat->masterNum);
|
tjsonAddDoubleToObject(pJson, "masters", pStat->masterNum);
|
||||||
tjsonAddDoubleToObject(pJson, "has_mnode", pInfo->has_mnode);
|
tjsonAddDoubleToObject(pJson, "has_mnode", pInfo->has_mnode);
|
||||||
|
tjsonAddDoubleToObject(pJson, "has_qnode", pInfo->has_qnode);
|
||||||
|
tjsonAddDoubleToObject(pJson, "has_snode", pInfo->has_snode);
|
||||||
|
tjsonAddDoubleToObject(pJson, "has_bnode", pInfo->has_bnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void monGenDiskJson(SMonInfo *pMonitor) {
|
static void monGenDiskJson(SMonInfo *pMonitor) {
|
||||||
|
@ -530,7 +533,7 @@ void monSendReport() {
|
||||||
if (pCont != NULL) {
|
if (pCont != NULL) {
|
||||||
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
|
||||||
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
|
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
|
||||||
uError("failed to send monitor msg since %s", terrstr());
|
uError("failed to send monitor msg");
|
||||||
}
|
}
|
||||||
taosMemoryFree(pCont);
|
taosMemoryFree(pCont);
|
||||||
}
|
}
|
||||||
|
|
|
@ -194,9 +194,9 @@ int32_t tDecodeSMonVgroupInfo(SCoder *decoder, SMonVgroupInfo *pInfo) {
|
||||||
if (tDecodeCStrTo(decoder, desc.database_name) < 0) return -1;
|
if (tDecodeCStrTo(decoder, desc.database_name) < 0) return -1;
|
||||||
if (tDecodeCStrTo(decoder, desc.status) < 0) return -1;
|
if (tDecodeCStrTo(decoder, desc.status) < 0) return -1;
|
||||||
for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) {
|
for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) {
|
||||||
SMonVnodeDesc vdesc = {0};
|
SMonVnodeDesc *pVDesc = &desc.vnodes[j];
|
||||||
if (tDecodeI32(decoder, &vdesc.dnode_id) < 0) return -1;
|
if (tDecodeI32(decoder, &pVDesc->dnode_id) < 0) return -1;
|
||||||
if (tDecodeCStrTo(decoder, vdesc.vnode_role) < 0) return -1;
|
if (tDecodeCStrTo(decoder, pVDesc->vnode_role) < 0) return -1;
|
||||||
}
|
}
|
||||||
taosArrayPush(pInfo->vgroups, &desc);
|
taosArrayPush(pInfo->vgroups, &desc);
|
||||||
}
|
}
|
||||||
|
@ -205,15 +205,15 @@ int32_t tDecodeSMonVgroupInfo(SCoder *decoder, SMonVgroupInfo *pInfo) {
|
||||||
|
|
||||||
int32_t tEncodeSMonGrantInfo(SCoder *encoder, const SMonGrantInfo *pInfo) {
|
int32_t tEncodeSMonGrantInfo(SCoder *encoder, const SMonGrantInfo *pInfo) {
|
||||||
if (tEncodeI32(encoder, pInfo->expire_time) < 0) return -1;
|
if (tEncodeI32(encoder, pInfo->expire_time) < 0) return -1;
|
||||||
if (tEncodeI32(encoder, pInfo->timeseries_used) < 0) return -1;
|
if (tEncodeI64(encoder, pInfo->timeseries_used) < 0) return -1;
|
||||||
if (tEncodeI32(encoder, pInfo->timeseries_total) < 0) return -1;
|
if (tEncodeI64(encoder, pInfo->timeseries_total) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSMonGrantInfo(SCoder *decoder, SMonGrantInfo *pInfo) {
|
int32_t tDecodeSMonGrantInfo(SCoder *decoder, SMonGrantInfo *pInfo) {
|
||||||
if (tDecodeI32(decoder, &pInfo->expire_time) < 0) return -1;
|
if (tDecodeI32(decoder, &pInfo->expire_time) < 0) return -1;
|
||||||
if (tDecodeI32(decoder, &pInfo->timeseries_used) < 0) return -1;
|
if (tDecodeI64(decoder, &pInfo->timeseries_used) < 0) return -1;
|
||||||
if (tDecodeI32(decoder, &pInfo->timeseries_total) < 0) return -1;
|
if (tDecodeI64(decoder, &pInfo->timeseries_total) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -164,8 +164,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
|
||||||
wb[1] = uv_buf_init((char*)pCont, contLen);
|
wb[1] = uv_buf_init((char*)pCont, contLen);
|
||||||
|
|
||||||
connect->data = wb;
|
connect->data = wb;
|
||||||
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
|
||||||
uv_run(loop, UV_RUN_DEFAULT);
|
uv_run(loop, UV_RUN_DEFAULT);
|
||||||
uv_loop_close(loop);
|
uv_loop_close(loop);
|
||||||
taosMemoryFree(connect);
|
taosMemoryFree(connect);
|
||||||
|
|
|
@ -24,7 +24,7 @@ int32_t taosNewProc(char **args) {
|
||||||
if (pid == 0) {
|
if (pid == 0) {
|
||||||
args[0] = tsProcPath;
|
args[0] = tsProcPath;
|
||||||
// close(STDIN_FILENO);
|
// close(STDIN_FILENO);
|
||||||
close(STDOUT_FILENO);
|
// close(STDOUT_FILENO);
|
||||||
// close(STDERR_FILENO);
|
// close(STDERR_FILENO);
|
||||||
return execvp(tsProcPath, args);
|
return execvp(tsProcPath, args);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -369,53 +369,33 @@ int32_t taosGetCpuCores(float *numOfCores) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
|
void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
|
||||||
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
|
static int64_t lastSysUsed = 0;
|
||||||
|
static int64_t lastSysTotal = 0;
|
||||||
|
static int64_t lastProcTotal = 0;
|
||||||
|
static int64_t curSysUsed = 0;
|
||||||
|
static int64_t curSysTotal = 0;
|
||||||
|
static int64_t curProcTotal = 0;
|
||||||
|
|
||||||
*cpu_system = 0;
|
*cpu_system = 0;
|
||||||
*cpu_engine = 0;
|
*cpu_engine = 0;
|
||||||
return 0;
|
|
||||||
#elif defined(_TD_DARWIN_64)
|
|
||||||
*cpu_system = 0;
|
|
||||||
*cpu_engine = 0;
|
|
||||||
return 0;
|
|
||||||
#else
|
|
||||||
static uint64_t lastSysUsed = 0;
|
|
||||||
static uint64_t lastSysTotal = 0;
|
|
||||||
static uint64_t lastProcTotal = 0;
|
|
||||||
|
|
||||||
SysCpuInfo sysCpu;
|
SysCpuInfo sysCpu = {0};
|
||||||
ProcCpuInfo procCpu;
|
ProcCpuInfo procCpu = {0};
|
||||||
if (taosGetSysCpuInfo(&sysCpu) != 0) {
|
if (taosGetSysCpuInfo(&sysCpu) == 0 && taosGetProcCpuInfo(&procCpu) == 0) {
|
||||||
return -1;
|
curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system;
|
||||||
|
curSysTotal = curSysUsed + sysCpu.idle;
|
||||||
|
curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime;
|
||||||
|
|
||||||
|
if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) {
|
||||||
|
*cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100;
|
||||||
|
*cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100;
|
||||||
|
}
|
||||||
|
|
||||||
|
lastSysUsed = curSysUsed;
|
||||||
|
lastSysTotal = curSysTotal;
|
||||||
|
lastProcTotal = curProcTotal;
|
||||||
}
|
}
|
||||||
if (taosGetProcCpuInfo(&procCpu) != 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint64_t curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system;
|
|
||||||
uint64_t curSysTotal = curSysUsed + sysCpu.idle;
|
|
||||||
uint64_t curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime;
|
|
||||||
|
|
||||||
if (lastSysUsed == 0 || lastSysTotal == 0 || lastProcTotal == 0) {
|
|
||||||
lastSysUsed = curSysUsed > 1 ? curSysUsed : 1;
|
|
||||||
lastSysTotal = curSysTotal > 1 ? curSysTotal : 1;
|
|
||||||
lastProcTotal = curProcTotal > 1 ? curProcTotal : 1;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (curSysTotal == lastSysTotal) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
*cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100;
|
|
||||||
*cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100;
|
|
||||||
|
|
||||||
lastSysUsed = curSysUsed;
|
|
||||||
lastSysTotal = curSysTotal;
|
|
||||||
lastProcTotal = curProcTotal;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosGetTotalMemory(int64_t *totalKB) {
|
int32_t taosGetTotalMemory(int64_t *totalKB) {
|
||||||
|
@ -618,7 +598,6 @@ void taosGetProcIODelta(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, i
|
||||||
static int64_t last_wchars = 0;
|
static int64_t last_wchars = 0;
|
||||||
static int64_t last_read_bytes = 0;
|
static int64_t last_read_bytes = 0;
|
||||||
static int64_t last_write_bytes = 0;
|
static int64_t last_write_bytes = 0;
|
||||||
|
|
||||||
static int64_t cur_rchars = 0;
|
static int64_t cur_rchars = 0;
|
||||||
static int64_t cur_wchars = 0;
|
static int64_t cur_wchars = 0;
|
||||||
static int64_t cur_read_bytes = 0;
|
static int64_t cur_read_bytes = 0;
|
||||||
|
@ -632,6 +611,11 @@ void taosGetProcIODelta(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, i
|
||||||
last_wchars = cur_wchars;
|
last_wchars = cur_wchars;
|
||||||
last_read_bytes = cur_read_bytes;
|
last_read_bytes = cur_read_bytes;
|
||||||
last_write_bytes = cur_write_bytes;
|
last_write_bytes = cur_write_bytes;
|
||||||
|
} else {
|
||||||
|
*rchars = 0;
|
||||||
|
*wchars = 0;
|
||||||
|
*read_bytes = 0;
|
||||||
|
*write_bytes = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -693,7 +677,6 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
|
||||||
void taosGetCardInfoDelta(int64_t *receive_bytes, int64_t *transmit_bytes) {
|
void taosGetCardInfoDelta(int64_t *receive_bytes, int64_t *transmit_bytes) {
|
||||||
static int64_t last_receive_bytes = 0;
|
static int64_t last_receive_bytes = 0;
|
||||||
static int64_t last_transmit_bytes = 0;
|
static int64_t last_transmit_bytes = 0;
|
||||||
|
|
||||||
static int64_t cur_receive_bytes = 0;
|
static int64_t cur_receive_bytes = 0;
|
||||||
static int64_t cur_transmit_bytes = 0;
|
static int64_t cur_transmit_bytes = 0;
|
||||||
if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) == 0) {
|
if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) == 0) {
|
||||||
|
@ -701,6 +684,9 @@ void taosGetCardInfoDelta(int64_t *receive_bytes, int64_t *transmit_bytes) {
|
||||||
*transmit_bytes = cur_transmit_bytes - last_transmit_bytes;
|
*transmit_bytes = cur_transmit_bytes - last_transmit_bytes;
|
||||||
last_receive_bytes = cur_receive_bytes;
|
last_receive_bytes = cur_receive_bytes;
|
||||||
last_transmit_bytes = cur_transmit_bytes;
|
last_transmit_bytes = cur_transmit_bytes;
|
||||||
|
} else {
|
||||||
|
*receive_bytes = 0;
|
||||||
|
*transmit_bytes = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/cfg.sh -n dnode1 -c monitorfqdn -v localhost
|
||||||
|
system sh/cfg.sh -n dnode1 -c monitorport -v 80
|
||||||
|
system sh/cfg.sh -n dnode1 -c monitorInterval -v 1
|
||||||
|
system sh/cfg.sh -n dnode1 -c monitorComp -v 1
|
||||||
|
#system sh/cfg.sh -n dnode1 -c supportVnodes -v 128
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print =============== show dnodes
|
||||||
|
sleep 2000
|
||||||
|
sql create database db vgroups 2;
|
||||||
|
sleep 2000
|
||||||
|
|
||||||
|
print =============== create drop qnode 1
|
||||||
|
sql create qnode on dnode 1
|
||||||
|
sql create snode on dnode 1
|
||||||
|
sql create bnode on dnode 1
|
||||||
|
|
||||||
|
return
|
||||||
|
print =============== restart
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
|
||||||
|
|
||||||
|
return
|
||||||
|
system sh/deploy.sh -n dnode2 -i 2
|
||||||
|
system sh/exec.sh -n dnode2 -s start
|
||||||
|
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode2 -s start
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
Loading…
Reference in New Issue