From ace5940ce855fdc71f4d2d859ba379fbf47b48ae Mon Sep 17 00:00:00 2001 From: Shengliang Date: Wed, 11 May 2022 17:09:14 +0800 Subject: [PATCH] refactor: node mgmt --- source/dnode/mgmt/node_common/inc/dmDef.h | 205 --------------------- source/dnode/mgmt/node_common/inc/dmInt.h | 139 +++++++++++--- source/dnode/mgmt/node_common/inc/dmLog.h | 36 ---- source/dnode/mgmt/node_common/src/dmFile.c | 54 +++--- source/dnode/mgmt/node_common/src/dmStr.c | 92 --------- source/dnode/mgmt/node_common/src/dmUtil.c | 194 ++++++++----------- 6 files changed, 216 insertions(+), 504 deletions(-) delete mode 100644 source/dnode/mgmt/node_common/inc/dmDef.h delete mode 100644 source/dnode/mgmt/node_common/inc/dmLog.h delete mode 100644 source/dnode/mgmt/node_common/src/dmStr.c diff --git a/source/dnode/mgmt/node_common/inc/dmDef.h b/source/dnode/mgmt/node_common/inc/dmDef.h deleted file mode 100644 index f91c582fa9..0000000000 --- a/source/dnode/mgmt/node_common/inc/dmDef.h +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DM_DEF_H_ -#define _TD_DM_DEF_H_ - -// tobe deleted -#include "uv.h" - -#include "dmLog.h" - -#include "cJSON.h" -#include "tcache.h" -#include "tcrc32c.h" -#include "tdatablock.h" -#include "tglobal.h" -#include "thash.h" -#include "tlockfree.h" -#include "tlog.h" -#include "tmsg.h" -#include "tmsgcb.h" -#include "tprocess.h" -#include "tqueue.h" -#include "trpc.h" -#include "tthread.h" -#include "ttime.h" -#include "tworker.h" - -#include "dnode.h" -#include "mnode.h" -#include "monitor.h" -#include "sync.h" -#include "wal.h" - -#include "libs/function/function.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef enum { - DNODE = 0, - VNODE = 1, - QNODE = 2, - SNODE = 3, - MNODE = 4, - BNODE = 5, - NODE_END = 6, -} EDndNodeType; - -typedef enum { - DND_STAT_INIT, - DND_STAT_RUNNING, - DND_STAT_STOPPED, -} EDndRunStatus; - -typedef enum { - DND_ENV_INIT, - DND_ENV_READY, - DND_ENV_CLEANUP, -} EDndEnvStatus; - -typedef enum { - DND_PROC_SINGLE, - DND_PROC_CHILD, - DND_PROC_PARENT, - DND_PROC_TEST, -} EDndProcType; - -typedef int32_t (*NodeMsgFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -typedef int32_t (*NodeOpenFp)(struct SMgmtWrapper *pWrapper); -typedef void (*NodeCloseFp)(struct SMgmtWrapper *pWrapper); -typedef int32_t (*NodeStartFp)(struct SMgmtWrapper *pWrapper); -typedef void (*NodeStopFp)(struct SMgmtWrapper *pWrapper); -typedef int32_t (*NodeCreateFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -typedef int32_t (*NodeDropFp)(struct SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -typedef int32_t (*NodeRequireFp)(struct SMgmtWrapper *pWrapper, bool *required); - -typedef struct { - EDndNodeType defaultNtype; - bool needCheckVgId; -} SMsgHandle; - -typedef struct { - NodeOpenFp openFp; - NodeCloseFp closeFp; - NodeStartFp startFp; - NodeStopFp stopFp; - NodeCreateFp createFp; - NodeDropFp dropFp; - NodeRequireFp requiredFp; -} SMgmtFp; - -typedef struct SMgmtWrapper { - SDnode *pDnode; - struct { - const char *name; - char *path; - int32_t refCount; - SRWLatch latch; - EDndNodeType nodeType; - bool deployed; - bool required; - SMgmtFp fp; - void *pMgmt; - }; - struct { - EDndProcType procType; - int32_t procId; - SProcObj *procObj; - SShm procShm; - }; - struct { - bool needCheckVgIds[TDMT_MAX]; - NodeMsgFp msgFps[TDMT_MAX]; - }; -} SMgmtWrapper; - -typedef struct { - void *serverRpc; - void *clientRpc; - SMsgHandle msgHandles[TDMT_MAX]; -} SDnodeTrans; - -typedef struct { - int32_t dnodeId; - int64_t clusterId; - int64_t dnodeVer; - int64_t updateTime; - int64_t rebootTime; - int32_t unsyncedVgId; - ESyncState vndState; - ESyncState mndState; - bool isMnode; - bool dropped; - SEpSet mnodeEps; - SArray *dnodeEps; - SHashObj *dnodeHash; - TdThread *statusThreadId; - TdThread *monitorThreadId; - SRWLatch latch; - SSingleWorker mgmtWorker; - SMsgCb msgCb; - SDnode *pDnode; - TdFilePtr lockfile; - char *localEp; - char *localFqdn; - char *firstEp; - char *secondEp; - char *dataDir; - SDiskCfg *disks; - int32_t numOfDisks; - int32_t supportVnodes; - uint16_t serverPort; -} SDnodeData; - -typedef struct { - char name[TSDB_STEP_NAME_LEN]; - char desc[TSDB_STEP_DESC_LEN]; -} SStartupInfo; - -typedef struct SUdfdData { - bool startCalled; - bool needCleanUp; - uv_loop_t loop; - uv_thread_t thread; - uv_barrier_t barrier; - uv_process_t process; - int spawnErr; - uv_pipe_t ctrlPipe; - uv_async_t stopAsync; - int32_t stopCalled; - int32_t dnodeId; -} SUdfdData; - -typedef struct SDnode { - EDndProcType ptype; - EDndNodeType ntype; - EDndRunStatus status; - EDndEvent event; - SStartupInfo startup; - SDnodeTrans trans; - SDnodeData data; - SUdfdData udfdData; - TdThreadMutex mutex; - SMgmtWrapper wrappers[NODE_END]; -} SDnode; - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DM_DEF_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/node_common/inc/dmInt.h b/source/dnode/mgmt/node_common/inc/dmInt.h index 692249e311..c6a7903660 100644 --- a/source/dnode/mgmt/node_common/inc/dmInt.h +++ b/source/dnode/mgmt/node_common/inc/dmInt.h @@ -16,37 +16,132 @@ #ifndef _TD_DM_INT_H_ #define _TD_DM_INT_H_ -#include "dmDef.h" +#include "cJSON.h" +#include "tcache.h" +#include "tcrc32c.h" +#include "tdatablock.h" +#include "tglobal.h" +#include "thash.h" +#include "tlockfree.h" +#include "tlog.h" +#include "tmsg.h" +#include "tmsgcb.h" +#include "tprocess.h" +#include "tqueue.h" +#include "trpc.h" +#include "tthread.h" +#include "ttime.h" +#include "tworker.h" + +#include "dnode.h" +#include "mnode.h" +#include "monitor.h" +#include "sync.h" +#include "wal.h" + +#include "libs/function/function.h" #ifdef __cplusplus extern "C" { #endif -// dmInt.c -SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType); -int32_t dmMarkWrapper(SMgmtWrapper *pWrapper); -void dmReleaseWrapper(SMgmtWrapper *pWrapper); -const char *dmStatStr(EDndRunStatus stype); -const char *dmNodeLogName(EDndNodeType ntype); -const char *dmNodeProcName(EDndNodeType ntype); -const char *dmEventStr(EDndEvent etype); -const char *dmProcStr(EDndProcType ptype); +#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} +#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} +#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} +#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} +#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} +#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} -void dmSetStatus(SDnode *pDnode, EDndRunStatus stype); -void dmSetEvent(SDnode *pDnode, EDndEvent event); -void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgIds); -void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); -void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc); -void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pMsg); -void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg); -void dmGetMonitorSysInfo(SMonSysInfo *pInfo); +typedef enum { + DNODE = 0, + VNODE = 1, + QNODE = 2, + SNODE = 3, + MNODE = 4, + BNODE = 5, + NODE_END = 6, +} EDndNodeType; + +typedef enum { + DND_STAT_INIT, + DND_STAT_RUNNING, + DND_STAT_STOPPED, +} EDndRunStatus; + +typedef enum { + DND_ENV_INIT, + DND_ENV_READY, + DND_ENV_CLEANUP, +} EDndEnvStatus; + +typedef enum { + DND_PROC_SINGLE, + DND_PROC_CHILD, + DND_PROC_PARENT, + DND_PROC_TEST, +} EDndProcType; + +typedef struct { + const char *path; + const char *name; + SMsgCb msgCb; + int32_t dnodeId; + int64_t clusterId; + const char *dataDir; + const char *localEp; + const char *firstEp; + const char *localFqdn; + uint16_t serverPort; + int32_t supportVnodes; +} SMgmtInputOpt; + +typedef struct { + int32_t dnodeId; + void *pMgmt; +} SMgmtOutputOpt; + +typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg); +typedef int32_t (*NodeOpenFp)(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput); +typedef void (*NodeCloseFp)(void *pMgmt); +typedef int32_t (*NodeStartFp)(void *pMgmt); +typedef void (*NodeStopFp)(void *pMgmt); +typedef int32_t (*NodeCreateFp)(void *pMgmt, SNodeMsg *pMsg); +typedef int32_t (*NodeDropFp)(void *pMgmt, SNodeMsg *pMsg); +typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required); +typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle + +typedef struct { + NodeOpenFp openFp; + NodeCloseFp closeFp; + NodeStartFp startFp; + NodeStopFp stopFp; + NodeCreateFp createFp; + NodeDropFp dropFp; + NodeRequireFp requiredFp; + NodeGetHandlesFp getHandlesFp; +} SMgmtFunc; + +typedef struct { + tmsg_t msgType; + bool needCheckVgId; + NodeMsgFp msgFp; +} SMgmtHandle; + +// dmUtil.c +const char *dmStatStr(EDndRunStatus stype); +const char *dmNodeLogName(EDndNodeType ntype); +const char *dmNodeProcName(EDndNodeType ntype); +const char *dmEventStr(EDndEvent etype); +const char *dmProcStr(EDndProcType ptype); +void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgId); +void dmGetSystemInfo(SMonSysInfo *pInfo); // dmFile.c -int32_t dmReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); -int32_t dmWriteFile(SMgmtWrapper *pWrapper, bool deployed); +int32_t dmReadFile(const char *path, const char *name, bool *pDeployed); +int32_t dmWriteFile(const char *path, const char *name, bool deployed); TdFilePtr dmCheckRunning(const char *dataDir); -int32_t dmReadShmFile(SMgmtWrapper *pWrapper); -int32_t dmWriteShmFile(SMgmtWrapper *pWrapper); +int32_t dmReadShmFile(const char *path, const char *name, SShm *pShm); +int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/node_common/inc/dmLog.h b/source/dnode/mgmt/node_common/inc/dmLog.h deleted file mode 100644 index c21933fc01..0000000000 --- a/source/dnode/mgmt/node_common/inc/dmLog.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DM_LOG_H_ -#define _TD_DM_LOG_H_ - -#include "tlog.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} -#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} -#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} -#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} -#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} -#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DM_LOG_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/node_common/src/dmFile.c b/source/dnode/mgmt/node_common/src/dmFile.c index 3dc3cdcaf7..a49d4ae9e7 100644 --- a/source/dnode/mgmt/node_common/src/dmFile.c +++ b/source/dnode/mgmt/node_common/src/dmFile.c @@ -18,7 +18,7 @@ #define MAXLEN 1024 -int32_t dmReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { +int32_t dmReadFile(const char *path, const char *name, bool *pDeployed) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int64_t len = 0; char content[MAXLEN + 1] = {0}; @@ -26,7 +26,7 @@ int32_t dmReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { char file[PATH_MAX] = {0}; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); + snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name); pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { code = 0; @@ -63,7 +63,7 @@ _OVER: return code; } -int32_t dmWriteFile(SMgmtWrapper *pWrapper, bool deployed) { +int32_t dmWriteFile(const char *path, const char *name, bool deployed) { int32_t code = -1; int32_t len = 0; char content[MAXLEN + 1] = {0}; @@ -71,8 +71,8 @@ int32_t dmWriteFile(SMgmtWrapper *pWrapper, bool deployed) { char realfile[PATH_MAX] = {0}; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); - snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); + snprintf(file, sizeof(file), "%s%s%s.json", path, TD_DIRSEP, name); + snprintf(realfile, sizeof(realfile), "%s%s%s.json", path, TD_DIRSEP, name); pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { @@ -139,14 +139,14 @@ TdFilePtr dmCheckRunning(const char *dataDir) { return pFile; } -int32_t dmReadShmFile(SMgmtWrapper *pWrapper) { +int32_t dmReadShmFile(const char *path, const char *name, SShm *pShm) { int32_t code = -1; char content[MAXLEN + 1] = {0}; char file[PATH_MAX] = {0}; cJSON *root = NULL; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%sshmfile", pWrapper->path, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%sshmfile", path, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_READ); if (pFile == NULL) { code = 0; @@ -157,36 +157,36 @@ int32_t dmReadShmFile(SMgmtWrapper *pWrapper) { root = cJSON_Parse(content); if (root == NULL) { terrno = TSDB_CODE_INVALID_JSON_FORMAT; - dError("node:%s, failed to read %s since invalid json format", pWrapper->name, file); + dError("node:%s, failed to read %s since invalid json format", name, file); goto _OVER; } cJSON *shmid = cJSON_GetObjectItem(root, "shmid"); if (shmid && shmid->type == cJSON_Number) { - pWrapper->procShm.id = shmid->valueint; + pShm->id = shmid->valueint; } cJSON *shmsize = cJSON_GetObjectItem(root, "shmsize"); if (shmsize && shmsize->type == cJSON_Number) { - pWrapper->procShm.size = shmsize->valueint; + pShm->size = shmsize->valueint; } } if (!tsMultiProcess) { - if (pWrapper->procShm.id >= 0) { - dDebug("node:%s, shmid:%d, is closed, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size); - taosDropShm(&pWrapper->procShm); + if (pShm->id >= 0) { + dDebug("node:%s, shmid:%d, is closed, size:%d", name, pShm->id, pShm->size); + taosDropShm(pShm); } } else { - if (taosAttachShm(&pWrapper->procShm) != 0) { + if (taosAttachShm(pShm) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("shmid:%d, failed to attach shm since %s", pWrapper->procShm.id, terrstr()); + dError("shmid:%d, failed to attach shm since %s", pShm->id, terrstr()); goto _OVER; } - dInfo("node:%s, shmid:%d is attached, size:%d", pWrapper->name, pWrapper->procShm.id, pWrapper->procShm.size); + dInfo("node:%s, shmid:%d is attached, size:%d", name, pShm->id, pShm->size); } - dDebug("node:%s, successed to load %s", pWrapper->name, file); + dDebug("node:%s, successed to load %s", name, file); code = 0; _OVER: @@ -196,7 +196,7 @@ _OVER: return code; } -int32_t dmWriteShmFile(SMgmtWrapper *pWrapper) { +int32_t dmWriteShmFile(const char *path, const char *name, const SShm *pShm) { int32_t code = -1; int32_t len = 0; char content[MAXLEN + 1] = {0}; @@ -204,30 +204,30 @@ int32_t dmWriteShmFile(SMgmtWrapper *pWrapper) { char realfile[PATH_MAX] = {0}; TdFilePtr pFile = NULL; - snprintf(file, sizeof(file), "%s%sshmfile.bak", pWrapper->path, TD_DIRSEP); - snprintf(realfile, sizeof(realfile), "%s%sshmfile", pWrapper->path, TD_DIRSEP); + snprintf(file, sizeof(file), "%s%sshmfile.bak", path, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%sshmfile", path, TD_DIRSEP); pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to open file:%s since %s", pWrapper->name, file, terrstr()); + dError("node:%s, failed to open file:%s since %s", name, file, terrstr()); goto _OVER; } len += snprintf(content + len, MAXLEN - len, "{\n"); - len += snprintf(content + len, MAXLEN - len, " \"shmid\":%d,\n", pWrapper->procShm.id); - len += snprintf(content + len, MAXLEN - len, " \"shmsize\":%d\n", pWrapper->procShm.size); + len += snprintf(content + len, MAXLEN - len, " \"shmid\":%d,\n", pShm->id); + len += snprintf(content + len, MAXLEN - len, " \"shmsize\":%d\n", pShm->size); len += snprintf(content + len, MAXLEN - len, "}\n"); if (taosWriteFile(pFile, content, len) != len) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to write file:%s since %s", pWrapper->name, file, terrstr()); + dError("node:%s, failed to write file:%s since %s", name, file, terrstr()); goto _OVER; } if (taosFsyncFile(pFile) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to fsync file:%s since %s", pWrapper->name, file, terrstr()); + dError("node:%s, failed to fsync file:%s since %s", name, file, terrstr()); goto _OVER; } @@ -235,11 +235,11 @@ int32_t dmWriteShmFile(SMgmtWrapper *pWrapper) { if (taosRenameFile(file, realfile) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to rename %s to %s since %s", pWrapper->name, file, realfile, terrstr()); + dError("node:%s, failed to rename %s to %s since %s", name, file, realfile, terrstr()); return -1; } - dInfo("node:%s, successed to write %s", pWrapper->name, realfile); + dInfo("node:%s, successed to write %s", name, realfile); code = 0; _OVER: diff --git a/source/dnode/mgmt/node_common/src/dmStr.c b/source/dnode/mgmt/node_common/src/dmStr.c deleted file mode 100644 index df8ff6d766..0000000000 --- a/source/dnode/mgmt/node_common/src/dmStr.c +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dmInt.h" - -const char *dmStatStr(EDndRunStatus stype) { - switch (stype) { - case DND_STAT_INIT: - return "init"; - case DND_STAT_RUNNING: - return "running"; - case DND_STAT_STOPPED: - return "stopped"; - default: - return "UNKNOWN"; - } -} - -const char *dmNodeLogName(EDndNodeType ntype) { - switch (ntype) { - case VNODE: - return "vnode"; - case QNODE: - return "qnode"; - case SNODE: - return "snode"; - case MNODE: - return "mnode"; - case BNODE: - return "bnode"; - default: - return "taosd"; - } -} - -const char *dmNodeProcName(EDndNodeType ntype) { - switch (ntype) { - case VNODE: - return "taosv"; - case QNODE: - return "taosq"; - case SNODE: - return "taoss"; - case MNODE: - return "taosm"; - case BNODE: - return "taosb"; - default: - return "taosd"; - } -} - -const char *dmEventStr(EDndEvent ev) { - switch (ev) { - case DND_EVENT_START: - return "start"; - case DND_EVENT_STOP: - return "stop"; - case DND_EVENT_CHILD: - return "child"; - default: - return "UNKNOWN"; - } -} - -const char *dmProcStr(EDndProcType etype) { - switch (etype) { - case DND_PROC_SINGLE: - return "start"; - case DND_PROC_CHILD: - return "stop"; - case DND_PROC_PARENT: - return "child"; - case DND_PROC_TEST: - return "test"; - default: - return "UNKNOWN"; - } -} diff --git a/source/dnode/mgmt/node_common/src/dmUtil.c b/source/dnode/mgmt/node_common/src/dmUtil.c index c1d15f62de..34205e7ccb 100644 --- a/source/dnode/mgmt/node_common/src/dmUtil.c +++ b/source/dnode/mgmt/node_common/src/dmUtil.c @@ -16,143 +16,93 @@ #define _DEFAULT_SOURCE #include "dmInt.h" -void dmSetStatus(SDnode *pDnode, EDndRunStatus status) { - if (pDnode->status != status) { - dDebug("dnode status set from %s to %s", dmStatStr(pDnode->status), dmStatStr(status)); - pDnode->status = status; + +const char *dmStatStr(EDndRunStatus stype) { + switch (stype) { + case DND_STAT_INIT: + return "init"; + case DND_STAT_RUNNING: + return "running"; + case DND_STAT_STOPPED: + return "stopped"; + default: + return "UNKNOWN"; } } -void dmSetEvent(SDnode *pDnode, EDndEvent event) { - if (event == DND_EVENT_STOP) { - pDnode->event = event; +const char *dmNodeLogName(EDndNodeType ntype) { + switch (ntype) { + case VNODE: + return "vnode"; + case QNODE: + return "qnode"; + case SNODE: + return "snode"; + case MNODE: + return "mnode"; + case BNODE: + return "bnode"; + default: + return "taosd"; } } -void dmSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgId) { - pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; - pWrapper->needCheckVgIds[TMSG_INDEX(msgType)] = needCheckVgId; -} - -SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType ntype) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SMgmtWrapper *pRetWrapper = pWrapper; - - taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed) { - int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); - dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount); - } else { - terrno = TSDB_CODE_NODE_NOT_DEPLOYED; - pRetWrapper = NULL; - } - taosRUnLockLatch(&pWrapper->latch); - - return pRetWrapper; -} - -int32_t dmMarkWrapper(SMgmtWrapper *pWrapper) { - int32_t code = 0; - - taosRLockLatch(&pWrapper->latch); - if (pWrapper->deployed || (pWrapper->procType == DND_PROC_PARENT && pWrapper->required)) { - int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); - dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount); - } else { - terrno = TSDB_CODE_NODE_NOT_DEPLOYED; - code = -1; - } - taosRUnLockLatch(&pWrapper->latch); - - return code; -} - -void dmReleaseWrapper(SMgmtWrapper *pWrapper) { - if (pWrapper == NULL) return; - - taosRLockLatch(&pWrapper->latch); - int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); - taosRUnLockLatch(&pWrapper->latch); - dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount); -} - -void dmReportStartup(SDnode *pDnode, const char *pName, const char *pDesc) { - SStartupInfo *pStartup = &pDnode->startup; - tstrncpy(pStartup->name, pName, TSDB_STEP_NAME_LEN); - tstrncpy(pStartup->desc, pDesc, TSDB_STEP_DESC_LEN); - dInfo("step:%s, %s", pStartup->name, pStartup->desc); -} - -void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const char *pDesc) { - dmReportStartup(pWrapper->pDnode, pName, pDesc); -} - -static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) { - pStatus->details[0] = 0; - - if (pDnode->status == DND_STAT_INIT) { - pStatus->statusCode = TSDB_SRV_STATUS_NETWORK_OK; - snprintf(pStatus->details, sizeof(pStatus->details), "%s: %s", pDnode->startup.name, pDnode->startup.desc); - } else if (pDnode->status == DND_STAT_STOPPED) { - pStatus->statusCode = TSDB_SRV_STATUS_EXTING; - } else { - SDnodeData *pData = &pDnode->data; - if (pData->isMnode && pData->mndState != TAOS_SYNC_STATE_LEADER && pData->mndState == TAOS_SYNC_STATE_FOLLOWER) { - pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; - snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(pData->mndState)); - } else if (pData->unsyncedVgId != 0 && pData->vndState != TAOS_SYNC_STATE_LEADER && - pData->vndState != TAOS_SYNC_STATE_FOLLOWER) { - pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED; - snprintf(pStatus->details, sizeof(pStatus->details), "vnode:%d sync state is %s", pData->unsyncedVgId, - syncStr(pData->vndState)); - } else { - pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_OK; - } +const char *dmNodeProcName(EDndNodeType ntype) { + switch (ntype) { + case VNODE: + return "taosv"; + case QNODE: + return "taosq"; + case SNODE: + return "taoss"; + case MNODE: + return "taosm"; + case BNODE: + return "taosb"; + default: + return "taosd"; } } -void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) { - dDebug("net test req is received"); - SRpcMsg rsp = {.handle = pReq->handle, .refId = pReq->refId, .ahandle = pReq->ahandle, .code = 0}; - rsp.pCont = rpcMallocCont(pReq->contLen); - if (rsp.pCont == NULL) { - rsp.code = TSDB_CODE_OUT_OF_MEMORY; - } else { - rsp.contLen = pReq->contLen; +const char *dmEventStr(EDndEvent ev) { + switch (ev) { + case DND_EVENT_START: + return "start"; + case DND_EVENT_STOP: + return "stop"; + case DND_EVENT_CHILD: + return "child"; + default: + return "UNKNOWN"; } - rpcSendResponse(&rsp); - rpcFreeCont(pReq->pCont); } -void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) { - dDebug("server status req is received"); - - SServerStatusRsp statusRsp = {0}; - dmGetServerStatus(pDnode, &statusRsp); - - SRpcMsg rspMsg = {.handle = pReq->handle, .ahandle = pReq->ahandle, .refId = pReq->refId}; - int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp); - if (rspLen < 0) { - rspMsg.code = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; +const char *dmProcStr(EDndProcType etype) { + switch (etype) { + case DND_PROC_SINGLE: + return "start"; + case DND_PROC_CHILD: + return "stop"; + case DND_PROC_PARENT: + return "child"; + case DND_PROC_TEST: + return "test"; + default: + return "UNKNOWN"; } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - rspMsg.code = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; - } - - tSerializeSServerStatusRsp(pRsp, rspLen, &statusRsp); - rspMsg.pCont = pRsp; - rspMsg.contLen = rspLen; - -_OVER: - rpcSendResponse(&rspMsg); - rpcFreeCont(pReq->pCont); } -void dmGetMonitorSysInfo(SMonSysInfo *pInfo) { +void *dmSetMgmtHandle(SArray *pArray, tmsg_t msgType, NodeMsgFp nodeMsgFp, bool needCheckVgId) { + SMgmtHandle handle = { + .msgType = msgType, + .msgFp = nodeMsgFp, + .needCheckVgId = needCheckVgId, + }; + + return taosArrayPush(pArray, &handle); +} + +void dmGetSystemInfo(SMonSysInfo *pInfo) { taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system); taosGetCpuCores(&pInfo->cpu_cores); taosGetProcMemory(&pInfo->mem_engine);