From 12d27bfdd3c8b4d678dfda49e610e9be60772e21 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 21 May 2022 17:57:39 +0800 Subject: [PATCH] refactor: rpc msg handler --- include/dnode/qnode/qnode.h | 1 - source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 6 +--- source/dnode/mnode/impl/src/mndQuery.c | 35 ++++++--------------- source/dnode/qnode/src/qnode.c | 18 ++--------- 4 files changed, 13 insertions(+), 47 deletions(-) diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 89553f978b..1ab101f705 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -72,7 +72,6 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); * @param pMsg The request message */ int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg); -int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index e30b683d19..35c94b7fbe 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -35,12 +35,8 @@ static void qmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { case TDMT_MON_QM_INFO: code = qmProcessGetMonitorInfoReq(pMgmt, pMsg); break; - case TDMT_VND_QUERY: - case TDMT_VND_QUERY_CONTINUE: - code = qndProcessQueryMsg(pMgmt->pQnode, pMsg); - break; default: - code = qndProcessFetchMsg(pMgmt->pQnode, pMsg); + code = qndProcessQueryMsg(pMgmt->pQnode, pMsg); break; } diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 0c0d771a24..78b70c9a74 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -18,34 +18,19 @@ #include "mndMnode.h" #include "qworker.h" -int32_t mndProcessQueryMsg(SRpcMsg *pReq) { +int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { int32_t code = -1; - SMnode *pMnode = pReq->info.node; + SMnode *pMnode = pMsg->info.node; SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb}; - mTrace("msg:%p, in query queue is processing", pReq); - switch (pReq->msgType) { + mTrace("msg:%p, in query queue is processing", pMsg); + switch (pMsg->msgType) { case TDMT_VND_QUERY: - code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pReq); + code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pMsg); break; case TDMT_VND_QUERY_CONTINUE: - code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pReq); + code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg); break; - default: - terrno = TSDB_CODE_VND_APP_ERROR; - mError("unknown msg type:%d in query queue", pReq->msgType); - } - - if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; - return code; -} - -int32_t mndProcessFetchMsg(SRpcMsg *pMsg) { - int32_t code = -1; - SMnode *pMnode = pMsg->info.node; - mTrace("msg:%p, in fetch queue is processing", pMsg); - - switch (pMsg->msgType) { case TDMT_VND_FETCH: code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg); break; @@ -57,7 +42,7 @@ int32_t mndProcessFetchMsg(SRpcMsg *pMsg) { break; default: terrno = TSDB_CODE_VND_APP_ERROR; - mError("unknown msg type:%d in fetch queue", pMsg->msgType); + mError("unknown msg type:%d in query queue", pMsg->msgType); } if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; @@ -72,9 +57,9 @@ int32_t mndInitQuery(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_VND_QUERY_CONTINUE, mndProcessQueryMsg); - mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessFetchMsg); - mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessFetchMsg); - mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessFetchMsg); + mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessQueryMsg); + mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessQueryMsg); return 0; } diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index af25be8cdf..929643fcdf 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -45,7 +45,7 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; } int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { int32_t code = -1; SReadHandle handle = {.pMsgCb = &pQnode->msgCb}; - qTrace("message in qnode query queue is processing"); + qTrace("message in qnode queue is processing"); switch (pMsg->msgType) { case TDMT_VND_QUERY: @@ -54,20 +54,6 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { case TDMT_VND_QUERY_CONTINUE: code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg); break; - default: - qError("unknown msg type:%d in query queue", pMsg->msgType); - terrno = TSDB_CODE_VND_APP_ERROR; - } - - if (code == 0) return TSDB_CODE_ACTION_IN_PROGRESS; - return code; -} - -int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { - int32_t code = -1; - qTrace("message in fetch queue is processing"); - - switch (pMsg->msgType) { case TDMT_VND_FETCH: code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg); break; @@ -96,7 +82,7 @@ int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) { code = qWorkerProcessHbMsg(pQnode, pQnode->pQuery, pMsg); break; default: - qError("unknown msg type:%d in fetch queue", pMsg->msgType); + qError("unknown msg type:%d in qnode queue", pMsg->msgType); terrno = TSDB_CODE_VND_APP_ERROR; }