diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c index 2d2a89c181..1ab8b5ef34 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmWorker.c @@ -134,18 +134,16 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { return -1; } - if (tsMultiProcess) { - SSingleWorkerCfg mCfg = { - .min = 1, - .max = 1, - .name = "bnode-monitor", - .fp = (FItem)bmProcessMonitorQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { - dError("failed to start bnode-monitor worker since %s", terrstr()); - return -1; - } + SSingleWorkerCfg mCfg = { + .min = 1, + .max = 1, + .name = "bnode-monitor", + .fp = (FItem)bmProcessMonitorQueue, + .param = pMgmt, + }; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start bnode-monitor worker since %s", terrstr()); + return -1; } dDebug("bnode workers are initialized"); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index ccb0c5fe8d..7f4b12bcf2 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -52,6 +52,9 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) pMgmt->data.supportVnodes = pInput->supportVnodes; pMgmt->data.serverPort = pInput->serverPort; pMgmt->pDnode = pInput->pDnode; + pMgmt->msgCb = pInput->msgCb; + pMgmt->path = pInput->path; + pMgmt->name = pInput->name; pMgmt->processCreateNodeFp = pInput->processCreateNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->isNodeDeployedFp = pInput->isNodeDeployedFp; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 52756aa7ce..57b04b68f3 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -176,18 +176,16 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return -1; } - if (tsMultiProcess) { - SSingleWorkerCfg mCfg = { - .min = 1, - .max = 1, - .name = "mnode-monitor", - .fp = (FItem)mmProcessQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { - dError("failed to start mnode mnode-monitor worker since %s", terrstr()); - return -1; - } + SSingleWorkerCfg mCfg = { + .min = 1, + .max = 1, + .name = "mnode-monitor", + .fp = (FItem)mmProcessQueue, + .param = pMgmt, + }; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start mnode mnode-monitor worker since %s", terrstr()); + return -1; } dDebug("mnode workers are initialized"); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 33df1b6786..c95a4e8292 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -165,18 +165,16 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { return -1; } - if (tsMultiProcess) { - SSingleWorkerCfg mCfg = { - .min = 1, - .max = 1, - .name = "qnode-monitor", - .fp = (FItem)qmProcessMonitorQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { - dError("failed to start qnode-monitor worker since %s", terrstr()); - return -1; - } + SSingleWorkerCfg mCfg = { + .min = 1, + .max = 1, + .name = "qnode-monitor", + .fp = (FItem)qmProcessMonitorQueue, + .param = pMgmt, + }; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start qnode-monitor worker since %s", terrstr()); + return -1; } dDebug("qnode workers are initialized"); diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index 7bee965a4b..90e20f5fc5 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -121,18 +121,16 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { return -1; } - if (tsMultiProcess) { - SSingleWorkerCfg mCfg = { - .min = 1, - .max = 1, - .name = "snode-monitor", - .fp = (FItem)smProcessMonitorQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { - dError("failed to start snode-monitor worker since %s", terrstr()); - return -1; - } + SSingleWorkerCfg mCfg = { + .min = 1, + .max = 1, + .name = "snode-monitor", + .fp = (FItem)smProcessMonitorQueue, + .param = pMgmt, + }; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start snode-monitor worker since %s", terrstr()); + return -1; } dDebug("snode workers are initialized"); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 00ad137db6..cf8f4d5010 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -496,18 +496,16 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { return -1; } - if (tsMultiProcess) { - SSingleWorkerCfg mCfg = { - .min = 1, - .max = 1, - .name = "vnode-monitor", - .fp = (FItem)vmProcessMgmtMonitorQueue, - .param = pMgmt, - }; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { - dError("failed to start mnode vnode-monitor worker since %s", terrstr()); - return -1; - } + SSingleWorkerCfg mCfg = { + .min = 1, + .max = 1, + .name = "vnode-monitor", + .fp = (FItem)vmProcessMgmtMonitorQueue, + .param = pMgmt, + }; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { + dError("failed to start mnode vnode-monitor worker since %s", terrstr()); + return -1; } dDebug("vnode workers are initialized"); diff --git a/source/dnode/mgmt/node_mgmt/src/dmObj.c b/source/dnode/mgmt/node_mgmt/src/dmObj.c index d34f0b0bcc..e257004352 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmObj.c +++ b/source/dnode/mgmt/node_mgmt/src/dmObj.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "dmImp.h" +static bool dmIsNodeDeployedFp(SDnode *pDnode, EDndNodeType ntype) { return pDnode->wrappers[ntype].required; } + static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { pDnode->input.dnodeId = 0; pDnode->input.clusterId = 0; @@ -29,6 +31,9 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { pDnode->input.disks = pOption->disks; pDnode->input.dataDir = strdup(pOption->dataDir); pDnode->input.pDnode = pDnode; + pDnode->input.processCreateNodeFp = dmProcessCreateNodeReq; + pDnode->input.processDropNodeFp = dmProcessDropNodeReq; + pDnode->input.isNodeDeployedFp = dmIsNodeDeployedFp; if (pDnode->input.dataDir == NULL || pDnode->input.localEp == NULL || pDnode->input.localFqdn == NULL || pDnode->input.firstEp == NULL || pDnode->input.secondEp == NULL) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 57d9136467..486eef78d6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -84,7 +84,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe if (pWrapper->procType != DND_PROC_PARENT) { dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user); - code = (*msgFp)(pWrapper, pMsg); + code = (*msgFp)(pWrapper->pMgmt, pMsg); } else { dTrace("msg:%p, created and put into child queue, type:%s handle:%p code:0x%04x user:%s contLen:%d", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->rpcMsg.code & 0XFFFF, pMsg->user, pRpc->contLen); @@ -335,7 +335,7 @@ static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle); NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; - int32_t code = (*msgFp)(pWrapper, pMsg); + int32_t code = (*msgFp)(pWrapper->pMgmt, pMsg); if (code != 0) { dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index fa15d1e898..b728719af6 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -83,7 +83,7 @@ typedef enum { typedef int32_t (*ProcessCreateNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); -typedef int8_t (*IsNodeDeployedFp)(struct SDnode *pDnode, EDndNodeType ntype); +typedef bool (*IsNodeDeployedFp)(struct SDnode *pDnode, EDndNodeType ntype); typedef struct { const char *path;