refact(cluster): node mgmt
This commit is contained in:
parent
3663c6441d
commit
e770a6e974
|
@ -189,7 +189,7 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
|
||||||
tsDiskCfgNum = 1;
|
tsDiskCfgNum = 1;
|
||||||
taosAddDataDir(0, pItem->str, 0, 1);
|
taosAddDataDir(0, pItem->str, 0, 1);
|
||||||
tstrncpy(tsDataDir, pItem->str, PATH_MAX);
|
tstrncpy(tsDataDir, pItem->str, PATH_MAX);
|
||||||
if (taosMkDir(tsDataDir) != 0) {
|
if (taosMulMkDir(tsDataDir) != 0) {
|
||||||
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
|
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -200,12 +200,12 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
|
||||||
memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg));
|
memcpy(&tsDiskCfg[index], pCfg, sizeof(SDiskCfg));
|
||||||
if (pCfg->level == 0 && pCfg->primary == 1) {
|
if (pCfg->level == 0 && pCfg->primary == 1) {
|
||||||
tstrncpy(tsDataDir, pCfg->dir, PATH_MAX);
|
tstrncpy(tsDataDir, pCfg->dir, PATH_MAX);
|
||||||
if (taosMkDir(tsDataDir) != 0) {
|
if (taosMulMkDir(tsDataDir) != 0) {
|
||||||
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
|
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (taosMkDir(pCfg->dir) != 0) {
|
if (taosMulMkDir(pCfg->dir) != 0) {
|
||||||
uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
|
uError("failed to create tfsDir:%s since %s", tsDataDir, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -486,7 +486,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
|
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
|
||||||
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
|
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
|
||||||
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
|
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
|
||||||
if (taosMkDir(tsTempDir) != 0) {
|
if (taosMulMkDir(tsTempDir) != 0) {
|
||||||
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
|
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
add_subdirectory(exe)
|
|
||||||
add_subdirectory(interface)
|
add_subdirectory(interface)
|
||||||
add_subdirectory(implement)
|
add_subdirectory(implement)
|
||||||
add_subdirectory(mgmt_bnode)
|
add_subdirectory(mgmt_bnode)
|
||||||
|
@ -7,3 +6,11 @@ add_subdirectory(mgmt_qnode)
|
||||||
add_subdirectory(mgmt_snode)
|
add_subdirectory(mgmt_snode)
|
||||||
add_subdirectory(mgmt_vnode)
|
add_subdirectory(mgmt_vnode)
|
||||||
add_subdirectory(test)
|
add_subdirectory(test)
|
||||||
|
|
||||||
|
aux_source_directory(exe EXEC_SRC)
|
||||||
|
add_executable(taosd ${EXEC_SRC})
|
||||||
|
target_include_directories(
|
||||||
|
taosd
|
||||||
|
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/implement/inc"
|
||||||
|
)
|
||||||
|
target_link_libraries(taosd dnode)
|
||||||
|
|
|
@ -1,7 +0,0 @@
|
||||||
aux_source_directory(. EXEC_SRC)
|
|
||||||
add_executable(taosd ${EXEC_SRC})
|
|
||||||
target_include_directories(
|
|
||||||
taosd
|
|
||||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../implement/inc"
|
|
||||||
)
|
|
||||||
target_link_libraries(taosd dnode)
|
|
|
@ -57,7 +57,7 @@ int32_t dmReadEps(SDnode *pDnode) {
|
||||||
snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->data.path, TD_DIRSEP);
|
snprintf(file, sizeof(file), "%s%sdnode.json", pDnode->data.path, TD_DIRSEP);
|
||||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
dDebug("file %s not exist", file);
|
// dDebug("file %s not exist", file);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto PRASE_DNODE_OVER;
|
goto PRASE_DNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,8 +21,6 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) {
|
||||||
int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
|
int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
|
||||||
if (!required) {
|
if (!required) {
|
||||||
dDebug("node:%s, does not require startup", pWrapper->name);
|
dDebug("node:%s, does not require startup", pWrapper->name);
|
||||||
} else {
|
|
||||||
dDebug("node:%s, needs to be started", pWrapper->name);
|
|
||||||
}
|
}
|
||||||
return required;
|
return required;
|
||||||
}
|
}
|
||||||
|
|
|
@ -199,7 +199,7 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
|
static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
|
||||||
dInfo("dnode-data start to init");
|
dInfo("dnode-mgmt start to init");
|
||||||
SDnode *pDnode = pWrapper->pDnode;
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
|
|
||||||
pDnode->data.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
pDnode->data.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
|
||||||
|
@ -228,12 +228,12 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("dnode-data is initialized");
|
dInfo("dnode-mgmt is initialized");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
|
static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
|
||||||
dInfo("dnode-data start to clean up");
|
dInfo("dnode-mgmt start to clean up");
|
||||||
SDnode *pDnode = pWrapper->pDnode;
|
SDnode *pDnode = pWrapper->pDnode;
|
||||||
dmStopWorker(pDnode);
|
dmStopWorker(pDnode);
|
||||||
|
|
||||||
|
@ -249,7 +249,7 @@ static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
|
||||||
taosWUnLockLatch(&pDnode->data.latch);
|
taosWUnLockLatch(&pDnode->data.latch);
|
||||||
|
|
||||||
dmCleanupTrans(pDnode);
|
dmCleanupTrans(pDnode);
|
||||||
dInfo("dnode-data is cleaned up");
|
dInfo("dnode-mgmt is cleaned up");
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dmRequireMgmt(SMgmtWrapper *pWrapper, bool *required) {
|
static int32_t dmRequireMgmt(SMgmtWrapper *pWrapper, bool *required) {
|
||||||
|
|
|
@ -29,7 +29,7 @@ int32_t dmReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
|
||||||
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
|
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
|
||||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
dDebug("file %s not exist", file);
|
// dDebug("file %s not exist", file);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
@ -150,7 +150,7 @@ int32_t dmReadShmFile(SMgmtWrapper *pWrapper) {
|
||||||
snprintf(file, sizeof(file), "%s%sshmfile", pWrapper->path, TD_DIRSEP);
|
snprintf(file, sizeof(file), "%s%sshmfile", pWrapper->path, TD_DIRSEP);
|
||||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
dDebug("node:%s, file %s not exist", pWrapper->name, file);
|
// dDebug("node:%s, file %s not exist", pWrapper->name, file);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,8 @@
|
||||||
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); }
|
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dmReadFile(pWrapper, required); }
|
||||||
|
|
||||||
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
|
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
|
||||||
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;;
|
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
||||||
|
msgCb.pWrapper = pMgmt->pWrapper;
|
||||||
pOption->msgCb = msgCb;
|
pOption->msgCb = msgCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
|
||||||
snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
|
snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
|
||||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
dDebug("file %s not exist", file);
|
// dDebug("file %s not exist", file);
|
||||||
code = 0;
|
code = 0;
|
||||||
goto PRASE_MNODE_OVER;
|
goto PRASE_MNODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,6 +40,7 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
|
||||||
|
|
||||||
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
|
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
|
||||||
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
||||||
|
msgCb.pWrapper = pMgmt->pWrapper;
|
||||||
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
|
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue;
|
||||||
msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
|
msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue;
|
||||||
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
|
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
|
||||||
|
|
|
@ -20,6 +20,7 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dmRead
|
||||||
|
|
||||||
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
|
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
|
||||||
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
||||||
|
msgCb.pWrapper = pMgmt->pWrapper;
|
||||||
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
|
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
|
||||||
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
|
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
|
||||||
msgCb.qsizeFp = qmGetQueueSize;
|
msgCb.qsizeFp = qmGetQueueSize;
|
||||||
|
|
|
@ -20,6 +20,7 @@ static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dmRead
|
||||||
|
|
||||||
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
|
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
|
||||||
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
SMsgCb msgCb = pMgmt->pDnode->data.msgCb;
|
||||||
|
msgCb.pWrapper = pMgmt->pWrapper;
|
||||||
pOption->msgCb = msgCb;
|
pOption->msgCb = msgCb;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue