refactor: rpc msg handler
This commit is contained in:
parent
1146196fd3
commit
d5fabd5375
|
@ -56,24 +56,6 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
int32_t code = -1;
|
||||
dTrace("msg:%p, get from mnode-query queue", pMsg);
|
||||
|
||||
pMsg->info.node = pMgmt->pMnode;
|
||||
code = mndProcessMsg(pMsg);
|
||||
|
||||
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != 0) {
|
||||
if (terrno != 0) code = terrno;
|
||||
mmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
|
||||
dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
|
||||
taosWriteQitem(pWorker->queue, pMsg);
|
||||
|
@ -131,7 +113,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
.min = tsNumOfMnodeQueryThreads,
|
||||
.max = tsNumOfMnodeQueryThreads,
|
||||
.name = "mnode-query",
|
||||
.fp = (FItem)mmProcessQueryQueue,
|
||||
.fp = (FItem)mmProcessQueue,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->queryWorker, &qCfg) != 0) {
|
||||
|
|
|
@ -19,36 +19,49 @@
|
|||
#include "qworker.h"
|
||||
|
||||
int32_t mndProcessQueryMsg(SRpcMsg *pReq) {
|
||||
int32_t code = -1;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SReadHandle handle = {.mnd = pMnode, .pMsgCb = &pMnode->msgCb};
|
||||
|
||||
mTrace("msg:%p, in query queue is processing", pReq);
|
||||
switch (pReq->msgType) {
|
||||
case TDMT_VND_QUERY:
|
||||
return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pReq);
|
||||
code = qWorkerProcessQueryMsg(&handle, pMnode->pQuery, pReq);
|
||||
break;
|
||||
case TDMT_VND_QUERY_CONTINUE:
|
||||
return qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pReq);
|
||||
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pReq);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_VND_APP_ERROR;
|
||||
mError("unknown msg type:%d in query queue", pReq->msgType);
|
||||
return TSDB_CODE_VND_APP_ERROR;
|
||||
}
|
||||
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndProcessFetchMsg(SRpcMsg *pMsg) {
|
||||
int32_t code = -1;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
mTrace("msg:%p, in fetch queue is processing", pMsg);
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_VND_FETCH:
|
||||
return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg);
|
||||
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg);
|
||||
break;
|
||||
case TDMT_VND_DROP_TASK:
|
||||
return qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg);
|
||||
code = qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg);
|
||||
break;
|
||||
case TDMT_VND_QUERY_HEARTBEAT:
|
||||
return qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg);
|
||||
code = qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg);
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_VND_APP_ERROR;
|
||||
mError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||
return TSDB_CODE_VND_APP_ERROR;
|
||||
}
|
||||
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndInitQuery(SMnode *pMnode) {
|
||||
|
|
Loading…
Reference in New Issue