enh: dmodule logic

This commit is contained in:
Ping Xiao 2023-09-26 11:47:53 +08:00
parent 3dbee70b81
commit 4f23ea2a8c
12 changed files with 69 additions and 61 deletions

View File

@ -195,6 +195,20 @@ if (TD_LINUX)
ELSE()
set(TD_DEPS_DIR "x86")
ENDIF()
elseif (TD_DARWIN)
IF (TD_ARM_64)
set(TD_DEPS_DIR "darwin/arm64")
ELSE ()
set(TD_DEPS_DIR "darwin/x64")
ENDIF ()
elseif (TD_WINDOWS)
IF (TD_WINDOWS_64)
set(TD_DEPS_DIR "win/x86")
ELSEIF (TD_WINDOWS_32)
set(TD_DEPS_DIR "win/i386")
ENDIF ()
else ()
MESSAGE(FATAL_ERROR "unsupported platform")
endif()
MESSAGE(STATUS "DEPS_DIR: " ${TD_DEPS_DIR})

BIN
deps/darwin/arm64/dm_static/libdmodule.a vendored Normal file

Binary file not shown.

View File

@ -3,11 +3,22 @@ add_library(mgmt_dnode STATIC ${MGMT_DNODE})
if (DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions(-DGRANTS_CFG)
endif()
IF (NOT BUILD_DM_MODULE)
MESSAGE(STATUS "NOT BUILD_DM_MODULE")
target_link_directories(
mgmt_dnode
PUBLIC "${TD_SOURCE_DIR}/deps/${TD_DEPS_DIR}/dm_static"
)
ELSE()
MESSAGE(STATUS "BUILD_DM_MODULE")
ENDIF()
target_include_directories(
mgmt_dnode
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
PUBLIC "${GRANT_CFG_INCLUDE_DIR}"
)
target_link_libraries(
mgmt_dnode node_util
mgmt_dnode node_util dmodule
)

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dmInt.h"
#include "libs/function/tudf.h"
static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
if (dmStartStatusThread(pMgmt) != 0) {

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "qmInt.h"
#include "libs/function/tudf.h"
static int32_t qmRequire(const SMgmtInputOpt *pInput, bool *required) {
return dmReadFile(pInput->path, pInput->name, required);

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
#include "libs/function/function.h"
#include "libs/function/tudf.h"
static int32_t smRequire(const SMgmtInputOpt *pInput, bool *required) {
return dmReadFile(pInput->path, pInput->name, required);

View File

@ -17,6 +17,7 @@
#include "vmInt.h"
#include "tfs.h"
#include "vnd.h"
#include "libs/function/tudf.h"
int32_t vmAllocPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
STfs *pTfs = pMgmt->pTfs;

View File

@ -95,6 +95,10 @@ void dmCleanupDnode(SDnode *pDnode);
SMgmtWrapper *dmAcquireWrapper(SDnode *pDnode, EDndNodeType nType);
int32_t dmMarkWrapper(SMgmtWrapper *pWrapper);
void dmReleaseWrapper(SMgmtWrapper *pWrapper);
int32_t dmInitVars(SDnode *pDnode);
void dmClearVars(SDnode *pDnode);
int32_t dmInitModule(SDnode *pDnode);
bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper);
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);

View File

@ -16,24 +16,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "audit.h"
#define STR_CASE_CMP(s, d) (0 == strcasecmp((s), (d)))
#define STR_STR_CMP(s, d) (strstr((s), (d)))
#define STR_INT_CMP(s, d, c) (taosStr2Int32(s, 0, 10) c(d))
#define STR_STR_SIGN ("ia")
#define DM_INIT_MON() \
do { \
code = (int32_t)(2147483648 | 298); \
strncpy(stName, tsVersionName, 64); \
monCfg.maxLogs = tsMonitorMaxLogs; \
monCfg.port = tsMonitorPort; \
monCfg.server = tsMonitorFqdn; \
monCfg.comp = tsMonitorComp; \
if (monInit(&monCfg) != 0) { \
if (terrno != 0) code = terrno; \
goto _exit; \
} \
} while (0)
#include "libs/function/tudf.h"
#define DM_INIT_AUDIT() \
do { \
@ -45,15 +28,7 @@
} \
} while (0)
#define DM_ERR_RTN(c) \
do { \
code = (c); \
goto _exit; \
} while (0)
static SDnode globalDnode = {0};
static const char *dmOS[10] = {"Ubuntu", "CentOS Linux", "Red Hat", "Debian GNU", "CoreOS",
"FreeBSD", "openSUSE", "SLES", "Fedora", "macOS"};
SDnode *dmInstance() { return &globalDnode; }
@ -76,30 +51,14 @@ static int32_t dmInitSystem() {
static int32_t dmInitMonitor() {
int32_t code = 0;
SMonCfg monCfg = {0};
char reName[64] = {0};
char stName[64] = {0};
char ver[64] = {0};
DM_INIT_MON();
if (STR_STR_CMP(stName, STR_STR_SIGN)) {
DM_ERR_RTN(0);
}
if (taosGetOsReleaseName(reName, stName, ver, 64) != 0) {
DM_ERR_RTN(code);
}
if (STR_CASE_CMP(stName, dmOS[0])) {
if (STR_INT_CMP(ver, 17, >)) {
DM_ERR_RTN(0);
}
} else if (STR_CASE_CMP(stName, dmOS[1])) {
if (STR_INT_CMP(ver, 6, >)) {
DM_ERR_RTN(0);
}
} else if (STR_STR_CMP(stName, dmOS[2]) || STR_STR_CMP(stName, dmOS[3]) || STR_STR_CMP(stName, dmOS[4]) ||
STR_STR_CMP(stName, dmOS[5]) || STR_STR_CMP(stName, dmOS[6]) || STR_STR_CMP(stName, dmOS[7]) ||
STR_STR_CMP(stName, dmOS[8]) || STR_STR_CMP(stName, dmOS[9])) {
DM_ERR_RTN(0);
monCfg.maxLogs = tsMonitorMaxLogs;
monCfg.port = tsMonitorPort;
monCfg.server = tsMonitorFqdn;
monCfg.comp = tsMonitorComp;
if (monInit(&monCfg) != 0) {
if (terrno != 0) code = terrno;
goto _exit;
}
_exit:

View File

@ -24,6 +24,7 @@
#include "tcompression.h"
#endif
#if 0
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
@ -105,6 +106,7 @@ static void dmClearVars(SDnode *pDnode) {
taosThreadMutexDestroy(&pDnode->mutex);
memset(&pDnode->mutex, 0, sizeof(pDnode->mutex));
}
#endif
int32_t dmInitDnode(SDnode *pDnode) {
dDebug("start to create dnode");
@ -143,22 +145,26 @@ int32_t dmInitDnode(SDnode *pDnode) {
pWrapper->required = dmRequireNode(pDnode, pWrapper);
}
if (dmInitMsgHandle(pDnode) != 0) {
dError("failed to init msg handles since %s", terrstr());
goto _OVER;
}
// if (dmInitMsgHandle(pDnode) != 0) {
// dError("failed to init msg handles since %s", terrstr());
// goto _OVER;
// }
pDnode->lockfile = dmCheckRunning(tsDataDir);
if (pDnode->lockfile == NULL) {
goto _OVER;
}
if (dmInitServer(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
goto _OVER;
}
// if (dmInitServer(pDnode) != 0) {
// dError("failed to init transport since %s", terrstr());
// goto _OVER;
// }
if (dmInitClient(pDnode) != 0) {
// if (dmInitClient(pDnode) != 0) {
// goto _OVER;
// }
if(dmInitModule(pDnode) != 0) {
goto _OVER;
}

View File

@ -40,7 +40,6 @@
#include "tfs.h"
#include "wal.h"
#include "libs/function/tudf.h"
#ifdef __cplusplus
extern "C" {
#endif
@ -94,6 +93,7 @@ typedef int32_t (*ProcessAlterNodeTypeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef struct {
int32_t dnodeId;
int32_t engineVer;
int64_t clusterId;
int64_t dnodeVer;
int64_t updateTime;
@ -172,6 +172,9 @@ int32_t dmReadFile(const char *path, const char *name, bool *pDeployed);
int32_t dmWriteFile(const char *path, const char *name, bool deployed);
TdFilePtr dmCheckRunning(const char *dataDir);
// dmodule.c
int32_t dmInitDndInfo(SDnodeData *pData);
// dmEps.c
int32_t dmReadEps(SDnodeData *pData);
int32_t dmWriteEps(SDnodeData *pData);

View File

@ -57,6 +57,8 @@ static int32_t dmDecodeEps(SJson *pJson, SDnodeData *pData) {
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "dnodeVer", pData->dnodeVer, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "engineVer", pData->dnodeVer, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "clusterId", pData->clusterId, code);
if (code < 0) return -1;
tjsonGetInt32ValueFromDouble(pJson, "dropped", pData->dropped, code);
@ -96,7 +98,8 @@ int32_t dmReadEps(SDnodeData *pData) {
pData->dnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
if (pData->dnodeEps == NULL) {
dError("failed to calloc dnodeEp array since %s", strerror(errno));
code = terrno;
dError("failed to calloc dnodeEp array since %s", terrstr());
goto _OVER;
}
@ -184,6 +187,7 @@ _OVER:
static int32_t dmEncodeEps(SJson *pJson, SDnodeData *pData) {
if (tjsonAddDoubleToObject(pJson, "dnodeId", pData->dnodeId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "dnodeVer", pData->dnodeVer) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "engineVer", pData->engineVer) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "clusterId", pData->clusterId) < 0) return -1;
if (tjsonAddDoubleToObject(pJson, "dropped", pData->dropped) < 0) return -1;
@ -218,8 +222,11 @@ int32_t dmWriteEps(SDnodeData *pData) {
snprintf(realfile, sizeof(realfile), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP);
terrno = TSDB_CODE_OUT_OF_MEMORY;
if((code == dmInitDndInfo(pData)) != 0) goto _OVER;
pJson = tjsonCreateObject();
if (pJson == NULL) goto _OVER;
pData->engineVer = tsVersion;
if (dmEncodeEps(pJson, pData) != 0) goto _OVER;
buffer = tjsonToString(pJson);
if (buffer == NULL) goto _OVER;