add msg queue

This commit is contained in:
Shengliang Guan 2022-01-11 16:44:11 -08:00
parent f1f666e780
commit ab3378e090
6 changed files with 29 additions and 24 deletions

View File

@ -31,8 +31,12 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SVnode SVnode; typedef struct SVnode SVnode;
typedef struct SDnode SDnode;
typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct SVnodeCfg { typedef struct SVnodeCfg {
int32_t vgId; int32_t vgId;
SDnode *pDnode;
/** vnode buffer pool options */ /** vnode buffer pool options */
struct { struct {
@ -66,16 +70,13 @@ typedef struct SVnodeCfg {
SWalCfg walCfg; SWalCfg walCfg;
} SVnodeCfg; } SVnodeCfg;
typedef struct SDnode SDnode;
typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct { typedef struct {
int32_t sver; int32_t sver;
SDnode *pDnode;
char *timezone; char *timezone;
char *locale; char *locale;
char *charset; char *charset;
PutReqToVQueryQFp putReqToVQueryQFp;
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutReqToVQueryQFp putReqToVQueryQFp;
} SVnodeOpt; } SVnodeOpt;
/* ------------------------ SVnode ------------------------ */ /* ------------------------ SVnode ------------------------ */
@ -100,7 +101,7 @@ void vnodeClear();
* @param pVnodeCfg options of the vnode * @param pVnodeCfg options of the vnode
* @return SVnode* The vnode object * @return SVnode* The vnode object
*/ */
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
/** /**
* @brief Close a VNODE * @brief Close a VNODE

View File

@ -381,7 +381,8 @@ static void *dnodeOpenVnodeFunc(void *param) {
pMgmt->openVnodes, pMgmt->totalVnodes); pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId); SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++; pThread->failed++;
@ -581,7 +582,8 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return -1; return -1;
} }
SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId); vnodeCfg.pDnode = pDnode;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
return -1; return -1;

View File

@ -200,12 +200,11 @@ SDnode *dndInit(SDnodeOpt *pOption) {
SVnodeOpt vnodeOpt = { SVnodeOpt vnodeOpt = {
.sver = pDnode->opt.sver, .sver = pDnode->opt.sver,
.pDnode = pDnode,
.timezone = pDnode->opt.timezone, .timezone = pDnode->opt.timezone,
.locale = pDnode->opt.locale, .locale = pDnode->opt.locale,
.charset = pDnode->opt.charset, .charset = pDnode->opt.charset,
.putReqToVQueryQFp = dndPutMsgToVQueryQ,
.nthreads = pDnode->opt.numOfCommitThreads, .nthreads = pDnode->opt.numOfCommitThreads,
.putReqToVQueryQFp = dndPutMsgToVQueryQ,
}; };
if (vnodeInit(&vnodeOpt) != 0) { if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode env"); dError("failed to init vnode env");

View File

@ -77,11 +77,12 @@ struct SVnode {
SVnodeFS* pFs; SVnodeFS* pFs;
tsem_t canCommit; tsem_t canCommit;
SQHandle* pQuery; SQHandle* pQuery;
SDnode* pDnode;
}; };
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
void vnodePutReqToVQueryQ(struct SRpcMsg *pReq); void vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -15,27 +15,29 @@
#include "vnodeDef.h" #include "vnodeDef.h"
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg);
static void vnodeFree(SVnode *pVnode); static void vnodeFree(SVnode *pVnode);
static int vnodeOpenImpl(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode);
static void vnodeCloseImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode);
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
// Set default options // Set default options
//if (pVnodeCfg == NULL) { SVnodeCfg cfg = defaultVnodeOptions;
pVnodeCfg = &defaultVnodeOptions; if (pVnodeCfg != NULL) {
//} cfg.vgId = pVnodeCfg->vgId;
cfg.pDnode = pVnodeCfg->pDnode;
}
// Validate options // Validate options
if (vnodeValidateOptions(pVnodeCfg) < 0) { if (vnodeValidateOptions(&cfg) < 0) {
// TODO // TODO
return NULL; return NULL;
} }
// Create the handle // Create the handle
pVnode = vnodeNew(path, pVnodeCfg, vid); pVnode = vnodeNew(path, &cfg);
if (pVnode == NULL) { if (pVnode == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
@ -62,7 +64,7 @@ void vnodeClose(SVnode *pVnode) {
void vnodeDestroy(const char *path) { taosRemoveDir(path); } void vnodeDestroy(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnode *pVnode = NULL; SVnode *pVnode = NULL;
pVnode = (SVnode *)calloc(1, sizeof(*pVnode)); pVnode = (SVnode *)calloc(1, sizeof(*pVnode));
@ -71,7 +73,8 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vi
return NULL; return NULL;
} }
pVnode->vgId = vid; pVnode->vgId = pVnodeCfg->vgId;
pVnode->pDnode = pVnodeCfg->pDnode;
pVnode->path = strdup(path); pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);

View File

@ -26,7 +26,6 @@ int vnodeInit(const SVnodeOpt *pOption) {
vnodeMgr.stop = false; vnodeMgr.stop = false;
vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp;
vnodeMgr.pDnode = pOption->pDnode;
// Start commit handers // Start commit handers
if (pOption->nthreads > 0) { if (pOption->nthreads > 0) {
@ -91,10 +90,10 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
return 0; return 0;
} }
void vnodePutReqToVQueryQ(struct SRpcMsg* pReq) { void vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
if (vnodeMgr.putReqToVQueryQFp) { assert(vnodeMgr.putReqToVQueryQFp);
(*vnodeMgr.putReqToVQueryQFp)(vnodeMgr.pDnode, pReq); assert(pVnode->pDnode);
} (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq);
} }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */