Merge pull request #9750 from taosdata/feature/dnode3
add message queue for vnode
This commit is contained in:
commit
9369792053
|
@ -31,8 +31,12 @@ extern "C" {
|
||||||
|
|
||||||
/* ------------------------ TYPES EXPOSED ------------------------ */
|
/* ------------------------ TYPES EXPOSED ------------------------ */
|
||||||
typedef struct SVnode SVnode;
|
typedef struct SVnode SVnode;
|
||||||
|
typedef struct SDnode SDnode;
|
||||||
|
typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
|
||||||
|
|
||||||
typedef struct SVnodeCfg {
|
typedef struct SVnodeCfg {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
SDnode *pDnode;
|
||||||
|
|
||||||
/** vnode buffer pool options */
|
/** vnode buffer pool options */
|
||||||
struct {
|
struct {
|
||||||
|
@ -66,15 +70,23 @@ typedef struct SVnodeCfg {
|
||||||
SWalCfg walCfg;
|
SWalCfg walCfg;
|
||||||
} SVnodeCfg;
|
} SVnodeCfg;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t sver;
|
||||||
|
char *timezone;
|
||||||
|
char *locale;
|
||||||
|
char *charset;
|
||||||
|
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
|
||||||
|
PutReqToVQueryQFp putReqToVQueryQFp;
|
||||||
|
} SVnodeOpt;
|
||||||
|
|
||||||
/* ------------------------ SVnode ------------------------ */
|
/* ------------------------ SVnode ------------------------ */
|
||||||
/**
|
/**
|
||||||
* @brief Initialize the vnode module
|
* @brief Initialize the vnode module
|
||||||
*
|
*
|
||||||
* @param nthreads number of commit threads. 0 for no threads and
|
* @param pOption Option of the vnode mnodule
|
||||||
* a schedule queue should be given (TODO)
|
|
||||||
* @return int 0 for success and -1 for failure
|
* @return int 0 for success and -1 for failure
|
||||||
*/
|
*/
|
||||||
int vnodeInit(uint16_t nthreads);
|
int vnodeInit(const SVnodeOpt *pOption);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief clear a vnode
|
* @brief clear a vnode
|
||||||
|
@ -89,7 +101,7 @@ void vnodeClear();
|
||||||
* @param pVnodeCfg options of the vnode
|
* @param pVnodeCfg options of the vnode
|
||||||
* @return SVnode* The vnode object
|
* @return SVnode* The vnode object
|
||||||
*/
|
*/
|
||||||
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid);
|
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Close a VNODE
|
* @brief Close a VNODE
|
||||||
|
|
|
@ -381,7 +381,8 @@ static void *dnodeOpenVnodeFunc(void *param) {
|
||||||
pMgmt->openVnodes, pMgmt->totalVnodes);
|
pMgmt->openVnodes, pMgmt->totalVnodes);
|
||||||
dndReportStartup(pDnode, "open-vnodes", stepDesc);
|
dndReportStartup(pDnode, "open-vnodes", stepDesc);
|
||||||
|
|
||||||
SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId);
|
SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId};
|
||||||
|
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
|
||||||
pThread->failed++;
|
pThread->failed++;
|
||||||
|
@ -581,7 +582,8 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId);
|
vnodeCfg.pDnode = pDnode;
|
||||||
|
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
|
||||||
if (pImpl == NULL) {
|
if (pImpl == NULL) {
|
||||||
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
|
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -22,8 +22,8 @@
|
||||||
#include "dndTransport.h"
|
#include "dndTransport.h"
|
||||||
#include "dndVnodes.h"
|
#include "dndVnodes.h"
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
#include "wal.h"
|
|
||||||
#include "tfs.h"
|
#include "tfs.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
|
EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; }
|
||||||
|
|
||||||
|
@ -153,6 +153,8 @@ static void dndCleanupEnv(SDnode *pDnode) {
|
||||||
taosStopCacheRefreshWorker();
|
taosStopCacheRefreshWorker();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void dndPutMsgToVQueryQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndProcessVnodeQueryMsg(pDnode, pRpcMsg, NULL); }
|
||||||
|
|
||||||
SDnode *dndInit(SDnodeOpt *pOption) {
|
SDnode *dndInit(SDnodeOpt *pOption) {
|
||||||
taosIgnSIGPIPE();
|
taosIgnSIGPIPE();
|
||||||
taosBlockSIGPIPE();
|
taosBlockSIGPIPE();
|
||||||
|
@ -196,7 +198,15 @@ SDnode *dndInit(SDnodeOpt *pOption) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (vnodeInit(pDnode->opt.numOfCommitThreads) != 0) {
|
SVnodeOpt vnodeOpt = {
|
||||||
|
.sver = pDnode->opt.sver,
|
||||||
|
.timezone = pDnode->opt.timezone,
|
||||||
|
.locale = pDnode->opt.locale,
|
||||||
|
.charset = pDnode->opt.charset,
|
||||||
|
.nthreads = pDnode->opt.numOfCommitThreads,
|
||||||
|
.putReqToVQueryQFp = dndPutMsgToVQueryQ,
|
||||||
|
};
|
||||||
|
if (vnodeInit(&vnodeOpt) != 0) {
|
||||||
dError("failed to init vnode env");
|
dError("failed to init vnode env");
|
||||||
dndCleanup(pDnode);
|
dndCleanup(pDnode);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -57,6 +57,8 @@ typedef struct SVnodeMgr {
|
||||||
pthread_cond_t hasTask;
|
pthread_cond_t hasTask;
|
||||||
TD_DLIST(SVnodeTask) queue;
|
TD_DLIST(SVnodeTask) queue;
|
||||||
// For vnode Mgmt
|
// For vnode Mgmt
|
||||||
|
SDnode* pDnode;
|
||||||
|
PutReqToVQueryQFp putReqToVQueryQFp;
|
||||||
} SVnodeMgr;
|
} SVnodeMgr;
|
||||||
|
|
||||||
extern SVnodeMgr vnodeMgr;
|
extern SVnodeMgr vnodeMgr;
|
||||||
|
@ -75,10 +77,13 @@ struct SVnode {
|
||||||
SVnodeFS* pFs;
|
SVnodeFS* pFs;
|
||||||
tsem_t canCommit;
|
tsem_t canCommit;
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
|
SDnode* pDnode;
|
||||||
};
|
};
|
||||||
|
|
||||||
int vnodeScheduleTask(SVnodeTask* task);
|
int vnodeScheduleTask(SVnodeTask* task);
|
||||||
|
|
||||||
|
void vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -15,27 +15,29 @@
|
||||||
|
|
||||||
#include "vnodeDef.h"
|
#include "vnodeDef.h"
|
||||||
|
|
||||||
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid);
|
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg);
|
||||||
static void vnodeFree(SVnode *pVnode);
|
static void vnodeFree(SVnode *pVnode);
|
||||||
static int vnodeOpenImpl(SVnode *pVnode);
|
static int vnodeOpenImpl(SVnode *pVnode);
|
||||||
static void vnodeCloseImpl(SVnode *pVnode);
|
static void vnodeCloseImpl(SVnode *pVnode);
|
||||||
|
|
||||||
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) {
|
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
|
||||||
SVnode *pVnode = NULL;
|
SVnode *pVnode = NULL;
|
||||||
|
|
||||||
// Set default options
|
// Set default options
|
||||||
//if (pVnodeCfg == NULL) {
|
SVnodeCfg cfg = defaultVnodeOptions;
|
||||||
pVnodeCfg = &defaultVnodeOptions;
|
if (pVnodeCfg != NULL) {
|
||||||
//}
|
cfg.vgId = pVnodeCfg->vgId;
|
||||||
|
cfg.pDnode = pVnodeCfg->pDnode;
|
||||||
|
}
|
||||||
|
|
||||||
// Validate options
|
// Validate options
|
||||||
if (vnodeValidateOptions(pVnodeCfg) < 0) {
|
if (vnodeValidateOptions(&cfg) < 0) {
|
||||||
// TODO
|
// TODO
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the handle
|
// Create the handle
|
||||||
pVnode = vnodeNew(path, pVnodeCfg, vid);
|
pVnode = vnodeNew(path, &cfg);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -62,7 +64,7 @@ void vnodeClose(SVnode *pVnode) {
|
||||||
void vnodeDestroy(const char *path) { taosRemoveDir(path); }
|
void vnodeDestroy(const char *path) { taosRemoveDir(path); }
|
||||||
|
|
||||||
/* ------------------------ STATIC METHODS ------------------------ */
|
/* ------------------------ STATIC METHODS ------------------------ */
|
||||||
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) {
|
static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
|
||||||
SVnode *pVnode = NULL;
|
SVnode *pVnode = NULL;
|
||||||
|
|
||||||
pVnode = (SVnode *)calloc(1, sizeof(*pVnode));
|
pVnode = (SVnode *)calloc(1, sizeof(*pVnode));
|
||||||
|
@ -71,7 +73,8 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vi
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->vgId = vid;
|
pVnode->vgId = pVnodeCfg->vgId;
|
||||||
|
pVnode->pDnode = pVnodeCfg->pDnode;
|
||||||
pVnode->path = strdup(path);
|
pVnode->path = strdup(path);
|
||||||
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
|
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
|
||||||
|
|
||||||
|
|
|
@ -19,17 +19,18 @@ SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED};
|
||||||
|
|
||||||
static void* loop(void* arg);
|
static void* loop(void* arg);
|
||||||
|
|
||||||
int vnodeInit(uint16_t nthreads) {
|
int vnodeInit(const SVnodeOpt *pOption) {
|
||||||
if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) {
|
if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeMgr.stop = false;
|
vnodeMgr.stop = false;
|
||||||
|
vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp;
|
||||||
|
|
||||||
// Start commit handers
|
// Start commit handers
|
||||||
if (nthreads > 0) {
|
if (pOption->nthreads > 0) {
|
||||||
vnodeMgr.nthreads = nthreads;
|
vnodeMgr.nthreads = pOption->nthreads;
|
||||||
vnodeMgr.threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
|
vnodeMgr.threads = (pthread_t*)calloc(pOption->nthreads, sizeof(pthread_t));
|
||||||
if (vnodeMgr.threads == NULL) {
|
if (vnodeMgr.threads == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -38,7 +39,7 @@ int vnodeInit(uint16_t nthreads) {
|
||||||
pthread_cond_init(&(vnodeMgr.hasTask), NULL);
|
pthread_cond_init(&(vnodeMgr.hasTask), NULL);
|
||||||
TD_DLIST_INIT(&(vnodeMgr.queue));
|
TD_DLIST_INIT(&(vnodeMgr.queue));
|
||||||
|
|
||||||
for (uint16_t i = 0; i < nthreads; i++) {
|
for (uint16_t i = 0; i < pOption->nthreads; i++) {
|
||||||
pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
|
pthread_create(&(vnodeMgr.threads[i]), NULL, loop, NULL);
|
||||||
pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
|
pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
|
||||||
}
|
}
|
||||||
|
@ -89,6 +90,12 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
|
||||||
|
assert(vnodeMgr.putReqToVQueryQFp);
|
||||||
|
assert(pVnode->pDnode);
|
||||||
|
(*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq);
|
||||||
|
}
|
||||||
|
|
||||||
/* ------------------------ STATIC METHODS ------------------------ */
|
/* ------------------------ STATIC METHODS ------------------------ */
|
||||||
static void* loop(void* arg) {
|
static void* loop(void* arg) {
|
||||||
SVnodeTask* pTask;
|
SVnodeTask* pTask;
|
||||||
|
|
Loading…
Reference in New Issue