diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 54a145ff33..784ec6234b 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -25,7 +25,7 @@ extern "C" { typedef struct SRpcMsg SRpcMsg; typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; -typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; +typedef enum { QUERY_QUEUE, FETCH_QUEUE, READ_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index d57088474f..cd4585048b 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -28,6 +28,7 @@ typedef struct SMnodeMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; const char *path; + SSingleWorker queryWorker; SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; @@ -57,11 +58,13 @@ void mmStopWorker(SMnodeMgmt *pMgmt); int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmProcessQueryMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); +int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); #ifdef __cplusplus } #endif -#endif /*_TD_DND_MNODE_INT_H_*/ \ No newline at end of file +#endif /*_TD_DND_MNODE_INT_H_*/ diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index 1f60007be1..591bc5aad7 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -45,7 +45,8 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { SMsgCb msgCb = {0}; msgCb.pWrapper = pMgmt->pWrapper; - msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToReadQueue; + msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; + msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue; msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; @@ -258,4 +259,4 @@ int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo SMonGrantInfo *pGrantInfo) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); -} \ No newline at end of file +} diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index d45a6c54f3..d04077baf8 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -156,10 +156,10 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessReadMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessReadMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessReadMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessReadMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)mmProcessReadMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); } diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index ef0dc5923a..1b408bbde6 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -61,6 +61,10 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); } +int32_t mmProcessQueryMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); +} + static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { @@ -90,11 +94,21 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc); } +int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc); +} + + int32_t mmStartWorker(SMnodeMgmt *pMgmt) { SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; - SSingleWorkerCfg readCfg = {.minNum = 2, .maxNum = 2, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt}; - if (tSingleWorkerInit(&pMgmt->readWorker, &readCfg) != 0) { + if (tSingleWorkerInit(&pMgmt->queryWorker, &cfg) != 0) { + dError("failed to start mnode-query worker since %s", terrstr()); + return -1; + } + + if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) { dError("failed to start mnode-read worker since %s", terrstr()); return -1; } @@ -115,6 +129,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) { tSingleWorkerCleanup(&pMgmt->readWorker); + tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->writeWorker); tSingleWorkerCleanup(&pMgmt->syncWorker); dDebug("mnode workers are closed"); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 754bed030b..2a58968511 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -80,7 +80,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) { .pCont = pReq, .contLen = contLen, }; - tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer); @@ -92,7 +92,7 @@ static void mndPullupTelem(void *param, void *tmrId) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; - tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);