diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 089cb5bb94..90aac6edcd 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -51,7 +51,7 @@ extern int32_t tsCompatibleModel; extern bool tsEnableSlaveQuery; extern bool tsPrintAuth; extern int64_t tsTickPerDay[3]; -extern bool tsMultiProcess; +extern int32_t tsMultiProcess; // monitor extern bool tsEnableMonitor; diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 97a49e6b07..86fd7104df 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -72,7 +72,7 @@ void dndClose(SDnode *pDnode); * * @param pDnode The dnode object to run. */ -void dndRun(SDnode *pDnode); +int32_t dndRun(SDnode *pDnode); /** * @brief Handle event in the dnode. diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index f9a5538a01..94e2bdf7e1 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -45,7 +45,7 @@ float tsRatioOfQueryCores = 1.0f; int32_t tsMaxBinaryDisplayWidth = 30; bool tsEnableSlaveQuery = 1; bool tsPrintAuth = 0; -bool tsMultiProcess = 0; +int32_t tsMultiProcess = 0; // monitor bool tsEnableMonitor = 1; @@ -340,7 +340,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; - if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; @@ -458,7 +458,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval; - tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval; + tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->i32; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/dnode/mgmt/bnode/src/bmInt.c b/source/dnode/mgmt/bnode/src/bmInt.c index 9147c4f8b8..300dc1ec8d 100644 --- a/source/dnode/mgmt/bnode/src/bmInt.c +++ b/source/dnode/mgmt/bnode/src/bmInt.c @@ -17,8 +17,14 @@ #include "bmInt.h" #include "bmHandle.h" +bool bmRequireNode(SMgmtWrapper *pWrapper) { return false; } + + SMgmtFp bmGetMgmtFp() { SMgmtFp mgmtFp = {0}; + mgmtFp.openFp = NULL; + mgmtFp.closeFp = NULL; + mgmtFp.requiredFp = bmRequireNode; mgmtFp.getMsgHandleFp = bmGetMsgHandle; return mgmtFp; } diff --git a/source/dnode/mgmt/dnode/inc/dndInt.h b/source/dnode/mgmt/dnode/inc/dndInt.h index 3473c9b339..dff88e75a0 100644 --- a/source/dnode/mgmt/dnode/inc/dndInt.h +++ b/source/dnode/mgmt/dnode/inc/dndInt.h @@ -70,9 +70,9 @@ typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMsg); -typedef SMgmtWrapper *(*MgmtOpenFp)(SDnode *pDnode, const char *path); -typedef void (*MgmtCloseFp)(SDnode *pDnode, SMgmtWrapper *pWrapper); -typedef bool (*MgmtRequiredFp)(SMgmtWrapper *pWrapper); +typedef SMgmtWrapper *(*OpenNodeFp)(SDnode *pDnode, const char *path); +typedef void (*CloseNodeFp)(SDnode *pDnode, SMgmtWrapper *pWrapper); +typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper); typedef int32_t (*MgmtHandleMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg); typedef SMsgHandle (*GetMsgHandleFp)(SMgmtWrapper *pWrapper, int32_t msgIndex); @@ -211,9 +211,9 @@ typedef struct { } STransMgmt; typedef struct SMgmtFp { - MgmtOpenFp openFp; - MgmtCloseFp closeFp; - MgmtRequiredFp requiredFp; + OpenNodeFp openFp; + CloseNodeFp closeFp; + RequireNodeFp requiredFp; GetMsgHandleFp getMsgHandleFp; } SMgmtFp; @@ -239,7 +239,7 @@ typedef struct SDnode { STransMgmt tmgmt; STfs *pTfs; SMgmtFp fps[NODE_MAX]; - SMgmtWrapper mgmts[NODE_MAX]; + SMgmtWrapper wrappers[NODE_MAX]; char *path; } SDnode; diff --git a/source/dnode/mgmt/dnode/inc/dndTransport.h b/source/dnode/mgmt/dnode/inc/dndTransport.h index f6d7d97758..aa58f6c3d4 100644 --- a/source/dnode/mgmt/dnode/inc/dndTransport.h +++ b/source/dnode/mgmt/dnode/inc/dndTransport.h @@ -24,7 +24,7 @@ extern "C" { int32_t dndInitTrans(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode); -void dndCleanupClient(SDnode *pDnode); +int32_t dndInitClient(SDnode *pDnode); int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/dnode/src/dndInt.c b/source/dnode/mgmt/dnode/src/dndInt.c index e61f49a820..b9276beb6e 100644 --- a/source/dnode/mgmt/dnode/src/dndInt.c +++ b/source/dnode/mgmt/dnode/src/dndInt.c @@ -16,9 +16,76 @@ #define _DEFAULT_SOURCE #include "dndInt.h" #include "dndHandle.h" +#include "dndTransport.h" +#include "vmInt.h" static int8_t once = DND_ENV_INIT; +int32_t dndInit() { + if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { + terrno = TSDB_CODE_REPEAT_INIT; + dError("failed to init dnode env since %s", terrstr()); + return -1; + } + + taosIgnSIGPIPE(); + taosBlockSIGPIPE(); + taosResolveCRC(); + + if (rpcInit() != 0) { + dError("failed to init rpc since %s", terrstr()); + dndCleanup(); + return -1; + } + + if (walInit() != 0) { + dError("failed to init wal since %s", terrstr()); + dndCleanup(); + return -1; + } + + SVnodeOpt vnodeOpt = {0}; + vnodeOpt.nthreads = tsNumOfCommitThreads; + vnodeOpt.putReqToVQueryQFp = dndPutReqToVQueryQ; + vnodeOpt.sendReqToDnodeFp = dndSendReqToDnode; + if (vnodeInit(&vnodeOpt) != 0) { + dError("failed to init vnode since %s", terrstr()); + dndCleanup(); + return -1; + } + + SMonCfg monCfg = {0}; + monCfg.maxLogs = tsMonitorMaxLogs; + monCfg.port = tsMonitorPort; + monCfg.server = tsMonitorFqdn; + monCfg.comp = tsMonitorComp; + if (monInit(&monCfg) != 0) { + dError("failed to init monitor since %s", terrstr()); + dndCleanup(); + return -1; + } + + dInfo("dnode env is initialized"); + return 0; +} + +void dndCleanup() { + if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { + dError("dnode env is already cleaned up"); + return; + } + + monCleanup(); + vnodeCleanup(); + walCleanUp(); + rpcCleanup(); + + taosStopCacheRefreshWorker(); + dInfo("dnode env is cleaned up"); +} + +SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) { return &pDnode->wrappers[nodeType]; } + EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } void dndSetStatus(SDnode *pDnode, EDndStatus status) { @@ -75,76 +142,7 @@ TdFilePtr dndCheckRunning(char *dataDir) { return pFile; } -int32_t dndInit() { - if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { - terrno = TSDB_CODE_REPEAT_INIT; - dError("failed to init dnode env since %s", terrstr()); - return -1; - } - - taosIgnSIGPIPE(); - taosBlockSIGPIPE(); - taosResolveCRC(); - - if (rpcInit() != 0) { - dError("failed to init rpc since %s", terrstr()); - dndCleanup(); - return -1; - } - - if (walInit() != 0) { - dError("failed to init wal since %s", terrstr()); - dndCleanup(); - return -1; - } - - // SVnodeOpt vnodeOpt = { - // .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = - // dndSendReqToDnode}; - - // if (vnodeInit(&vnodeOpt) != 0) { - // dError("failed to init vnode since %s", terrstr()); - // dndCleanup(); - // return -1; - // } - - SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn, .comp = tsMonitorComp}; - if (monInit(&monCfg) != 0) { - dError("failed to init monitor since %s", terrstr()); - dndCleanup(); - return -1; - } - - dInfo("dnode env is initialized"); - return 0; -} - -void dndCleanup() { - if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { - dError("dnode env is already cleaned up"); - return; - } - - walCleanUp(); - // vnodeCleanup(); - rpcCleanup(); - monCleanup(); - - taosStopCacheRefreshWorker(); - dInfo("dnode env is cleaned up"); -} - void dndeHandleEvent(SDnode *pDnode, EDndEvent event) { dInfo("dnode object receive event %d, data:%p", event, pDnode); pDnode->event = event; } - -SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) { - return &pDnode->mgmts[nodeType]; -} - -SMgmtFp dndGetMgmtFp() { - SMgmtFp mgmtFp = {0}; - mgmtFp.getMsgHandleFp = dndGetMsgHandle; - return mgmtFp; -} diff --git a/source/dnode/mgmt/dnode/src/dndMain.c b/source/dnode/mgmt/dnode/src/dndMain.c index 06be5b9d82..2fcd00f735 100644 --- a/source/dnode/mgmt/dnode/src/dndMain.c +++ b/source/dnode/mgmt/dnode/src/dndMain.c @@ -45,7 +45,7 @@ static bool dndRequireNode(SMgmtWrapper *pMgmt) { static void dndClearMemory(SDnode *pDnode) { for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pMgmt = &pDnode->mgmts[n]; + SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; tfree(pMgmt->path); } if (pDnode->pLockFile != NULL) { @@ -120,34 +120,35 @@ SDnode *dndCreate(SDndCfg *pCfg) { goto _OVER; } - pDnode->mgmts[DNODE].fp = dndGetMgmtFp(); - pDnode->mgmts[MNODE].fp = mmGetMgmtFp(); - pDnode->mgmts[VNODES].fp = vmGetMgmtFp(); - pDnode->mgmts[QNODE].fp = qmGetMgmtFp(); - pDnode->mgmts[SNODE].fp = smGetMgmtFp(); - pDnode->mgmts[BNODE].fp = bmGetMgmtFp(); - pDnode->mgmts[MNODE].name = "mnode"; - pDnode->mgmts[VNODES].name = "vnodes"; - pDnode->mgmts[QNODE].name = "qnode"; - pDnode->mgmts[SNODE].name = "snode"; - pDnode->mgmts[BNODE].name = "bnode"; + pDnode->wrappers[DNODE].fp = dndGetMgmtFp(); + pDnode->wrappers[MNODE].fp = mmGetMgmtFp(); + pDnode->wrappers[VNODES].fp = vmGetMgmtFp(); + pDnode->wrappers[QNODE].fp = qmGetMgmtFp(); + pDnode->wrappers[SNODE].fp = smGetMgmtFp(); + pDnode->wrappers[BNODE].fp = bmGetMgmtFp(); + pDnode->wrappers[DNODE].name = "dnode"; + pDnode->wrappers[MNODE].name = "mnode"; + pDnode->wrappers[VNODES].name = "vnodes"; + pDnode->wrappers[QNODE].name = "qnode"; + pDnode->wrappers[SNODE].name = "snode"; + pDnode->wrappers[BNODE].name = "bnode"; memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg)); for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pMgmt = &pDnode->mgmts[n]; - snprintf(path, sizeof(path), "%s%s%s", pCfg->dataDir, TD_DIRSEP, pDnode->mgmts[n].name); - pMgmt->path = strdup(path); - if (pDnode->mgmts[n].path == NULL) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + snprintf(path, sizeof(path), "%s%s%s", pCfg->dataDir, TD_DIRSEP, pDnode->wrappers[n].name); + pWrapper->path = strdup(path); + if (pDnode->wrappers[n].path == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } - pMgmt->procType = PROC_SINGLE; - pMgmt->required = dndRequireNode(pMgmt); - if (pMgmt->required) { - if (taosMkDir(pMgmt->path) != 0) { + pWrapper->procType = PROC_SINGLE; + pWrapper->required = dndRequireNode(pWrapper); + if (pWrapper->required) { + if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to create dir:%s since %s", pMgmt->path, terrstr()); + dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); goto _OVER; } } @@ -158,6 +159,8 @@ SDnode *dndCreate(SDndCfg *pCfg) { goto _OVER; } + code = 0; + _OVER: if (code != 0 && pDnode) { dndClearMemory(pDnode); @@ -187,12 +190,118 @@ void dndClose(SDnode *pDnode) { dInfo("dnode object is closed, data:%p", pDnode); } -void dndRun(SDnode *pDnode) { - while (pDnode->event != DND_EVENT_STOP) { - taosMsleep(100); - } +static int32_t dndOpenNode(SDnode *pDnode, SMgmtWrapper *pWrapper) { + // if (tsMultiProcess) { + // SProcCfg cfg = {0}; + // pWrapper->pProc = taosProcInit(&cfg); + // if (taosProcIsChild(pWrapper->pProc)) { + // pWrapper->procType = PROC_CHILD; + // dInfo("node:%s, will start in child process", pWrapper->name); + // } else { + // pWrapper->procType = PROC_PARENT; + // dInfo("node:%s, will start in parent process", pWrapper->name); + // } + // } else { + // pWrapper->procType = PROC_SINGLE; + // dInfo("node:%s, will start in single process mnode", pWrapper->name); + // } + + // if (pWrapper->procType == PROC_SINGLE || pWrapper->procType == PROC_CHILD) { + // SDndInfo info; + // pWrapper->pNode = (*pWrapper->fp.openFp)(pWrapper->path, &info); + // if (pWrapper != NULL) { + // return -1; + // } + // } + + // return 0; + + pWrapper->pMgmt = (*pWrapper->fp.openFp)(pDnode, pWrapper->path); + return 0; } -void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) { - -} \ No newline at end of file +static void dndClearNodeExecpt(SDnode *pDnode, SMgmtWrapper *pWrapper){} + +static int32_t dndRunInSingleProcess(SDnode *pDnode) { + dInfo("dnode run in single process mode"); + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + dInfo("node:%s, will start in single process", pWrapper->name); + if (dndOpenNode(pDnode, pWrapper) != 0) { + dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); + return -1; + } + } + return 0; +} + +static int32_t dndRunInMultiProcess(SDnode *pDnode) { + for (ENodeType n = 0; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (n == DNODE) { + dInfo("node:%s, will start in parent process", pWrapper->name); + pWrapper->procType = PROC_PARENT; + if (dndOpenNode(pDnode, pWrapper) != 0) { + dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); + return -1; + } + continue; + } + + SProcCfg cfg = {0}; + SProcObj *pProc = taosProcInit(&cfg); + if (pProc == NULL) { + dError("node:%s, failed to fork since %s", pWrapper->name, terrstr()); + return -1; + } + + pWrapper->pProc = pProc; + + if (taosProcIsChild(pProc)) { + dInfo("node:%s, will start in child process", pWrapper->name); + pWrapper->procType = PROC_CHILD; + dndResetLog(pWrapper); + + dInfo("node:%s, clean up resources inherited from parent", pWrapper->name); + dndClearNodeExecpt(pDnode, pWrapper); + + dInfo("node:%s, init trans client in child process", pWrapper->name); + dndInitClient(pDnode); + + dInfo("node:%s, will be initialized in child process", pWrapper->name); + dndOpenNode(pDnode, pWrapper); + } else { + dInfo("node:%s, will not start in parent process", pWrapper->name); + pWrapper->procType = PROC_PARENT; + dndOpenNode(pDnode, pWrapper); + } + } + + return 0; +} + +int32_t dndRun(SDnode *pDnode) { + if (tsMultiProcess == 0) { + if (dndRunInSingleProcess(pDnode) != 0) { + dError("failed to run dnode in single process mode since %s", terrstr()); + return -1; + } + } else { + if (dndRunInMultiProcess(pDnode) != 0) { + dError("failed to run dnode in multi process mode since %s", terrstr()); + return -1; + } + } + + while (1) { + if (pDnode->event != DND_EVENT_STOP) { + dInfo("dnode object receive stop event"); + break; + } + taosMsleep(100); + } + + return 0; +} + +void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) {} \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/src/dndMgmt.c b/source/dnode/mgmt/dnode/src/dndMgmt.c index 8f71e866cc..0ceb316307 100644 --- a/source/dnode/mgmt/dnode/src/dndMgmt.c +++ b/source/dnode/mgmt/dnode/src/dndMgmt.c @@ -15,6 +15,8 @@ #define _DEFAULT_SOURCE #include "dndMgmt.h" + +#include "dndHandle.h" #include "dndMonitor.h" // #include "dndBnode.h" // #include "mm.h" @@ -442,4 +444,15 @@ void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {} void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq){} -void dndProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} \ No newline at end of file +void dndProcessMgmtMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} + +bool dndRequireNode(SMgmtWrapper *pWrapper) { return true; } + +SMgmtFp dndGetMgmtFp() { + SMgmtFp mgmtFp = {0}; + mgmtFp.openFp = NULL; + mgmtFp.closeFp = NULL; + mgmtFp.requiredFp = dndRequireNode; + mgmtFp.getMsgHandleFp = dndGetMsgHandle; + return mgmtFp; +} diff --git a/source/dnode/mgmt/dnode/src/dndTransport.c b/source/dnode/mgmt/dnode/src/dndTransport.c index 00971bd934..94aad909f2 100644 --- a/source/dnode/mgmt/dnode/src/dndTransport.c +++ b/source/dnode/mgmt/dnode/src/dndTransport.c @@ -13,12 +13,6 @@ * along with this program. If not, see . */ -/* this file is mainly responsible for the communication between DNODEs. Each - * dnode works as both server and client. Dnode may send status, grant, config - * messages to mnode, mnode may send create/alter/drop table/vnode messages - * to dnode. All theses messages are handled from here - */ - #define _DEFAULT_SOURCE #include "dndTransport.h" #include "dndMgmt.h" @@ -52,7 +46,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { } } -static int32_t dndInitClient(SDnode *pDnode) { + int32_t dndInitClient(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->tmgmt; SRpcInit rpcInit; @@ -256,7 +250,7 @@ static int32_t dndSetMsgHandle(SDnode *pDnode) { STransMgmt *pMgmt = &pDnode->tmgmt; for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) { - SMgmtWrapper *pWrapper = &pDnode->mgmts[nodeType]; + SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; GetMsgHandleFp getMsgHandleFp = pDnode->fps[nodeType].getMsgHandleFp; if (getMsgHandleFp == NULL) continue; diff --git a/source/dnode/mgmt/exec/src/dndExec.c b/source/dnode/mgmt/exec/src/dndExec.c index 0621bfa48f..c60f07f409 100644 --- a/source/dnode/mgmt/exec/src/dndExec.c +++ b/source/dnode/mgmt/exec/src/dndExec.c @@ -82,14 +82,14 @@ static int32_t dndRunDnode() { } dInfo("start the TDengine service"); - dndRun(pDnode); + int32_t code = dndRun(pDnode); dInfo("start shutting down the TDengine service"); dndClose(pDnode); dndCleanup(); taosCloseLog(); taosCleanupCfg(); - return 0; + return code; } int main(int argc, char const *argv[]) { diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index 58957211a0..288a159e07 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -17,13 +17,18 @@ #include "mmInt.h" #include "mmHandle.h" +bool mmRequireNode(SMgmtWrapper *pWrapper) { return false; } + + SMgmtFp mmGetMgmtFp() { SMgmtFp mgmtFp = {0}; + mgmtFp.openFp = NULL; + mgmtFp.closeFp = NULL; + mgmtFp.requiredFp = mmRequireNode; mgmtFp.getMsgHandleFp = mmGetMsgHandle; return mgmtFp; } - int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) { return 0; } diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index ba172a9a72..3b2d3cadbd 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -17,8 +17,13 @@ #include "qmInt.h" #include "qmHandle.h" +bool qmRequireNode(SMgmtWrapper *pWrapper) { return false; } + SMgmtFp qmGetMgmtFp() { SMgmtFp mgmtFp = {0}; + mgmtFp.openFp = NULL; + mgmtFp.closeFp = NULL; + mgmtFp.requiredFp = qmRequireNode; mgmtFp.getMsgHandleFp = qmGetMsgHandle; return mgmtFp; } diff --git a/source/dnode/mgmt/snode/src/smInt.c b/source/dnode/mgmt/snode/src/smInt.c index 840315364b..3a7498edbd 100644 --- a/source/dnode/mgmt/snode/src/smInt.c +++ b/source/dnode/mgmt/snode/src/smInt.c @@ -17,8 +17,14 @@ #include "smInt.h" #include "smHandle.h" +bool smRequireNode(SMgmtWrapper *pWrapper) { return false; } + + SMgmtFp smGetMgmtFp() { SMgmtFp mgmtFp = {0}; + mgmtFp.openFp = NULL; + mgmtFp.closeFp = NULL; + mgmtFp.requiredFp = smRequireNode; mgmtFp.getMsgHandleFp = smGetMsgHandle; return mgmtFp; } diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 44bc6df368..ec4ccf4ed3 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -17,8 +17,14 @@ #include "vmInt.h" #include "vmHandle.h" +bool vmRequireNode(SMgmtWrapper *pWrapper) { return false; } + + SMgmtFp vmGetMgmtFp() { SMgmtFp mgmtFp = {0}; + mgmtFp.openFp = NULL; + mgmtFp.closeFp = NULL; + mgmtFp.requiredFp = vmRequireNode; mgmtFp.getMsgHandleFp = vmGetMsgHandle; return mgmtFp; } diff --git a/source/dnode/mgmt/vnode/src/vmMgmt.c b/source/dnode/mgmt/vnode/src/vmMgmt.c index 5240f41ba5..03bd84779c 100644 --- a/source/dnode/mgmt/vnode/src/vmMgmt.c +++ b/source/dnode/mgmt/vnode/src/vmMgmt.c @@ -14,9 +14,9 @@ */ #define _DEFAULT_SOURCE -// #include "dndVnodes.h" -// #include "dndMgmt.h" -// #include "dndTransport.h" +#include "vmMgmt.h" +#include "dndMgmt.h" +#include "dndTransport.h" // #include "sync.h" #if 0 @@ -1024,4 +1024,35 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; } -#endif \ No newline at end of file +#endif + +// int32_t dndInitVnodes(SDnode *pDnode) { + +// SVnodeOpt vnodeOpt = {0}; + +// vnodeOpt.nthreads = tsNumOfCommitThreads; +// vnodeOpt.putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = +// dndSendReqToDnode}; + +// if (vnodeInit(&vnodeOpt) != 0) { +// dError("failed to init vnode since %s", terrstr()); +// dndCleanup(); +// return -1; +// // } + + +// if (walInit() != 0) { +// dError("failed to init wal since %s", terrstr()); +// dndCleanup(); +// return -1; +// } +// } + + +// void dndCleanupVnodes(SDnode *pDnode) { +// // vnodeCleanup(); + + // walCleanUp(); +// } + +int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq){return 0;} \ No newline at end of file