diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 5ce4b16300..54a145ff33 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -25,23 +25,25 @@ extern "C" { typedef struct SRpcMsg SRpcMsg; typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; +typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); +typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); -typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; - typedef struct { SMgmtWrapper* pWrapper; PutToQueueFp queueFps[QUEUE_MAX]; + GetQueueSizeFp qsizeFp; SendReqFp sendReqFp; SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; } SMsgCb; int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); +int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index b13f727db6..98ee1b679d 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -20,6 +20,10 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); } +int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { + return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype); +} + int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) { return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); } diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index c78d04ffb5..3f1f62ebd5 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -44,6 +44,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); // qmWorker.c int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t qmStartWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index 40959192c3..2bbfbd83f5 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -23,6 +23,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; + msgCb.qsizeFp = qmGetQueueSize; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index c828d4f3c3..19686e2a76 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -83,6 +83,23 @@ int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc); } +int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { + int32_t size = -1; + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + switch (qtype) { + case QUERY_QUEUE: + size = taosQueueSize(pMgmt->queryWorker.queue); + break; + case FETCH_QUEUE: + size = taosQueueSize(pMgmt->fetchWorker.queue); + break; + default: + break; + } + + return size; +} + int32_t qmStartWorker(SQnodeMgmt *pMgmt) { int32_t maxFetchThreads = 4; int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores); diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index 38a477178f..ce1a004b65 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -104,6 +104,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 1c6c7d089e..a324c60618 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -133,6 +133,7 @@ static void *vmOpenVnodeFunc(void *param) { msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; + msgCb.qsizeFp = vmGetQueueSize; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 9f86351985..e4a4cfcd9f 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -87,6 +87,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; + msgCb.qsizeFp = vmGetQueueSize; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 2b9f31c867..dd398cb202 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -258,6 +258,34 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); } +int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { + int32_t size = -1; + SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId); + if (pVnode != NULL) { + switch (qtype) { + case QUERY_QUEUE: + size = taosQueueSize(pVnode->pQueryQ); + break; + case FETCH_QUEUE: + size = taosQueueSize(pVnode->pFetchQ); + break; + case WRITE_QUEUE: + size = taosQueueSize(pVnode->pWriteQ); + break; + case SYNC_QUEUE: + size = taosQueueSize(pVnode->pSyncQ); + break; + case APPLY_QUEUE: + size = taosQueueSize(pVnode->pApplyQ); + break; + default: + break; + } + } + vmReleaseVnode(pWrapper->pMgmt, pVnode); + return size; +} + int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);