refactor: node mgmt
This commit is contained in:
parent
a195ecd725
commit
ace5940ce8
|
@ -1,205 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DM_DEF_H_
|
||||
#define _TD_DM_DEF_H_
|
||||
|
||||
// tobe deleted
|
||||
#include "uv.h"
|
||||
|
||||
#include "dmLog.h"
|
||||
|
||||
#include "cJSON.h"
|
||||
#include "tcache.h"
|
||||
#include "tcrc32c.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "thash.h"
|
||||
#include "tlockfree.h"
|
||||
#include "tlog.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "tprocess.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
#include "tthread.h"
|
||||
#include "ttime.h"
|
||||
#include "tworker.h"
|
||||
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "monitor.h"
|
||||
#include "sync.h"
|
||||
#include "wal.h"
|
||||
|
||||
#include "libs/function/function.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef enum {
|
||||
DNODE = 0,
|
||||
VNODE = 1,
|
||||
QNODE = 2,
|
||||
SNODE = 3,
|
||||
MNODE = 4,
|
||||
BNODE = 5,
|
||||
NODE_END = 6,
|
||||
} EDndNodeType;
|
||||
|
||||
typedef enum {
|
||||
DND_STAT_INIT,
|
||||
DND_STAT_RUNNING,
|
||||
DND_STAT_STOPPED,
|
||||
} EDndRunStatus;
|
||||
|
||||
typedef enum {
|
||||
DND_ENV_INIT,
|
||||
DND_ENV_READY,
|
||||
DND_ENV_CLEANUP,
|
||||
} EDndEnvStatus;
|
||||
|
||||
typedef enum {
|
||||
DND_PROC_SINGLE,
|
||||
DND_PROC_CHILD,
|
||||
DND_PROC_PARENT,
|
||||
DND_PROC_TEST,
|
||||
} EDndProcType;
|
||||
|
||||
typedef int32_t (*NodeMsgFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*NodeOpenFp)(struct SMgmtWrapper *pWrapper);
|
||||
typedef void (*NodeCloseFp)(struct SMgmtWrapper *pWrapper);
|
||||
typedef int32_t (*NodeStartFp)(struct SMgmtWrapper *pWrapper);
|
||||
typedef void (*NodeStopFp)(struct SMgmtWrapper *pWrapper);
|
||||
typedef int32_t (*NodeCreateFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*NodeDropFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*NodeRequireFp)(struct SMgmtWrapper *pWrapper, bool *required);
|
||||
|
||||
typedef struct {
|
||||
EDndNodeType defaultNtype;
|
||||
bool needCheckVgId;
|
||||
} SMsgHandle;
|
||||
|
||||
typedef struct {
|
||||
NodeOpenFp openFp;
|
||||
NodeCloseFp closeFp;
|
||||
NodeStartFp startFp;
|
||||
NodeStopFp stopFp;
|
||||
NodeCreateFp createFp;
|
||||
NodeDropFp dropFp;
|
||||
NodeRequireFp requiredFp;
|
||||
} SMgmtFp;
|
||||
|
||||
typedef struct SMgmtWrapper {
|
||||
SDnode *pDnode;
|
||||
struct {
|
||||
const char *name;
|
||||
char *path;
|
||||
int32_t refCount;
|
||||
SRWLatch latch;
|
||||
EDndNodeType nodeType;
|
||||
bool deployed;
|
||||
bool required;
|
||||
SMgmtFp fp;
|
||||
void *pMgmt;
|
||||
};
|
||||
struct {
|
||||
EDndProcType procType;
|
||||
int32_t procId;
|
||||
SProcObj *procObj;
|
||||
SShm procShm;
|
||||
};
|
||||
struct {
|
||||
bool needCheckVgIds[TDMT_MAX];
|
||||
NodeMsgFp msgFps[TDMT_MAX];
|
||||
};
|
||||
} SMgmtWrapper;
|
||||
|
||||
typedef struct {
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
SMsgHandle msgHandles[TDMT_MAX];
|
||||
} SDnodeTrans;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
int64_t dnodeVer;
|
||||
int64_t updateTime;
|
||||
int64_t rebootTime;
|
||||
int32_t unsyncedVgId;
|
||||
ESyncState vndState;
|
||||
ESyncState mndState;
|
||||
bool isMnode;
|
||||
bool dropped;
|
||||
SEpSet mnodeEps;
|
||||
SArray *dnodeEps;
|
||||
SHashObj *dnodeHash;
|
||||
TdThread *statusThreadId;
|
||||
TdThread *monitorThreadId;
|
||||
SRWLatch latch;
|
||||
SSingleWorker mgmtWorker;
|
||||
SMsgCb msgCb;
|
||||
SDnode *pDnode;
|
||||
TdFilePtr lockfile;
|
||||
char *localEp;
|
||||
char *localFqdn;
|
||||
char *firstEp;
|
||||
char *secondEp;
|
||||
char *dataDir;
|
||||
SDiskCfg *disks;
|
||||
int32_t numOfDisks;
|
||||
int32_t supportVnodes;
|
||||
uint16_t serverPort;
|
||||
} SDnodeData;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_STEP_NAME_LEN];
|
||||
char desc[TSDB_STEP_DESC_LEN];
|
||||
} SStartupInfo;
|
||||
|
||||
typedef struct SUdfdData {
|
||||
bool startCalled;
|
||||
bool needCleanUp;
|
||||
uv_loop_t loop;
|
||||
uv_thread_t thread;
|
||||
uv_barrier_t barrier;
|
||||
uv_process_t process;
|
||||
int spawnErr;
|
||||
uv_pipe_t ctrlPipe;
|
||||
uv_async_t stopAsync;
|
||||
int32_t stopCalled;
|
||||
int32_t dnodeId;
|
||||
} SUdfdData;
|
||||
|
||||
typedef struct SDnode {
|
||||
EDndProcType ptype;
|
||||
EDndNodeType ntype;
|
||||
EDndRunStatus status;
|
||||
EDndEvent event;
|
||||
SStartupInfo startup;
|
||||
SDnodeTrans trans;
|
||||
SDnodeData data;
|
||||
SUdfdData udfdData;
|
||||
TdThreadMutex mutex;
|
||||
SMgmtWrapper wrappers[NODE_END];
|
||||
} SDnode;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DM_DEF_H_*/
|
|
@ -16,37 +16,132 @@
|
|||
#ifndef _TD_DM_INT_H_
|
||||
#define _TD_DM_INT_H_
|
||||
|
||||
#include "dmDef.h"
|
||||
#include "cJSON.h"
|
||||
#include "tcache.h"
|
||||
#include "tcrc32c.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "thash.h"
|
||||
#include "tlockfree.h"
|
||||
#include "tlog.h"
|
||||
#include "tmsg.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "tprocess.h"
|
||||
#include "tqueue.h"
|
||||
#include "trpc.h"
|
||||
#include "tthread.h"
|
||||
#include "ttime.h"
|
||||
#include "tworker.h"
|
||||
|
||||
#include "dnode.h"
|
||||
#include "mnode.h"
|
||||
#include "monitor.h"
|
||||
#include "sync.h"
|
||||
#include "wal.h"
|
||||
|
||||
#include "libs/function/function.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
// dmInt.c
|
||||
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType);
|
||||
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
|
||||
void dmReleaseWrapper(SMgmtWrapper *pWrapper);
|
||||
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
typedef enum {
|
||||
DNODE = 0,
|
||||
VNODE = 1,
|
||||
QNODE = 2,
|
||||
SNODE = 3,
|
||||
MNODE = 4,
|
||||
BNODE = 5,
|
||||
NODE_END = 6,
|
||||
} EDndNodeType;
|
||||
|
||||
typedef enum {
|
||||
DND_STAT_INIT,
|
||||
DND_STAT_RUNNING,
|
||||
DND_STAT_STOPPED,
|
||||
} EDndRunStatus;
|
||||
|
||||
typedef enum {
|
||||
DND_ENV_INIT,
|
||||
DND_ENV_READY,
|
||||
DND_ENV_CLEANUP,
|
||||
} EDndEnvStatus;
|
||||
|
||||
typedef enum {
|
||||
DND_PROC_SINGLE,
|
||||
DND_PROC_CHILD,
|
||||
DND_PROC_PARENT,
|
||||
DND_PROC_TEST,
|
||||
} EDndProcType;
|
||||
|
||||
typedef struct {
|
||||
const char *path;
|
||||
const char *name;
|
||||
SMsgCb msgCb;
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
const char *dataDir;
|
||||
const char *localEp;
|
||||
const char *firstEp;
|
||||
const char *localFqdn;
|
||||
uint16_t serverPort;
|
||||
int32_t supportVnodes;
|
||||
} SMgmtInputOpt;
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
void *pMgmt;
|
||||
} SMgmtOutputOpt;
|
||||
|
||||
typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg);
|
||||
typedef int32_t (*NodeOpenFp)(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput);
|
||||
typedef void (*NodeCloseFp)(void *pMgmt);
|
||||
typedef int32_t (*NodeStartFp)(void *pMgmt);
|
||||
typedef void (*NodeStopFp)(void *pMgmt);
|
||||
typedef int32_t (*NodeCreateFp)(void *pMgmt, SNodeMsg *pMsg);
|
||||
typedef int32_t (*NodeDropFp)(void *pMgmt, SNodeMsg *pMsg);
|
||||
typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required);
|
||||
typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle
|
||||
|
||||
typedef struct {
|
||||
NodeOpenFp openFp;
|
||||
NodeCloseFp closeFp;
|
||||
NodeStartFp startFp;
|
||||
NodeStopFp stopFp;
|
||||
NodeCreateFp createFp;
|
||||
NodeDropFp dropFp;
|
||||
NodeRequireFp requiredFp;
|
||||
NodeGetHandlesFp getHandlesFp;
|
||||
} SMgmtFunc;
|
||||
|
||||
typedef struct {
|
||||
tmsg_t msgType;
|
||||
bool needCheckVgId;
|
||||
NodeMsgFp msgFp;
|
||||
} SMgmtHandle;
|
||||
|
||||
// dmUtil.c
|
||||
const char *dmStatStr(EDndRunStatus stype);
|
||||
const char *dmNodeLogName(EDndNodeType ntype);
|
||||
const char *dmNodeProcName(EDndNodeType ntype);
|
||||
const char *dmEventStr(EDndEvent etype);
|
||||
const char *dmProcStr(EDndProcType ptype);
|
||||
|
||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
|
||||
void dmSetEvent(SDnode *pDnode, EDndEvent event);
|
||||
void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgIds);
|
||||
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
|
||||
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc);
|
||||
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||
void dmGetMonitorSysInfo(SMonSysInfo *pInfo);
|
||||
void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgId);
|
||||
void dmGetSystemInfo(SMonSysInfo *pInfo);
|
||||
|
||||
// dmFile.c
|
||||
int32_t dmReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
|
||||
int32_t dmWriteFile(SMgmtWrapper *pWrapper, bool deployed);
|
||||
int32_t dmReadFile(const char *path, const char *name, bool *pDeployed);
|
||||
int32_t dmWriteFile(const char *path, const char *name, bool deployed);
|
||||
TdFilePtr dmCheckRunning(const char *dataDir);
|
||||
int32_t dmReadShmFile(SMgmtWrapper *pWrapper);
|
||||
int32_t dmWriteShmFile(SMgmtWrapper *pWrapper);
|
||||
int32_t dmReadShmFile(const char *path, const char *name, SShm *pShm);
|
||||
int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_DM_LOG_H_
|
||||
#define _TD_DM_LOG_H_
|
||||
|
||||
#include "tlog.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
|
||||
#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
|
||||
#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
|
||||
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
|
||||
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
|
||||
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_DM_LOG_H_*/
|
|
@ -18,7 +18,7 @@
|
|||
|
||||
#define MAXLEN 1024
|
||||
|
||||
int32_t dmReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
|
||||
int32_t dmReadFile(const char *path, const char *name, bool *pDeployed) {
|
||||
int32_t code = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
int64_t len = 0;
|
||||
char content[MAXLEN + 1] = {0};
|
||||
|
@ -26,7 +26,7 @@ int32_t dmReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
|
|||
char file[PATH_MAX] = {0};
|
||||
TdFilePtr pFile = NULL;
|
||||
|
||||
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
|
||||
snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
|
||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
code = 0;
|
||||
|
@ -63,7 +63,7 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t dmWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
|
||||
int32_t dmWriteFile(const char *path, const char *name, bool deployed) {
|
||||
int32_t code = -1;
|
||||
int32_t len = 0;
|
||||
char content[MAXLEN + 1] = {0};
|
||||
|
@ -71,8 +71,8 @@ int32_t dmWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
|
|||
char realfile[PATH_MAX] = {0};
|
||||
TdFilePtr pFile = NULL;
|
||||
|
||||
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
|
||||
snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
|
||||
snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name);
|
||||
snprintf(realfile, sizeof(realfile), "%s%s%s.json", path, TD_DIRSEP, name);
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
|
@ -139,14 +139,14 @@ TdFilePtr dmCheckRunning(const char *dataDir) {
|
|||
return pFile;
|
||||
}
|
||||
|
||||
int32_t dmReadShmFile(SMgmtWrapper *pWrapper) {
|
||||
int32_t dmReadShmFile(const char *path, const char *name, SShm *pShm) {
|
||||
int32_t code = -1;
|
||||
char content[MAXLEN + 1] = {0};
|
||||
char file[PATH_MAX] = {0};
|
||||
cJSON *root = NULL;
|
||||
TdFilePtr pFile = NULL;
|
||||
|
||||
snprintf(file, sizeof(file), "%s%sshmfile", pWrapper->path, TD_DIRSEP);
|
||||
snprintf(file, sizeof(file), "%s%sshmfile", path, TD_DIRSEP);
|
||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
code = 0;
|
||||
|
@ -157,36 +157,36 @@ int32_t dmReadShmFile(SMgmtWrapper *pWrapper) {
|
|||
root = cJSON_Parse(content);
|
||||
if (root == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
||||
dError("node:%s, failed to read %s since invalid json format", pWrapper->name, file);
|
||||
dError("node:%s, failed to read %s since invalid json format", name, file);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
cJSON *shmid = cJSON_GetObjectItem(root, "shmid");
|
||||
if (shmid && shmid->type == cJSON_Number) {
|
||||
pWrapper->procShm.id = shmid->valueint;
|
||||
pShm->id = shmid->valueint;
|
||||
}
|
||||
|
||||
cJSON *shmsize = cJSON_GetObjectItem(root, "shmsize");
|
||||
if (shmsize && shmsize->type == cJSON_Number) {
|
||||
pWrapper->procShm.size = shmsize->valueint;
|
||||
pShm->size = shmsize->valueint;
|
||||
}
|
||||
}
|
||||
|
||||
if (!tsMultiProcess) {
|
||||
if (pWrapper->procShm.id >= 0) {
|
||||
dDebug("node:%s, shmid:%d, is closed, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size);
|
||||
taosDropShm(&pWrapper->procShm);
|
||||
if (pShm->id >= 0) {
|
||||
dDebug("node:%s, shmid:%d, is closed, size:%d", name, pShm->id, pShm->size);
|
||||
taosDropShm(pShm);
|
||||
}
|
||||
} else {
|
||||
if (taosAttachShm(&pWrapper->procShm) != 0) {
|
||||
if (taosAttachShm(pShm) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("shmid:%d, failed to attach shm since %s", pWrapper->procShm.id, terrstr());
|
||||
dError("shmid:%d, failed to attach shm since %s", pShm->id, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
dInfo("node:%s, shmid:%d is attached, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size);
|
||||
dInfo("node:%s, shmid:%d is attached, size:%d", name, pShm->id, pShm->size);
|
||||
}
|
||||
|
||||
dDebug("node:%s, successed to load %s", pWrapper->name, file);
|
||||
dDebug("node:%s, successed to load %s", name, file);
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
|
@ -196,7 +196,7 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t dmWriteShmFile(SMgmtWrapper *pWrapper) {
|
||||
int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm) {
|
||||
int32_t code = -1;
|
||||
int32_t len = 0;
|
||||
char content[MAXLEN + 1] = {0};
|
||||
|
@ -204,30 +204,30 @@ int32_t dmWriteShmFile(SMgmtWrapper *pWrapper) {
|
|||
char realfile[PATH_MAX] = {0};
|
||||
TdFilePtr pFile = NULL;
|
||||
|
||||
snprintf(file, sizeof(file), "%s%sshmfile.bak", pWrapper->path, TD_DIRSEP);
|
||||
snprintf(realfile, sizeof(realfile), "%s%sshmfile", pWrapper->path, TD_DIRSEP);
|
||||
snprintf(file, sizeof(file), "%s%sshmfile.bak", path, TD_DIRSEP);
|
||||
snprintf(realfile, sizeof(realfile), "%s%sshmfile", path, TD_DIRSEP);
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("node:%s, failed to open file:%s since %s", pWrapper->name, file, terrstr());
|
||||
dError("node:%s, failed to open file:%s since %s", name, file, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
len += snprintf(content + len, MAXLEN - len, "{\n");
|
||||
len += snprintf(content + len, MAXLEN - len, " \"shmid\":%d,\n", pWrapper->procShm.id);
|
||||
len += snprintf(content + len, MAXLEN - len, " \"shmsize\":%d\n", pWrapper->procShm.size);
|
||||
len += snprintf(content + len, MAXLEN - len, " \"shmid\":%d,\n", pShm->id);
|
||||
len += snprintf(content + len, MAXLEN - len, " \"shmsize\":%d\n", pShm->size);
|
||||
len += snprintf(content + len, MAXLEN - len, "}\n");
|
||||
|
||||
if (taosWriteFile(pFile, content, len) != len) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("node:%s, failed to write file:%s since %s", pWrapper->name, file, terrstr());
|
||||
dError("node:%s, failed to write file:%s since %s", name, file, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (taosFsyncFile(pFile) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("node:%s, failed to fsync file:%s since %s", pWrapper->name, file, terrstr());
|
||||
dError("node:%s, failed to fsync file:%s since %s", name, file, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -235,11 +235,11 @@ int32_t dmWriteShmFile(SMgmtWrapper *pWrapper) {
|
|||
|
||||
if (taosRenameFile(file, realfile) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("node:%s, failed to rename %s to %s since %s", pWrapper->name, file, realfile, terrstr());
|
||||
dError("node:%s, failed to rename %s to %s since %s", name, file, realfile, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
dInfo("node:%s, successed to write %s", pWrapper->name, realfile);
|
||||
dInfo("node:%s, successed to write %s", name, realfile);
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
|
|
|
@ -1,92 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dmInt.h"
|
||||
|
||||
const char *dmStatStr(EDndRunStatus stype) {
|
||||
switch (stype) {
|
||||
case DND_STAT_INIT:
|
||||
return "init";
|
||||
case DND_STAT_RUNNING:
|
||||
return "running";
|
||||
case DND_STAT_STOPPED:
|
||||
return "stopped";
|
||||
default:
|
||||
return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
||||
const char *dmNodeLogName(EDndNodeType ntype) {
|
||||
switch (ntype) {
|
||||
case VNODE:
|
||||
return "vnode";
|
||||
case QNODE:
|
||||
return "qnode";
|
||||
case SNODE:
|
||||
return "snode";
|
||||
case MNODE:
|
||||
return "mnode";
|
||||
case BNODE:
|
||||
return "bnode";
|
||||
default:
|
||||
return "taosd";
|
||||
}
|
||||
}
|
||||
|
||||
const char *dmNodeProcName(EDndNodeType ntype) {
|
||||
switch (ntype) {
|
||||
case VNODE:
|
||||
return "taosv";
|
||||
case QNODE:
|
||||
return "taosq";
|
||||
case SNODE:
|
||||
return "taoss";
|
||||
case MNODE:
|
||||
return "taosm";
|
||||
case BNODE:
|
||||
return "taosb";
|
||||
default:
|
||||
return "taosd";
|
||||
}
|
||||
}
|
||||
|
||||
const char *dmEventStr(EDndEvent ev) {
|
||||
switch (ev) {
|
||||
case DND_EVENT_START:
|
||||
return "start";
|
||||
case DND_EVENT_STOP:
|
||||
return "stop";
|
||||
case DND_EVENT_CHILD:
|
||||
return "child";
|
||||
default:
|
||||
return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
||||
const char *dmProcStr(EDndProcType etype) {
|
||||
switch (etype) {
|
||||
case DND_PROC_SINGLE:
|
||||
return "start";
|
||||
case DND_PROC_CHILD:
|
||||
return "stop";
|
||||
case DND_PROC_PARENT:
|
||||
return "child";
|
||||
case DND_PROC_TEST:
|
||||
return "test";
|
||||
default:
|
||||
return "UNKNOWN";
|
||||
}
|
||||
}
|
|
@ -16,143 +16,93 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "dmInt.h"
|
||||
|
||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
|
||||
if (pDnode->status != status) {
|
||||
dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status));
|
||||
pDnode->status = status;
|
||||
|
||||
const char *dmStatStr(EDndRunStatus stype) {
|
||||
switch (stype) {
|
||||
case DND_STAT_INIT:
|
||||
return "init";
|
||||
case DND_STAT_RUNNING:
|
||||
return "running";
|
||||
case DND_STAT_STOPPED:
|
||||
return "stopped";
|
||||
default:
|
||||
return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
||||
void dmSetEvent(SDnode *pDnode, EDndEvent event) {
|
||||
if (event == DND_EVENT_STOP) {
|
||||
pDnode->event = event;
|
||||
const char *dmNodeLogName(EDndNodeType ntype) {
|
||||
switch (ntype) {
|
||||
case VNODE:
|
||||
return "vnode";
|
||||
case QNODE:
|
||||
return "qnode";
|
||||
case SNODE:
|
||||
return "snode";
|
||||
case MNODE:
|
||||
return "mnode";
|
||||
case BNODE:
|
||||
return "bnode";
|
||||
default:
|
||||
return "taosd";
|
||||
}
|
||||
}
|
||||
|
||||
void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgId) {
|
||||
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
|
||||
pWrapper->needCheckVgIds[TMSG_INDEX(msgType)] = needCheckVgId;
|
||||
}
|
||||
|
||||
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) {
|
||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||
SMgmtWrapper *pRetWrapper = pWrapper;
|
||||
|
||||
taosRLockLatch(&pWrapper->latch);
|
||||
if (pWrapper->deployed) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||
dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount);
|
||||
} else {
|
||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
pRetWrapper = NULL;
|
||||
}
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
|
||||
return pRetWrapper;
|
||||
}
|
||||
|
||||
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosRLockLatch(&pWrapper->latch);
|
||||
if (pWrapper->deployed || (pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) {
|
||||
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
|
||||
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
|
||||
} else {
|
||||
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
code = -1;
|
||||
}
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void dmReleaseWrapper(SMgmtWrapper *pWrapper) {
|
||||
if (pWrapper == NULL) return;
|
||||
|
||||
taosRLockLatch(&pWrapper->latch);
|
||||
int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1);
|
||||
taosRUnLockLatch(&pWrapper->latch);
|
||||
dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount);
|
||||
}
|
||||
|
||||
void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) {
|
||||
SStartupInfo *pStartup = &pDnode->startup;
|
||||
tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN);
|
||||
tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN);
|
||||
dInfo("step:%s, %s", pStartup->name, pStartup->desc);
|
||||
}
|
||||
|
||||
void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc) {
|
||||
dmReportStartup(pWrapper->pDnode, pName, pDesc);
|
||||
}
|
||||
|
||||
static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
|
||||
pStatus->details[0] = 0;
|
||||
|
||||
if (pDnode->status == DND_STAT_INIT) {
|
||||
pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK;
|
||||
snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc);
|
||||
} else if (pDnode->status == DND_STAT_STOPPED) {
|
||||
pStatus->statusCode = TSDB_SRV_STATUS_EXTING;
|
||||
} else {
|
||||
SDnodeData *pData = &pDnode->data;
|
||||
if (pData->isMnode && pData->mndState != TAOS_SYNC_STATE_LEADER && pData->mndState == TAOS_SYNC_STATE_FOLLOWER) {
|
||||
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
|
||||
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(pData->mndState));
|
||||
} else if (pData->unsyncedVgId != 0 && pData->vndState != TAOS_SYNC_STATE_LEADER &&
|
||||
pData->vndState != TAOS_SYNC_STATE_FOLLOWER) {
|
||||
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
|
||||
snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pData->unsyncedVgId,
|
||||
syncStr(pData->vndState));
|
||||
} else {
|
||||
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK;
|
||||
const char *dmNodeProcName(EDndNodeType ntype) {
|
||||
switch (ntype) {
|
||||
case VNODE:
|
||||
return "taosv";
|
||||
case QNODE:
|
||||
return "taosq";
|
||||
case SNODE:
|
||||
return "taoss";
|
||||
case MNODE:
|
||||
return "taosm";
|
||||
case BNODE:
|
||||
return "taosb";
|
||||
default:
|
||||
return "taosd";
|
||||
}
|
||||
}
|
||||
|
||||
const char *dmEventStr(EDndEvent ev) {
|
||||
switch (ev) {
|
||||
case DND_EVENT_START:
|
||||
return "start";
|
||||
case DND_EVENT_STOP:
|
||||
return "stop";
|
||||
case DND_EVENT_CHILD:
|
||||
return "child";
|
||||
default:
|
||||
return "UNKNOWN";
|
||||
}
|
||||
}
|
||||
|
||||
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||
dDebug("net test req is received");
|
||||
SRpcMsg rsp = {.handle = pReq->handle, .refId = pReq->refId, .ahandle = pReq->ahandle, .code = 0};
|
||||
rsp.pCont = rpcMallocCont(pReq->contLen);
|
||||
if (rsp.pCont == NULL) {
|
||||
rsp.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
rsp.contLen = pReq->contLen;
|
||||
const char *dmProcStr(EDndProcType etype) {
|
||||
switch (etype) {
|
||||
case DND_PROC_SINGLE:
|
||||
return "start";
|
||||
case DND_PROC_CHILD:
|
||||
return "stop";
|
||||
case DND_PROC_PARENT:
|
||||
return "child";
|
||||
case DND_PROC_TEST:
|
||||
return "test";
|
||||
default:
|
||||
return "UNKNOWN";
|
||||
}
|
||||
rpcSendResponse(&rsp);
|
||||
rpcFreeCont(pReq->pCont);
|
||||
}
|
||||
|
||||
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||
dDebug("server status req is received");
|
||||
void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgId) {
|
||||
SMgmtHandle handle = {
|
||||
.msgType = msgType,
|
||||
.msgFp = nodeMsgFp,
|
||||
.needCheckVgId = needCheckVgId,
|
||||
};
|
||||
|
||||
SServerStatusRsp statusRsp = {0};
|
||||
dmGetServerStatus(pDnode, &statusRsp);
|
||||
|
||||
SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle, .refId = pReq->refId};
|
||||
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
|
||||
if (rspLen < 0) {
|
||||
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
return taosArrayPush(pArray, &handle);
|
||||
}
|
||||
|
||||
void *pRsp = rpcMallocCont(rspLen);
|
||||
if (pRsp == NULL) {
|
||||
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp);
|
||||
rspMsg.pCont = pRsp;
|
||||
rspMsg.contLen = rspLen;
|
||||
|
||||
_OVER:
|
||||
rpcSendResponse(&rspMsg);
|
||||
rpcFreeCont(pReq->pCont);
|
||||
}
|
||||
|
||||
void dmGetMonitorSysInfo(SMonSysInfo *pInfo) {
|
||||
void dmGetSystemInfo(SMonSysInfo *pInfo) {
|
||||
taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system);
|
||||
taosGetCpuCores(&pInfo->cpu_cores);
|
||||
taosGetProcMemory(&pInfo->mem_engine);
|
||||
|
|
Loading…
Reference in New Issue