From ce016a216cfe8c3133f2c86eeeafbbaea7742652 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 28 Mar 2022 17:25:12 +0800 Subject: [PATCH] refact node msg fp --- include/util/tqueue.h | 2 +- source/dnode/mgmt/bnode/inc/bmInt.h | 2 +- source/dnode/mgmt/bnode/src/bmWorker.c | 6 +- source/dnode/mgmt/container/inc/dnd.h | 2 +- source/dnode/mgmt/container/src/dndExec.c | 3 +- source/dnode/mgmt/container/src/dndMsg.c | 2 +- source/dnode/mgmt/dnode/inc/dmInt.h | 2 +- source/dnode/mgmt/dnode/src/dmMsg.c | 26 ++-- source/dnode/mgmt/dnode/src/dmWorker.c | 6 +- source/dnode/mgmt/mnode/inc/mmInt.h | 13 +- source/dnode/mgmt/mnode/src/mmInt.c | 1 + source/dnode/mgmt/mnode/src/mmMsg.c | 162 +++++++++++----------- source/dnode/mgmt/mnode/src/mmWorker.c | 75 +++++----- source/dnode/mgmt/qnode/inc/qmInt.h | 4 +- source/dnode/mgmt/qnode/src/qmMsg.c | 18 +-- source/dnode/mgmt/qnode/src/qmWorker.c | 27 ++-- source/dnode/mgmt/snode/inc/smInt.h | 8 +- source/dnode/mgmt/snode/src/smMsg.c | 4 +- source/dnode/mgmt/snode/src/smWorker.c | 26 ++-- source/dnode/mgmt/vnode/inc/vmInt.h | 12 +- source/dnode/mgmt/vnode/src/vmMsg.c | 88 ++++++------ source/dnode/mgmt/vnode/src/vmWorker.c | 63 ++++++--- source/util/src/tqueue.c | 4 +- 23 files changed, 296 insertions(+), 260 deletions(-) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 3bccc7404b..70db65d50f 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -56,7 +56,7 @@ void taosCloseQueue(STaosQueue *queue); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); void *taosAllocateQitem(int32_t size); void taosFreeQitem(void *pItem); -int32_t taosWriteQitem(STaosQueue *queue, void *pItem); +void taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); bool taosQueueEmpty(STaosQueue *queue); int32_t taosQueueSize(STaosQueue *queue); diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index 8cfff0f1f3..f19ba4e034 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -43,7 +43,7 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); // bmWorker.c int32_t bmStartWorker(SBnodeMgmt *pMgmt); void bmStopWorker(SBnodeMgmt *pMgmt); -int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/bnode/src/bmWorker.c b/source/dnode/mgmt/bnode/src/bmWorker.c index 1ed9b0d931..932c008e34 100644 --- a/source/dnode/mgmt/bnode/src/bmWorker.c +++ b/source/dnode/mgmt/bnode/src/bmWorker.c @@ -63,11 +63,13 @@ static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs taosArrayDestroy(pArray); } -int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SBnodeMgmt *pMgmt = pWrapper->pMgmt; SMultiWorker *pWorker = &pMgmt->writeWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return taosWriteQitem(pWorker->queue, pMsg); + taosWriteQitem(pWorker->queue, pMsg); + return 0; } int32_t bmStartWorker(SBnodeMgmt *pMgmt) { diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index f6c8897f64..7c06e08dff 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -63,7 +63,7 @@ typedef struct SQnodeMgmt SQnodeMgmt; typedef struct SSnodeMgmt SSnodeMgmt; typedef struct SBnodeMgmt SBnodeMgmt; -typedef int32_t (*NodeMsgFp)(void *pMgmt, SNodeMsg *pMsg); +typedef int32_t (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); typedef int32_t (*StartNodeFp)(SMgmtWrapper *pWrapper); diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index cc1d8e5f24..94c996280f 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -123,7 +123,8 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t pRpc->ahandle); NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; - int32_t code = (*msgFp)(pWrapper->pMgmt, pMsg); + int32_t code = (*msgFp)(pWrapper, pMsg); + dTrace("msg:%p, is processed, code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); if (code != 0) { if (pRpc->msgType & 1U) { diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c index b72d085861..5da1d73034 100644 --- a/source/dnode/mgmt/container/src/dndMsg.c +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -62,7 +62,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pWrapper->procType == PROC_SINGLE) { dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user); - code = (*msgFp)(pWrapper->pMgmt, pMsg); + code = (*msgFp)(pWrapper, pMsg); } else if (pWrapper->procType == PROC_PARENT) { dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user); diff --git a/source/dnode/mgmt/dnode/inc/dmInt.h b/source/dnode/mgmt/dnode/inc/dmInt.h index b02b1d2297..3036d1f5ad 100644 --- a/source/dnode/mgmt/dnode/inc/dmInt.h +++ b/source/dnode/mgmt/dnode/inc/dmInt.h @@ -54,7 +54,7 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmStartWorker(SDnodeMgmt *pMgmt); void dmStopWorker(SDnodeMgmt *pMgmt); int32_t dmStartThread(SDnodeMgmt *pMgmt); -int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index eb4e843c55..b301ef478b 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -118,19 +118,19 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { void dmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, VND_VGID); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, VND_VGID); } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 8e560503d0..63b9704b78 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -140,12 +140,14 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { dDebug("dnode workers are closed"); } -int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SDnodeMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->mgmtWorker; if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) { pWorker = &pMgmt->statusWorker; } dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - return taosWriteQitem(pWorker->queue, pMsg); + taosWriteQitem(pWorker->queue, pMsg); + return 0; } diff --git a/source/dnode/mgmt/mnode/inc/mmInt.h b/source/dnode/mgmt/mnode/inc/mmInt.h index cd4585048b..86cba97a33 100644 --- a/source/dnode/mgmt/mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mnode/inc/mmInt.h @@ -55,13 +55,14 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); void mmStopWorker(SMnodeMgmt *pMgmt); -int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmProcessQueryMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); -int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpcMsg); +int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); +int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); +int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); +int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mnode/src/mmInt.c b/source/dnode/mgmt/mnode/src/mmInt.c index 61afcb11d1..f5a3252fa2 100644 --- a/source/dnode/mgmt/mnode/src/mmInt.c +++ b/source/dnode/mgmt/mnode/src/mmInt.c @@ -48,6 +48,7 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue; msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; + msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue; msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendRspFp = dndSendRsp; diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index d04077baf8..6afcd249b3 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -75,91 +75,91 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { void mmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by DNODE - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg, VND_VGID); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, VND_VGID); // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, mmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, mmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MND_VGID); } diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index d07313410a..0897951fe3 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -67,60 +67,63 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } - -static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SNodeMsg *pMsg) { +static void mmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - return taosWriteQitem(pWorker->queue, pMsg); + taosWriteQitem(pWorker->queue, pMsg); } -int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { - return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg); +int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + mmPutMsgToWorker(&pMgmt->writeWorker, pMsg); + return 0; } -int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { - return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); +int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + mmPutMsgToWorker(&pMgmt->syncWorker, pMsg); + return 0; } -int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { - return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); +int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + mmPutMsgToWorker(&pMgmt->readWorker, pMsg); + return 0; } -int32_t mmProcessQueryMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { - return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); +int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + mmPutMsgToWorker(&pMgmt->queryWorker, pMsg); + return 0; } -static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { +static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); - if (pMsg == NULL) { - return -1; - } + if (pMsg == NULL) return -1; dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; - - int32_t code = taosWriteQitem(pWorker->queue, pMsg); - if (code != 0) { - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pRpc->pCont); - } - - return code; -} - -int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - return mmPutRpcMsgToWorker(pMgmt, &pMgmt->writeWorker, pRpc); -} - -int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - SMnodeMgmt *pMgmt = pWrapper->pMgmt; - return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc); + taosWriteQitem(pWorker->queue, pMsg); + return 0; } int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; - return mmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc); + return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pRpc); +} + +int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pRpc); +} + +int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutRpcMsgToWorker(&pMgmt->readWorker, pRpc); +} + +int32_t mmPutMsgToSyncQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pRpc); } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { @@ -153,8 +156,8 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { } void mmStopWorker(SMnodeMgmt *pMgmt) { - tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->queryWorker); + tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->writeWorker); tSingleWorkerCleanup(&pMgmt->syncWorker); dDebug("mnode workers are closed"); diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index 52d23a445c..3e975663d3 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -48,8 +48,8 @@ int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); int32_t qmStartWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt); -int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t qmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/qnode/src/qmMsg.c b/source/dnode/mgmt/qnode/src/qmMsg.c index ebe6477e81..da5ba6472a 100644 --- a/source/dnode/mgmt/qnode/src/qmMsg.c +++ b/source/dnode/mgmt/qnode/src/qmMsg.c @@ -56,14 +56,14 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { void qmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)qmProcessQueryMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)qmProcessQueryMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, QND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, qmProcessFetchMsg, QND_VGID); } diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index 5923f5c2f4..14efb311b1 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -49,14 +49,22 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t qmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { +static void qmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) { dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); - return taosWriteQitem(pWorker->queue, pMsg); + taosWriteQitem(pWorker->queue, pMsg); } -int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); } +int32_t qmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); + return 0; +} -int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); } +int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); + return 0; +} static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); @@ -66,15 +74,8 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SR dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; - - int32_t code = taosWriteQitem(pWorker->queue, pMsg); - if (code != 0) { - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pRpc->pCont); - } - - return code; + taosWriteQitem(pWorker->queue, pMsg); + return 0; } int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index f2b510483c..9290384cab 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -46,10 +46,10 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); // smWorker.c int32_t smStartWorker(SSnodeMgmt *pMgmt); void smStopWorker(SSnodeMgmt *pMgmt); -int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/snode/src/smMsg.c b/source/dnode/mgmt/snode/src/smMsg.c index aea1dded56..c522ef7fc3 100644 --- a/source/dnode/mgmt/snode/src/smMsg.c +++ b/source/dnode/mgmt/snode/src/smMsg.c @@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { void smInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by SNODE - dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, VND_VGID); } diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index 4e46ad5818..0326d7dd9f 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -107,7 +107,8 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { return 0; } -int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pWrapper->pMgmt; SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0); if (pWorker == NULL) { terrno = TSDB_CODE_INVALID_MSG; @@ -115,10 +116,12 @@ int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { } dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return taosWriteQitem(pWorker->queue, pMsg); + taosWriteQitem(pWorker->queue, pMsg); + return 0; } -int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pWrapper->pMgmt; int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index); if (pWorker == NULL) { @@ -127,21 +130,24 @@ int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { } dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return taosWriteQitem(pWorker->queue, pMsg); + taosWriteQitem(pWorker->queue, pMsg); + return 0; } -int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->sharedWorker; dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); - return taosWriteQitem(pWorker->queue, pMsg); + taosWriteQitem(pWorker->queue, pMsg); + return 0; } -int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg); +int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg); if (workerType == SND_WORKER_TYPE__SHARED) { - return smProcessSharedMsg(pMgmt, pMsg); + return smProcessSharedMsg(pWrapper, pMsg); } else { - return smProcessUniqueMsg(pMgmt, pMsg); + return smProcessUniqueMsg(pWrapper, pMsg); } } diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index 197c606a0d..6722fe1d65 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -108,12 +108,12 @@ int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype); -int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmProcessMergeMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); -int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrappert, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 97d829571f..a5986f8502 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -244,49 +244,49 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { void vmInitMsgHandles(SMgmtWrapper *pWrapper) { // Requests handled by VNODE - dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, vmProcessQueryMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, vmProcessQueryMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, vmProcessQueryMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, vmProcessQueryMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); - dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, vmProcessMgmtMsg, VND_VGID); + dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, vmProcessMgmtMsg, VND_VGID); } diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 99274ac99b..193807317f 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -179,9 +179,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO } static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { - SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = -1; - + SRpcMsg *pRpc = &pMsg->rpcMsg; SMsgHead *pHead = pRpc->pCont; pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); @@ -192,28 +190,30 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp return -1; } + int32_t code = 0; switch (qtype) { case QUERY_QUEUE: dTrace("msg:%p, will be written into vnode-query queue", pMsg); - code = taosWriteQitem(pVnode->pQueryQ, pMsg); + taosWriteQitem(pVnode->pQueryQ, pMsg); break; case FETCH_QUEUE: dTrace("msg:%p, will be written into vnode-fetch queue", pMsg); - code = taosWriteQitem(pVnode->pFetchQ, pMsg); + taosWriteQitem(pVnode->pFetchQ, pMsg); break; case WRITE_QUEUE: dTrace("msg:%p, will be written into vnode-write queue", pMsg); - code = taosWriteQitem(pVnode->pWriteQ, pMsg); + taosWriteQitem(pVnode->pWriteQ, pMsg); break; case SYNC_QUEUE: dTrace("msg:%p, will be written into vnode-sync queue", pMsg); - code = taosWriteQitem(pVnode->pSyncQ, pMsg); + taosWriteQitem(pVnode->pSyncQ, pMsg); break; case MERGE_QUEUE: dTrace("msg:%p, will be written into vnode-merge queue", pMsg); - code = taosWriteQitem(pVnode->pMergeQ, pMsg); + taosWriteQitem(pVnode->pMergeQ, pMsg); break; default: + code = -1; terrno = TSDB_CODE_INVALID_PARA; break; } @@ -222,52 +222,73 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp return code; } -int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); } +int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); +} -int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); } +int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); +} -int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); } +int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); +} -int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } +int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); +} -int32_t vmProcessMergeMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); } +int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); +} -int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { +int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); - return taosWriteQitem(pWorker->queue, pMsg); + taosWriteQitem(pWorker->queue, pMsg); + return 0; } static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; - int32_t code = -1; SMsgHead *pHead = pRpc->pCont; SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); - if (pMsg != NULL) { + int32_t code = 0; + + if (pMsg == NULL) { + code = -1; + } else { dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); pMsg->rpcMsg = *pRpc; switch (qtype) { case QUERY_QUEUE: dTrace("msg:%p, will be put into vnode-query queue", pMsg); - code = taosWriteQitem(pVnode->pQueryQ, pMsg); + taosWriteQitem(pVnode->pQueryQ, pMsg); break; case FETCH_QUEUE: dTrace("msg:%p, will be put into vnode-fetch queue", pMsg); - code = taosWriteQitem(pVnode->pFetchQ, pMsg); + taosWriteQitem(pVnode->pFetchQ, pMsg); break; case APPLY_QUEUE: dTrace("msg:%p, will be put into vnode-apply queue", pMsg); - code = taosWriteQitem(pVnode->pApplyQ, pMsg); + taosWriteQitem(pVnode->pApplyQ, pMsg); break; case MERGE_QUEUE: dTrace("msg:%p, will be put into vnode-merge queue", pMsg); - code = taosWriteQitem(pVnode->pMergeQ, pMsg); + taosWriteQitem(pVnode->pMergeQ, pMsg); break; default: + code = -1; terrno = TSDB_CODE_INVALID_PARA; break; } diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index b01e1ea1da..3c3a8460b9 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -146,7 +146,7 @@ void taosFreeQitem(void *pItem) { taosMemoryFree(temp); } -int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { +void taosWriteQitem(STaosQueue *queue, void *pItem) { STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode)); pNode->next = NULL; @@ -167,8 +167,6 @@ int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { taosThreadMutexUnlock(&queue->mutex); if (queue->qset) tsem_post(&queue->qset->sem); - - return 0; } int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {