refactor: rpc msg handler
This commit is contained in:
parent
12d27bfdd3
commit
f4c3141821
|
@ -47,10 +47,8 @@ static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
|
|
||||||
static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SBnodeMgmt *pMgmt = pInfo->ahandle;
|
SBnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
|
int32_t code = -1;
|
||||||
dTrace("msg:%p, get from bnode-monitor queue", pMsg);
|
dTrace("msg:%p, get from bnode-monitor queue", pMsg);
|
||||||
SRpcMsg *pRpc = pMsg;
|
|
||||||
int32_t code = -1;
|
|
||||||
|
|
||||||
if (pMsg->msgType == TDMT_MON_BM_INFO) {
|
if (pMsg->msgType == TDMT_MON_BM_INFO) {
|
||||||
code = bmProcessGetMonBmInfoReq(pMgmt, pMsg);
|
code = bmProcessGetMonBmInfoReq(pMgmt, pMsg);
|
||||||
|
@ -58,13 +56,13 @@ static void bmProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRpc->msgType & 1U) {
|
if (IsReq(pMsg)) {
|
||||||
if (code != 0 && terrno != 0) code = terrno;
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
bmSendRsp(pMsg, code);
|
bmSendRsp(pMsg, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
static void *dmStatusThreadFp(void *param) {
|
static void *dmStatusThreadFp(void *param) {
|
||||||
SDnodeMgmt *pMgmt = param;
|
SDnodeMgmt *pMgmt = param;
|
||||||
int64_t lastTime = taosGetTimestampMs();
|
int64_t lastTime = taosGetTimestampMs();
|
||||||
|
|
||||||
setThreadName("dnode-status");
|
setThreadName("dnode-status");
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -40,7 +39,6 @@ static void *dmStatusThreadFp(void *param) {
|
||||||
static void *dmMonitorThreadFp(void *param) {
|
static void *dmMonitorThreadFp(void *param) {
|
||||||
SDnodeMgmt *pMgmt = param;
|
SDnodeMgmt *pMgmt = param;
|
||||||
int64_t lastTime = taosGetTimestampMs();
|
int64_t lastTime = taosGetTimestampMs();
|
||||||
|
|
||||||
setThreadName("dnode-monitor");
|
setThreadName("dnode-monitor");
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -103,11 +101,9 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
|
||||||
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SDnodeMgmt *pMgmt = pInfo->ahandle;
|
SDnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
tmsg_t msgType = pMsg->msgType;
|
dTrace("msg:%p, will be processed in dnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
bool isRequest = msgType & 1u;
|
|
||||||
dTrace("msg:%p, will be processed in dnode-mgmt queue, type:%s", pMsg, TMSG_INFO(msgType));
|
|
||||||
|
|
||||||
switch (msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_DND_CONFIG_DNODE:
|
case TDMT_DND_CONFIG_DNODE:
|
||||||
code = dmProcessConfigReq(pMgmt, pMsg);
|
code = dmProcessConfigReq(pMgmt, pMsg);
|
||||||
break;
|
break;
|
||||||
|
@ -149,7 +145,7 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isRequest) {
|
if (IsReq(pMsg)) {
|
||||||
if (code != 0 && terrno != 0) code = terrno;
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
SRpcMsg rsp = {
|
SRpcMsg rsp = {
|
||||||
.code = code,
|
.code = code,
|
||||||
|
|
|
@ -28,10 +28,8 @@ static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
|
|
||||||
static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
|
int32_t code = -1;
|
||||||
dTrace("msg:%p, get from snode-monitor queue", pMsg);
|
dTrace("msg:%p, get from snode-monitor queue", pMsg);
|
||||||
SRpcMsg *pRpc = pMsg;
|
|
||||||
int32_t code = -1;
|
|
||||||
|
|
||||||
if (pMsg->msgType == TDMT_MON_SM_INFO) {
|
if (pMsg->msgType == TDMT_MON_SM_INFO) {
|
||||||
code = smProcessGetMonitorInfoReq(pMgmt, pMsg);
|
code = smProcessGetMonitorInfoReq(pMgmt, pMsg);
|
||||||
|
@ -39,13 +37,13 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRpc->msgType & 1U) {
|
if (IsReq(pMsg)) {
|
||||||
if (code != 0 && terrno != 0) code = terrno;
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
smSendRsp(pMsg, code);
|
smSendRsp(pMsg, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void vmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SVnodeMgmt *pMgmt = pInfo->ahandle;
|
SVnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
dTrace("msg:%p, get from vnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
|
@ -92,7 +92,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SVnodeObj *pVnode = pInfo->ahandle;
|
SVnodeObj *pVnode = pInfo->ahandle;
|
||||||
SArray * pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
|
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));
|
||||||
if (pArray == NULL) {
|
if (pArray == NULL) {
|
||||||
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
|
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
|
||||||
return;
|
return;
|
||||||
|
@ -222,8 +222,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
|
static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) {
|
||||||
SRpcMsg * pRpc = pMsg;
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
SMsgHead *pHead = pRpc->pCont;
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
pHead->contLen = ntohl(pHead->contLen);
|
pHead->contLen = ntohl(pHead->contLen);
|
||||||
|
@ -237,23 +236,23 @@ static int32_t vmPutNodeMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType
|
||||||
|
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case QUERY_QUEUE:
|
case QUERY_QUEUE:
|
||||||
dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
dTrace("msg:%p, put into vnode-query worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
dTrace("msg:%p, put into vnode-fetch worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
taosWriteQitem(pVnode->pFetchQ, pMsg);
|
taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case WRITE_QUEUE:
|
case WRITE_QUEUE:
|
||||||
dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
dTrace("msg:%p, put into vnode-write worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
taosWriteQitem(pVnode->pWriteQ, pMsg);
|
taosWriteQitem(pVnode->pWriteQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case SYNC_QUEUE:
|
case SYNC_QUEUE:
|
||||||
dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
dTrace("msg:%p, put into vnode-sync worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
taosWriteQitem(pVnode->pSyncQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case MERGE_QUEUE:
|
case MERGE_QUEUE:
|
||||||
dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
dTrace("msg:%p, put into vnode-merge worker, type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
taosWriteQitem(pVnode->pMergeQ, pMsg);
|
taosWriteQitem(pVnode->pMergeQ, pMsg);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -301,7 +300,7 @@ int32_t vmPutNodeMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
|
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) {
|
||||||
SMsgHead * pHead = pRpc->pCont;
|
SMsgHead *pHead = pRpc->pCont;
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL) return -1;
|
if (pVnode == NULL) return -1;
|
||||||
|
|
||||||
|
@ -469,7 +468,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
.min = 1,
|
.min = 1,
|
||||||
.max = 1,
|
.max = 1,
|
||||||
.name = "vnode-mgmt",
|
.name = "vnode-mgmt",
|
||||||
.fp = (FItem)vmProcessMgmtMonitorQueue,
|
.fp = (FItem)vmProcessQueue,
|
||||||
.param = pMgmt,
|
.param = pMgmt,
|
||||||
};
|
};
|
||||||
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
|
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
|
||||||
|
@ -481,7 +480,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
||||||
.min = 1,
|
.min = 1,
|
||||||
.max = 1,
|
.max = 1,
|
||||||
.name = "vnode-monitor",
|
.name = "vnode-monitor",
|
||||||
.fp = (FItem)vmProcessMgmtMonitorQueue,
|
.fp = (FItem)vmProcessQueue,
|
||||||
.param = pMgmt,
|
.param = pMgmt,
|
||||||
};
|
};
|
||||||
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
|
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
|
||||||
|
|
|
@ -137,7 +137,6 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper);
|
||||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
|
void dmSetStatus(SDnode *pDnode, EDndRunStatus stype);
|
||||||
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);
|
void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg);
|
||||||
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
|
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pMsg);
|
||||||
void dmProcessFetchRsp(SRpcMsg *pMsg);
|
|
||||||
|
|
||||||
// dmNodes.c
|
// dmNodes.c
|
||||||
int32_t dmOpenNode(SMgmtWrapper *pWrapper);
|
int32_t dmOpenNode(SMgmtWrapper *pWrapper);
|
||||||
|
|
|
@ -314,8 +314,3 @@ void dmProcessServerStartupStatus(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
rpcSendResponse(&rsp);
|
rpcSendResponse(&rsp);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmProcessFetchRsp(SRpcMsg *pMsg) {
|
|
||||||
qWorkerProcessFetchRsp(NULL, NULL, pMsg);
|
|
||||||
// rpcFreeCont(pMsg->pCont);
|
|
||||||
}
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
|
#include "qworker.h"
|
||||||
|
|
||||||
static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
|
static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
|
||||||
static void dmSendRsp(SRpcMsg *pMsg);
|
static void dmSendRsp(SRpcMsg *pMsg);
|
||||||
|
@ -61,7 +62,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
dmProcessNetTestReq(pDnode, pRpc);
|
dmProcessNetTestReq(pDnode, pRpc);
|
||||||
return;
|
return;
|
||||||
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
|
} else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) {
|
||||||
dmProcessFetchRsp(pRpc);
|
qWorkerProcessFetchRsp(NULL, NULL, pRpc);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
}
|
}
|
||||||
|
|
|
@ -147,9 +147,6 @@ _err:
|
||||||
|
|
||||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
vTrace("message in vnode query queue is processing");
|
vTrace("message in vnode query queue is processing");
|
||||||
#if 0
|
|
||||||
SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
|
||||||
#endif
|
|
||||||
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_QUERY:
|
case TDMT_VND_QUERY:
|
||||||
|
|
Loading…
Reference in New Issue