From f1f666e780238cbd39373ddb98fc6a07df6d6b3a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 11 Jan 2022 23:22:08 +0800 Subject: [PATCH] add message queue for vnode --- include/dnode/vnode/vnode.h | 17 ++++++++++++++--- source/dnode/mgmt/impl/src/dnode.c | 15 +++++++++++++-- source/dnode/vnode/impl/inc/vnodeDef.h | 4 ++++ source/dnode/vnode/impl/src/vnodeMgr.c | 18 +++++++++++++----- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 2212a8c29a..b83ca53859 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -66,15 +66,26 @@ typedef struct SVnodeCfg { SWalCfg walCfg; } SVnodeCfg; +typedef struct SDnode SDnode; +typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); +typedef struct { + int32_t sver; + SDnode *pDnode; + char *timezone; + char *locale; + char *charset; + PutReqToVQueryQFp putReqToVQueryQFp; + uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) +} SVnodeOpt; + /* ------------------------ SVnode ------------------------ */ /** * @brief Initialize the vnode module * - * @param nthreads number of commit threads. 0 for no threads and - * a schedule queue should be given (TODO) + * @param pOption Option of the vnode mnodule * @return int 0 for success and -1 for failure */ -int vnodeInit(uint16_t nthreads); +int vnodeInit(const SVnodeOpt *pOption); /** * @brief clear a vnode diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index cd27781df3..f3016feda5 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -22,8 +22,8 @@ #include "dndTransport.h" #include "dndVnodes.h" #include "sync.h" -#include "wal.h" #include "tfs.h" +#include "wal.h" EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; } @@ -153,6 +153,8 @@ static void dndCleanupEnv(SDnode *pDnode) { taosStopCacheRefreshWorker(); } +static void dndPutMsgToVQueryQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndProcessVnodeQueryMsg(pDnode, pRpcMsg, NULL); } + SDnode *dndInit(SDnodeOpt *pOption) { taosIgnSIGPIPE(); taosBlockSIGPIPE(); @@ -196,7 +198,16 @@ SDnode *dndInit(SDnodeOpt *pOption) { return NULL; } - if (vnodeInit(pDnode->opt.numOfCommitThreads) != 0) { + SVnodeOpt vnodeOpt = { + .sver = pDnode->opt.sver, + .pDnode = pDnode, + .timezone = pDnode->opt.timezone, + .locale = pDnode->opt.locale, + .charset = pDnode->opt.charset, + .putReqToVQueryQFp = dndPutMsgToVQueryQ, + .nthreads = pDnode->opt.numOfCommitThreads, + }; + if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode env"); dndCleanup(pDnode); return NULL; diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index e70e891794..361fdd10e0 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -57,6 +57,8 @@ typedef struct SVnodeMgr { pthread_cond_t hasTask; TD_DLIST(SVnodeTask) queue; // For vnode Mgmt + SDnode* pDnode; + PutReqToVQueryQFp putReqToVQueryQFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr; @@ -79,6 +81,8 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); +void vnodePutReqToVQueryQ(struct SRpcMsg *pReq); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/impl/src/vnodeMgr.c b/source/dnode/vnode/impl/src/vnodeMgr.c index 784d1abb60..43527c13fe 100644 --- a/source/dnode/vnode/impl/src/vnodeMgr.c +++ b/source/dnode/vnode/impl/src/vnodeMgr.c @@ -19,17 +19,19 @@ SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; static void* loop(void* arg); -int vnodeInit(uint16_t nthreads) { +int vnodeInit(const SVnodeOpt *pOption) { if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) { return 0; } vnodeMgr.stop = false; + vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; + vnodeMgr.pDnode = pOption->pDnode; // Start commit handers - if (nthreads > 0) { - vnodeMgr.nthreads = nthreads; - vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); + if (pOption->nthreads > 0) { + vnodeMgr.nthreads = pOption->nthreads; + vnodeMgr.threads = (pthread_t*)calloc(pOption->nthreads, sizeof(pthread_t)); if (vnodeMgr.threads == NULL) { return -1; } @@ -38,7 +40,7 @@ int vnodeInit(uint16_t nthreads) { pthread_cond_init(&(vnodeMgr.hasTask), NULL); TD_DLIST_INIT(&(vnodeMgr.queue)); - for (uint16_t i = 0; i < nthreads; i++) { + for (uint16_t i = 0; i < pOption->nthreads; i++) { pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL); pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); } @@ -89,6 +91,12 @@ int vnodeScheduleTask(SVnodeTask* pTask) { return 0; } +void vnodePutReqToVQueryQ(struct SRpcMsg* pReq) { + if (vnodeMgr.putReqToVQueryQFp) { + (*vnodeMgr.putReqToVQueryQFp)(vnodeMgr.pDnode, pReq); + } +} + /* ------------------------ STATIC METHODS ------------------------ */ static void* loop(void* arg) { SVnodeTask* pTask;