From 1363021f430167572f7c3dd5b6091b71439738e8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 19 Mar 2022 09:48:29 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/vnode/inc/vmInt.h | 29 +++++++++++++------------ source/dnode/mgmt/vnode/src/vmInt.c | 3 ++- source/dnode/mgmt/vnode/src/vmMsg.c | 2 +- source/dnode/mgmt/vnode/src/vmWorker.c | 6 ++--- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/vnd.h | 6 ++--- source/dnode/vnode/src/vnd/vnodeMain.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeMgr.c | 8 +++---- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- 9 files changed, 32 insertions(+), 30 deletions(-) diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index a5e29f89d0..a05b6ec993 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -49,20 +49,21 @@ typedef struct { } SWrapperCfg; typedef struct { - int32_t vgId; - int32_t refCount; - int32_t vgVersion; - int8_t dropped; - int8_t accessState; - uint64_t dbUid; - char *db; - char *path; - SVnode *pImpl; - STaosQueue *pWriteQ; - STaosQueue *pSyncQ; - STaosQueue *pApplyQ; - STaosQueue *pQueryQ; - STaosQueue *pFetchQ; + int32_t vgId; + int32_t refCount; + int32_t vgVersion; + int8_t dropped; + int8_t accessState; + uint64_t dbUid; + char *db; + char *path; + SVnode *pImpl; + STaosQueue *pWriteQ; + STaosQueue *pSyncQ; + STaosQueue *pApplyQ; + STaosQueue *pQueryQ; + STaosQueue *pFetchQ; + SMgmtWrapper *pWrapper; } SVnodeObj; typedef struct { diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 6751ef3f3c..ed1f6021e4 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -56,6 +56,7 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { pVnode->refCount = 0; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; + pVnode->pWrapper = pMgmt->pWrapper; pVnode->pImpl = pImpl; pVnode->vgVersion = pCfg->vgVersion; pVnode->dbUid = pCfg->dbUid; @@ -127,7 +128,7 @@ static void *vmOpenVnodeFunc(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnodeCfg cfg = {.pMgmt = pMgmt, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; + SVnodeCfg cfg = {.pWrapper = pMgmt->pWrapper, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index 96cb95bcb7..b3f7529311 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -82,7 +82,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - vnodeCfg.pMgmt = pMgmt; + vnodeCfg.pWrapper = pMgmt->pWrapper; vnodeCfg.pTfs = pMgmt->pTfs; vnodeCfg.dbId = wrapperCfg.dbUid; SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 6d75c6c3e8..e5d5e38328 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -46,12 +46,12 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); if (pRsp != NULL) { pRsp->ahandle = pRpc->ahandle; - rpcSendResponse(pRsp); + dndSendRsp(pVnode->pWrapper, pRsp); free(pRsp); } else { if (code != 0) code = terrno; SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code}; - rpcSendResponse(&rpcRsp); + dndSendRsp(pVnode->pWrapper, &rpcRsp); } } @@ -236,7 +236,7 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { if (msgType & 1u) { if (code != 0) code = terrno; SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle}; - rpcSendResponse(&rsp); + dndSendRsp(pMgmt->pWrapper, &rsp); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d318732bfc..36f95f233b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -40,7 +40,7 @@ typedef struct { typedef struct { int32_t vgId; uint64_t dbId; - void *pMgmt; + void *pWrapper; STfs *pTfs; uint64_t wsize; uint64_t ssize; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index ebdc44ca6d..1157b44acf 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -78,14 +78,14 @@ struct SVnode { SWal* pWal; tsem_t canCommit; SQHandle* pQuery; - void* pMgmt; + void* pWrapper; STfs* pTfs; }; int vnodeScheduleTask(SVnodeTask* task); -int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); -void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); +int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); +void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); #define vFatal(...) \ do { \ diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index 9388877fdc..c7405fdcea 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -27,7 +27,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnodeCfg cfg = defaultVnodeOptions; if (pVnodeCfg != NULL) { cfg.vgId = pVnodeCfg->vgId; - cfg.pMgmt = pVnodeCfg->pMgmt; + cfg.pWrapper = pVnodeCfg->pWrapper; cfg.pTfs = pVnodeCfg->pTfs; cfg.dbId = pVnodeCfg->dbId; cfg.hashBegin = pVnodeCfg->hashBegin; @@ -79,7 +79,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { } pVnode->vgId = pVnodeCfg->vgId; - pVnode->pMgmt = pVnodeCfg->pMgmt; + pVnode->pWrapper = pVnodeCfg->pWrapper; pVnode->pTfs = pVnodeCfg->pTfs; pVnode->path = strdup(path); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c index a01c17488b..59109d30c0 100644 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ b/source/dnode/vnode/src/vnd/vnodeMgr.c @@ -89,16 +89,16 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } -int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { +int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { if (pVnode == NULL || pVnode->pMeta == NULL || vnodeMgr.putToQueryQFp == NULL) { terrno = TSDB_CODE_VND_APP_ERROR; return -1; } - return (*vnodeMgr.putToQueryQFp)(pVnode->pMgmt, pReq); + return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq); } -void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { - (*vnodeMgr.sendReqFp)(pVnode->pMgmt, epSet, pReq); +void vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { + (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq); } /* ------------------------ STATIC METHODS ------------------------ */ diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 9819c10512..229ed29ea9 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -21,7 +21,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, - (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqFp)vnodeSendReqToDnode); + (putReqToQueryQFp)vnodePutToVQueryQ, (sendReqFp)vnodeSendReq); } void vnodeQueryClose(SVnode *pVnode) {