refact vnode
This commit is contained in:
parent
0ed14aa00d
commit
032b30da75
|
@ -52,6 +52,11 @@ typedef struct SVState SVState;
|
|||
typedef struct SVBufPool SVBufPool;
|
||||
typedef struct SQWorkerMgmt SQHandle;
|
||||
|
||||
#define VNODE_META_DIR "meta"
|
||||
#define VNODE_TSDB_DIR "tsdb"
|
||||
#define VNODE_TQ_DIR "tq"
|
||||
#define VNODE_WAL_DIR "wal"
|
||||
|
||||
typedef struct {
|
||||
int8_t streamType; // sma or other
|
||||
int8_t dstType;
|
||||
|
|
|
@ -15,9 +15,6 @@
|
|||
|
||||
#include "vnodeInt.h"
|
||||
|
||||
static int vnodeOpenImpl(SVnode *pVnode);
|
||||
static void vnodeCloseImpl(SVnode *pVnode);
|
||||
|
||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||
SVnodeInfo info = {0};
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
|
@ -55,6 +52,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
SVnode *pVnode = NULL;
|
||||
SVnodeInfo info = {0};
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
char tdir[TSDB_FILENAME_LEN * 2];
|
||||
int ret;
|
||||
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
||||
|
@ -83,83 +81,85 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
|
||||
tsem_init(&(pVnode->canCommit), 0, 1);
|
||||
|
||||
// open the vnode
|
||||
if (vnodeOpenImpl(pVnode) < 0) {
|
||||
// TODO: handle error
|
||||
return NULL;
|
||||
// open buffer pool
|
||||
if (vnodeOpenBufPool(pVnode) < 0) {
|
||||
vError("vgId: %d failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open meta
|
||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_META_DIR);
|
||||
pVnode->pMeta = metaOpen(tdir, vBufPoolGetMAF(pVnode));
|
||||
if (pVnode->pMeta == NULL) {
|
||||
vError("vgId: %d failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open tsdb
|
||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TSDB_DIR);
|
||||
pVnode->pTsdb =
|
||||
tsdbOpen(tdir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
|
||||
if (pVnode->pTsdb == NULL) {
|
||||
vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open wal
|
||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
|
||||
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
|
||||
if (pVnode->pWal == NULL) {
|
||||
vError("vgId: %d failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open tq
|
||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
|
||||
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode));
|
||||
if (pVnode->pTq == NULL) {
|
||||
vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open query
|
||||
if (vnodeQueryOpen(pVnode)) {
|
||||
vError("vgId: %d failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (vnodeBegin() < 0) {
|
||||
goto _err;
|
||||
}
|
||||
#endif
|
||||
|
||||
return pVnode;
|
||||
|
||||
_err:
|
||||
if (pVnode->pQuery) vnodeQueryClose(pVnode);
|
||||
if (pVnode->pTq) tqClose(pVnode->pTq);
|
||||
if (pVnode->pWal) walClose(pVnode->pWal);
|
||||
if (pVnode->pTsdb) tsdbClose(pVnode->pTsdb);
|
||||
if (pVnode->pMeta) metaClose(pVnode->pMeta);
|
||||
tsem_destroy(&(pVnode->canCommit));
|
||||
taosMemoryFreeClear(pVnode->path);
|
||||
taosMemoryFree(pVnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void vnodeClose(SVnode *pVnode) {
|
||||
if (pVnode) {
|
||||
vnodeCloseImpl(pVnode);
|
||||
// commit (TODO: use option to control)
|
||||
vnodeSyncCommit(pVnode);
|
||||
// close vnode
|
||||
vnodeQueryClose(pVnode);
|
||||
walClose(pVnode->pWal);
|
||||
tqClose(pVnode->pTq);
|
||||
tsdbClose(pVnode->pTsdb);
|
||||
metaClose(pVnode->pMeta);
|
||||
vnodeCloseBufPool(pVnode);
|
||||
// destroy handle
|
||||
tsem_destroy(&(pVnode->canCommit));
|
||||
taosMemoryFreeClear(pVnode->path);
|
||||
taosMemoryFree(pVnode);
|
||||
}
|
||||
}
|
||||
|
||||
/* ------------------------ STATIC METHODS ------------------------ */
|
||||
static int vnodeOpenImpl(SVnode *pVnode) {
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
|
||||
if (vnodeOpenBufPool(pVnode) < 0) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Open meta
|
||||
sprintf(dir, "%s/meta", pVnode->path);
|
||||
pVnode->pMeta = metaOpen(dir, vBufPoolGetMAF(pVnode));
|
||||
if (pVnode->pMeta == NULL) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Open tsdb
|
||||
sprintf(dir, "%s/tsdb", pVnode->path);
|
||||
pVnode->pTsdb =
|
||||
tsdbOpen(dir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
|
||||
if (pVnode->pTsdb == NULL) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Open WAL
|
||||
sprintf(dir, "%s/wal", pVnode->path);
|
||||
pVnode->pWal = walOpen(dir, &(pVnode->config.walCfg));
|
||||
if (pVnode->pWal == NULL) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Open TQ
|
||||
sprintf(dir, "%s/tq", pVnode->path);
|
||||
pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode));
|
||||
if (pVnode->pTq == NULL) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Open Query
|
||||
if (vnodeQueryOpen(pVnode)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void vnodeCloseImpl(SVnode *pVnode) {
|
||||
vnodeSyncCommit(pVnode);
|
||||
if (pVnode) {
|
||||
vnodeCloseBufPool(pVnode);
|
||||
metaClose(pVnode->pMeta);
|
||||
tsdbClose(pVnode->pTsdb);
|
||||
tqClose(pVnode->pTq);
|
||||
walClose(pVnode->pWal);
|
||||
vnodeQueryClose(pVnode);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue