From 4049155736d7786f7916a58a2e0fc627afaada1a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 21 Mar 2022 15:51:32 +0800 Subject: [PATCH] shm --- include/common/tmsg.h | 6 +++--- source/dnode/vnode/src/inc/vnd.h | 18 ++++++++++++------ source/dnode/vnode/src/vnd/vnodeMgr.c | 23 +++++++++++++++++------ 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f148009f81..595c03b752 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2323,9 +2323,9 @@ struct SRpcMsg; struct SEpSet; struct SMgmtWrapper; typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); -typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* rpcMsg); -typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); -typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* rpcMsg); +typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq); +typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq); +typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index d3f0dce2e5..de9b7bac83 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -46,15 +46,18 @@ typedef struct SVnodeTask { typedef struct SVnodeMgr { td_mode_flag_t vnodeInitFlag; // For commit - bool stop; - uint16_t nthreads; - TdThread* threads; + bool stop; + uint16_t nthreads; + TdThread* threads; TdThreadMutex mutex; TdThreadCond hasTask; TD_DLIST(SVnodeTask) queue; // For vnode Mgmt - PutToQueueFp putToQueryQFp; - SendReqFp sendReqFp; + PutToQueueFp putToQueryQFp; + PutToQueueFp putToFetchQFp; + SendReqFp sendReqFp; + SendMnodeReqFp sendMnodeReqFp; + SendRspFp sendRspFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr; @@ -85,7 +88,10 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); -void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); +int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq); +int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); +int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq); +void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp); #define vFatal(...) \ do { \ diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c index 442921b90e..51aaf9e68f 100644 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ b/source/dnode/vnode/src/vnd/vnodeMgr.c @@ -26,7 +26,10 @@ int vnodeInit(const SVnodeOpt *pOption) { vnodeMgr.stop = false; vnodeMgr.putToQueryQFp = pOption->putToQueryQFp; + vnodeMgr.putToFetchQFp = pOption->putToFetchQFp; vnodeMgr.sendReqFp = pOption->sendReqFp; + vnodeMgr.sendMnodeReqFp = pOption->sendMnodeReqFp; + vnodeMgr.sendRspFp = pOption->sendRspFp; // Start commit handers if (pOption->nthreads > 0) { @@ -90,15 +93,23 @@ int vnodeScheduleTask(SVnodeTask* pTask) { } int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { - if (pVnode == NULL || pVnode->pMeta == NULL || vnodeMgr.putToQueryQFp == NULL) { - terrno = TSDB_CODE_VND_APP_ERROR; - return -1; - } return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq); } -void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { - (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq); +int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq) { + return (*vnodeMgr.putToFetchQFp)(pVnode->pWrapper, pReq); +} + +int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { + return (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq); +} + +int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq) { + return (*vnodeMgr.sendMnodeReqFp)(pVnode->pWrapper, pReq); +} + +void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp) { + (*vnodeMgr.sendRspFp)(pVnode->pWrapper, pRsp); } /* ------------------------ STATIC METHODS ------------------------ */