get queue size
This commit is contained in:
parent
c6d23422e9
commit
b36356ae57
|
@ -25,23 +25,25 @@ extern "C" {
|
||||||
typedef struct SRpcMsg SRpcMsg;
|
typedef struct SRpcMsg SRpcMsg;
|
||||||
typedef struct SEpSet SEpSet;
|
typedef struct SEpSet SEpSet;
|
||||||
typedef struct SMgmtWrapper SMgmtWrapper;
|
typedef struct SMgmtWrapper SMgmtWrapper;
|
||||||
|
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
|
||||||
|
|
||||||
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
||||||
|
typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype);
|
||||||
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq);
|
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq);
|
||||||
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
||||||
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp);
|
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp);
|
||||||
|
|
||||||
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMgmtWrapper* pWrapper;
|
SMgmtWrapper* pWrapper;
|
||||||
PutToQueueFp queueFps[QUEUE_MAX];
|
PutToQueueFp queueFps[QUEUE_MAX];
|
||||||
|
GetQueueSizeFp qsizeFp;
|
||||||
SendReqFp sendReqFp;
|
SendReqFp sendReqFp;
|
||||||
SendMnodeReqFp sendMnodeReqFp;
|
SendMnodeReqFp sendMnodeReqFp;
|
||||||
SendRspFp sendRspFp;
|
SendRspFp sendRspFp;
|
||||||
} SMsgCb;
|
} SMsgCb;
|
||||||
|
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
|
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
|
||||||
|
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
||||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq);
|
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq);
|
||||||
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
|
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
|
||||||
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp);
|
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp);
|
||||||
|
|
|
@ -20,6 +20,10 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
|
||||||
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
|
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
|
||||||
|
return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) {
|
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) {
|
||||||
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
|
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||||
// qmWorker.c
|
// qmWorker.c
|
||||||
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype);
|
||||||
|
|
||||||
int32_t qmStartWorker(SQnodeMgmt *pMgmt);
|
int32_t qmStartWorker(SQnodeMgmt *pMgmt);
|
||||||
void qmStopWorker(SQnodeMgmt *pMgmt);
|
void qmStopWorker(SQnodeMgmt *pMgmt);
|
||||||
|
|
|
@ -23,6 +23,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
|
||||||
msgCb.pWrapper = pMgmt->pWrapper;
|
msgCb.pWrapper = pMgmt->pWrapper;
|
||||||
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
|
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
|
||||||
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
|
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
|
||||||
|
msgCb.qsizeFp = qmGetQueueSize;
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
|
|
@ -83,6 +83,23 @@ int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc);
|
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
|
||||||
|
int32_t size = -1;
|
||||||
|
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
switch (qtype) {
|
||||||
|
case QUERY_QUEUE:
|
||||||
|
size = taosQueueSize(pMgmt->queryWorker.queue);
|
||||||
|
break;
|
||||||
|
case FETCH_QUEUE:
|
||||||
|
size = taosQueueSize(pMgmt->fetchWorker.queue);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
||||||
int32_t maxFetchThreads = 4;
|
int32_t maxFetchThreads = 4;
|
||||||
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
|
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
|
||||||
|
|
|
@ -104,6 +104,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
||||||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype);
|
||||||
|
|
||||||
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
||||||
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
||||||
|
|
|
@ -133,6 +133,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
||||||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||||
|
msgCb.qsizeFp = vmGetQueueSize;
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
|
|
@ -87,6 +87,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||||
|
msgCb.qsizeFp = vmGetQueueSize;
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
|
|
@ -258,6 +258,34 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
|
return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
|
||||||
|
int32_t size = -1;
|
||||||
|
SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId);
|
||||||
|
if (pVnode != NULL) {
|
||||||
|
switch (qtype) {
|
||||||
|
case QUERY_QUEUE:
|
||||||
|
size = taosQueueSize(pVnode->pQueryQ);
|
||||||
|
break;
|
||||||
|
case FETCH_QUEUE:
|
||||||
|
size = taosQueueSize(pVnode->pFetchQ);
|
||||||
|
break;
|
||||||
|
case WRITE_QUEUE:
|
||||||
|
size = taosQueueSize(pVnode->pWriteQ);
|
||||||
|
break;
|
||||||
|
case SYNC_QUEUE:
|
||||||
|
size = taosQueueSize(pVnode->pSyncQ);
|
||||||
|
break;
|
||||||
|
case APPLY_QUEUE:
|
||||||
|
size = taosQueueSize(pVnode->pApplyQ);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
vmReleaseVnode(pWrapper->pMgmt, pVnode);
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
|
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
|
||||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
|
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
|
||||||
|
|
Loading…
Reference in New Issue