Merge pull request #10012 from taosdata/feature/mnode

adjust func paras
This commit is contained in:
Shengliang Guan 2022-01-25 14:56:10 +08:00 committed by GitHub
commit ab83312126
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 16 additions and 24 deletions

View File

@ -737,15 +737,9 @@ int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return 0; return 0;
} }
static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessQueryMsg(pVnode->pImpl, pMsg); }
SRpcMsg *pRsp = NULL;
vnodeProcessQueryReq(pVnode->pImpl, pMsg, &pRsp);
}
static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchMsg(pVnode->pImpl, pMsg); }
SRpcMsg *pRsp = NULL;
vnodeProcessFetchReq(pVnode->pImpl, pMsg, &pRsp);
}
static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void dndProcessVnodeWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *));

View File

@ -318,8 +318,8 @@ int tqRegisterContext(STqGroup*, void* ahandle);
int tqSendLaunchQuery(STqMsgItem*, int64_t offset); int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
#endif #endif
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -159,20 +159,18 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
* *
* @param pVnode The vnode object. * @param pVnode The vnode object.
* @param pMsg The request message * @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure * @return int 0 for success, -1 for failure
*/ */
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
/** /**
* @brief Process a fetch message. * @brief Process a fetch message.
* *
* @param pVnode The vnode object. * @param pVnode The vnode object.
* @param pMsg The request message * @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure * @return int 0 for success, -1 for failure
*/ */
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg);
/* ------------------------ SVnodeCfg ------------------------ */ /* ------------------------ SVnodeCfg ------------------------ */
/** /**

View File

@ -667,8 +667,8 @@ int tqItemSSize() {
} }
#endif #endif
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeReq* pReq = pMsg->pCont; SMqConsumeReq* pReq = pMsg->pCont;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int64_t reqId = pReq->reqId; int64_t reqId = pReq->reqId;
int64_t consumerId = pReq->consumerId; int64_t consumerId = pReq->consumerId;
@ -783,7 +783,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
return 0; return 0;
} }
int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) { int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
SMqSetCVgReq req; SMqSetCVgReq req;
tDecodeSMqSetCVgReq(msg, &req); tDecodeSMqSetCVgReq(msg, &req);
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);

View File

@ -17,14 +17,14 @@
#include "vnd.h" #include "vnd.h"
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeQueryOpen(SVnode *pVnode) { int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode,
(putReqToQueryQFp)vnodePutReqToVQueryQ); (putReqToQueryQFp)vnodePutReqToVQueryQ);
} }
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in query queue is processing"); vTrace("message in query queue is processing");
switch (pMsg->msgType) { switch (pMsg->msgType) {
@ -38,7 +38,7 @@ int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
} }
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
@ -57,16 +57,16 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return vnodeGetTableList(pVnode, pMsg); return vnodeGetTableList(pVnode, pMsg);
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessConsumeReq(pVnode->pTq, pMsg, pRsp); return tqProcessConsumeReq(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;
} }
} }
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
STableInfoReq * pReq = (STableInfoReq *)(pMsg->pCont); STableInfoReq * pReq = (STableInfoReq *)(pMsg->pCont);
STbCfg * pTbCfg = NULL; STbCfg * pTbCfg = NULL;
STbCfg * pStbCfg = NULL; STbCfg * pStbCfg = NULL;

View File

@ -115,7 +115,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
break; break;
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead)), NULL) < 0) { if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead))) < 0) {
// TODO: handle error // TODO: handle error
} }
} break; } break;