diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index df6cdc10cf..f487859fd6 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -737,15 +737,9 @@ int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return 0; } -static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { - SRpcMsg *pRsp = NULL; - vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp); -} +static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessQueryMsg(pVnode->pImpl, pMsg); } -static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { - SRpcMsg *pRsp = NULL; - vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp); -} +static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchMsg(pVnode->pImpl, pMsg); } static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 0f318dea0b..3a1e5b9c95 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -318,7 +318,7 @@ int tqRegisterContext(STqGroup*, void* ahandle); int tqSendLaunchQuery(STqMsgItem*, int64_t offset); #endif -int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); +int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); #ifdef __cplusplus diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 9e6ecb6e23..2accfd6279 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -159,20 +159,18 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); * * @param pVnode The vnode object. * @param pMsg The request message - * @param pRsp The response message * @return int 0 for success, -1 for failure */ -int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); /** * @brief Process a fetch message. * * @param pVnode The vnode object. * @param pMsg The request message - * @param pRsp The response message * @return int 0 for success, -1 for failure */ -int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg); /* ------------------------ SVnodeCfg ------------------------ */ /** diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f3ebd5b284..2b5a995988 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -667,8 +667,8 @@ int tqItemSSize() { } #endif -int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { - SMqConsumeReq* pReq = pMsg->pCont; +int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { + SMqConsumeReq* pReq = pMsg->pCont; SRpcMsg rpcMsg; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index f619499c8d..48ff7252be 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -17,14 +17,14 @@ #include "vnd.h" static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); -static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, (putReqToQueryQFp)vnodePutReqToVQueryQ); } -int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { +int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in query queue is processing"); switch (pMsg->msgType) { @@ -38,7 +38,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } } -int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { +int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in fetch queue is processing"); switch (pMsg->msgType) { case TDMT_VND_FETCH: @@ -57,16 +57,16 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return vnodeGetTableList(pVnode, pMsg); // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); case TDMT_VND_TABLE_META: - return vnodeGetTableMeta(pVnode, pMsg, pRsp); + return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: - return tqProcessConsumeReq(pVnode->pTq, pMsg, pRsp); + return tqProcessConsumeReq(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; } } -static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { +static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { STableInfoReq * pReq = (STableInfoReq *)(pMsg->pCont); STbCfg * pTbCfg = NULL; STbCfg * pStbCfg = NULL;