From f1f666e780238cbd39373ddb98fc6a07df6d6b3a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 23:22:08 +0800 Subject: [PATCH 1/2] add message queue for vnode --- include/dnode/vnode/vnode.h | 17 ++++++++++++++--- source/dnode/mgmt/impl/src/dnode.c | 15 +++++++++++++-- source/dnode/vnode/impl/inc/vnodeDef.h | 4 ++++ source/dnode/vnode/impl/src/vnodeMgr.c | 18 +++++++++++++----- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 2212a8c29a..b83ca53859 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -66,15 +66,26 @@ typedef struct SVnodeCfg { SWalCfg walCfg; } SVnodeCfg; +typedef struct SDnode SDnode; +typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); +typedef struct { + int32_t sver; + SDnode *pDnode; + char *timezone; + char *locale; + char *charset; + PutReqToVQueryQFp putReqToVQueryQFp; + uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) +} SVnodeOpt; + /* ------------------------ SVnode ------------------------ */ /** * @brief Initialize the vnode module * - * @param nthreads number of commit threads. 0 for no threads and - * a schedule queue should be given (TODO) + * @param pOption Option of the vnode mnodule * @return int 0 for success and -1 for failure */ -int vnodeInit(uint16_t nthreads); +int vnodeInit(const SVnodeOpt *pOption); /** * @brief clear a vnode diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index cd27781df3..f3016feda5 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -22,8 +22,8 @@ #include "dndTransport.h" #include "dndVnodes.h" #include "sync.h" -#include "wal.h" #include "tfs.h" +#include "wal.h" EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; } @@ -153,6 +153,8 @@ static void dndCleanupEnv(SDnode *pDnode) { taosStopCacheRefreshWorker(); } +static void dndPutMsgToVQueryQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndProcessVnodeQueryMsg(pDnode, pRpcMsg, NULL); } + SDnode *dndInit(SDnodeOpt *pOption) { taosIgnSIGPIPE(); taosBlockSIGPIPE(); @@ -196,7 +198,16 @@ SDnode *dndInit(SDnodeOpt *pOption) { return NULL; } - if (vnodeInit(pDnode->opt.numOfCommitThreads) != 0) { + SVnodeOpt vnodeOpt = { + .sver = pDnode->opt.sver, + .pDnode = pDnode, + .timezone = pDnode->opt.timezone, + .locale = pDnode->opt.locale, + .charset = pDnode->opt.charset, + .putReqToVQueryQFp = dndPutMsgToVQueryQ, + .nthreads = pDnode->opt.numOfCommitThreads, + }; + if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); dndCleanup(pDnode); return NULL; diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index e70e891794..361fdd10e0 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -57,6 +57,8 @@ typedef struct SVnodeMgr { pthread_cond_t hasTask; TD_DLIST(SVnodeTask) queue; // For vnode Mgmt + SDnode* pDnode; + PutReqToVQueryQFp putReqToVQueryQFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr; @@ -79,6 +81,8 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); +void vnodePutReqToVQueryQ(struct SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 784d1abb60..43527c13fe 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -19,17 +19,19 @@ SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; static void* loop(void* arg); -int vnodeInit(uint16_t nthreads) { +int vnodeInit(const SVnodeOpt *pOption) { if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) { return 0; } vnodeMgr.stop = false; + vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; + vnodeMgr.pDnode = pOption->pDnode; // Start commit handers - if (nthreads > 0) { - vnodeMgr.nthreads = nthreads; - vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); + if (pOption->nthreads > 0) { + vnodeMgr.nthreads = pOption->nthreads; + vnodeMgr.threads = (pthread_t*)calloc(pOption->nthreads, sizeof(pthread_t)); if (vnodeMgr.threads == NULL) { return -1; } @@ -38,7 +40,7 @@ int vnodeInit(uint16_t nthreads) { pthread_cond_init(&(vnodeMgr.hasTask), NULL); TD_DLIST_INIT(&(vnodeMgr.queue)); - for (uint16_t i = 0; i < nthreads; i++) { + for (uint16_t i = 0; i < pOption->nthreads; i++) { pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL); pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); } @@ -89,6 +91,12 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } +void vnodePutReqToVQueryQ(struct SRpcMsg* pReq) { + if (vnodeMgr.putReqToVQueryQFp) { + (*vnodeMgr.putReqToVQueryQFp)(vnodeMgr.pDnode, pReq); + } +} + /* ------------------------ STATIC METHODS ------------------------ */ static void* loop(void* arg) { SVnodeTask* pTask; From ab3378e090100a98451f4fa606ca85c2a8fc9ec8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 16:44:11 -0800 Subject: [PATCH 2/2] add msg queue --- include/dnode/vnode/vnode.h | 11 ++++++----- source/dnode/mgmt/impl/src/dndVnodes.c | 6 ++++-- source/dnode/mgmt/impl/src/dnode.c | 3 +-- source/dnode/vnode/impl/inc/vnodeDef.h | 3 ++- source/dnode/vnode/impl/src/vnodeMain.c | 21 ++++++++++++--------- source/dnode/vnode/impl/src/vnodeMgr.c | 9 ++++----- 6 files changed, 29 insertions(+), 24 deletions(-) diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index b83ca53859..03a59b01e6 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -31,8 +31,12 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; +typedef struct SDnode SDnode; +typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); + typedef struct SVnodeCfg { int32_t vgId; + SDnode *pDnode; /** vnode buffer pool options */ struct { @@ -66,16 +70,13 @@ typedef struct SVnodeCfg { SWalCfg walCfg; } SVnodeCfg; -typedef struct SDnode SDnode; -typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef struct { int32_t sver; - SDnode *pDnode; char *timezone; char *locale; char *charset; - PutReqToVQueryQFp putReqToVQueryQFp; uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) + PutReqToVQueryQFp putReqToVQueryQFp; } SVnodeOpt; /* ------------------------ SVnode ------------------------ */ @@ -100,7 +101,7 @@ void vnodeClear(); * @param pVnodeCfg options of the vnode * @return SVnode* The vnode object */ -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); /** * @brief Close a VNODE diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index bf27a542ae..9f585859a8 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -381,7 +381,8 @@ static void *dnodeOpenVnodeFunc(void *param) { pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId); + SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId}; + SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -581,7 +582,8 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId); + vnodeCfg.pDnode = pDnode; + SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); if (pImpl == NULL) { dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); return -1; diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index f3016feda5..ea42db96ab 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -200,12 +200,11 @@ SDnode *dndInit(SDnodeOpt *pOption) { SVnodeOpt vnodeOpt = { .sver = pDnode->opt.sver, - .pDnode = pDnode, .timezone = pDnode->opt.timezone, .locale = pDnode->opt.locale, .charset = pDnode->opt.charset, - .putReqToVQueryQFp = dndPutMsgToVQueryQ, .nthreads = pDnode->opt.numOfCommitThreads, + .putReqToVQueryQFp = dndPutMsgToVQueryQ, }; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 361fdd10e0..4f53dcd899 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -77,11 +77,12 @@ struct SVnode { SVnodeFS* pFs; tsem_t canCommit; SQHandle* pQuery; + SDnode* pDnode; }; int vnodeScheduleTask(SVnodeTask* task); -void vnodePutReqToVQueryQ(struct SRpcMsg *pReq); +void vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index eb4b45bc20..85ccc9879e 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -15,27 +15,29 @@ #include "vnodeDef.h" -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; // Set default options - //if (pVnodeCfg == NULL) { - pVnodeCfg = &defaultVnodeOptions; - //} + SVnodeCfg cfg = defaultVnodeOptions; + if (pVnodeCfg != NULL) { + cfg.vgId = pVnodeCfg->vgId; + cfg.pDnode = pVnodeCfg->pDnode; + } // Validate options - if (vnodeValidateOptions(pVnodeCfg) < 0) { + if (vnodeValidateOptions(&cfg) < 0) { // TODO return NULL; } // Create the handle - pVnode = vnodeNew(path, pVnodeCfg, vid); + pVnode = vnodeNew(path, &cfg); if (pVnode == NULL) { // TODO: handle error return NULL; @@ -62,7 +64,7 @@ void vnodeClose(SVnode *pVnode) { void vnodeDestroy(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; pVnode = (SVnode *)calloc(1, sizeof(*pVnode)); @@ -71,7 +73,8 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vi return NULL; } - pVnode->vgId = vid; + pVnode->vgId = pVnodeCfg->vgId; + pVnode->pDnode = pVnodeCfg->pDnode; pVnode->path = strdup(path); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 43527c13fe..51f33031ac 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -26,7 +26,6 @@ int vnodeInit(const SVnodeOpt *pOption) { vnodeMgr.stop = false; vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; - vnodeMgr.pDnode = pOption->pDnode; // Start commit handers if (pOption->nthreads > 0) { @@ -91,10 +90,10 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } -void vnodePutReqToVQueryQ(struct SRpcMsg* pReq) { - if (vnodeMgr.putReqToVQueryQFp) { - (*vnodeMgr.putReqToVQueryQFp)(vnodeMgr.pDnode, pReq); - } +void vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { + assert(vnodeMgr.putReqToVQueryQFp); + assert(pVnode->pDnode); + (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); } /* ------------------------ STATIC METHODS ------------------------ */