refact vnode
This commit is contained in:
parent
2ad1f30fae
commit
fe368d0ffe
|
@ -96,7 +96,6 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0;
|
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0;
|
||||||
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
|
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
|
||||||
pCfg->tsdbCfg.retentions = pCreate->pRetensions;
|
pCfg->tsdbCfg.retentions = pCreate->pRetensions;
|
||||||
pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;
|
|
||||||
pCfg->walCfg.level = TAOS_WAL_WRITE;
|
pCfg->walCfg.level = TAOS_WAL_WRITE;
|
||||||
pCfg->walCfg.fsyncPeriod = 0;
|
pCfg->walCfg.fsyncPeriod = 0;
|
||||||
pCfg->walCfg.retentionPeriod = 0;
|
pCfg->walCfg.retentionPeriod = 0;
|
||||||
|
|
|
@ -37,9 +37,7 @@ extern "C" {
|
||||||
|
|
||||||
// vnode
|
// vnode
|
||||||
typedef struct SVnode SVnode;
|
typedef struct SVnode SVnode;
|
||||||
typedef struct SMetaCfg SMetaCfg; // todo: remove
|
|
||||||
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
typedef struct STsdbCfg STsdbCfg; // todo: remove
|
||||||
typedef struct STqCfg STqCfg; // todo: remove
|
|
||||||
typedef struct SVnodeCfg SVnodeCfg;
|
typedef struct SVnodeCfg SVnodeCfg;
|
||||||
|
|
||||||
int vnodeInit(int nthreads);
|
int vnodeInit(int nthreads);
|
||||||
|
@ -134,10 +132,6 @@ struct STsdbCfg {
|
||||||
SArray *retentions;
|
SArray *retentions;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct STqCfg {
|
|
||||||
int32_t reserved;
|
|
||||||
};
|
|
||||||
|
|
||||||
struct SVnodeCfg {
|
struct SVnodeCfg {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
uint64_t dbId;
|
uint64_t dbId;
|
||||||
|
@ -151,8 +145,6 @@ struct SVnodeCfg {
|
||||||
int8_t streamMode;
|
int8_t streamMode;
|
||||||
bool isWeak;
|
bool isWeak;
|
||||||
STsdbCfg tsdbCfg;
|
STsdbCfg tsdbCfg;
|
||||||
SMetaCfg metaCfg;
|
|
||||||
STqCfg tqCfg;
|
|
||||||
SWalCfg walCfg;
|
SWalCfg walCfg;
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
uint32_t hashBegin;
|
uint32_t hashBegin;
|
||||||
|
|
|
@ -40,7 +40,7 @@ typedef struct SMSmaCursor SMSmaCursor;
|
||||||
#define META_CHILD_TABLE TD_CHILD_TABLE
|
#define META_CHILD_TABLE TD_CHILD_TABLE
|
||||||
#define META_NORMAL_TABLE TD_NORMAL_TABLE
|
#define META_NORMAL_TABLE TD_NORMAL_TABLE
|
||||||
|
|
||||||
SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF);
|
SMeta* metaOpen(const char* path, SMemAllocatorFactory* pMAF);
|
||||||
void metaClose(SMeta* pMeta);
|
void metaClose(SMeta* pMeta);
|
||||||
void metaRemove(const char* path);
|
void metaRemove(const char* path);
|
||||||
int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg);
|
int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg);
|
||||||
|
@ -97,7 +97,6 @@ tb_uid_t metaGenerateUid(SMeta* pMeta);
|
||||||
struct SMeta {
|
struct SMeta {
|
||||||
char* path;
|
char* path;
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
SMetaCfg options;
|
|
||||||
SMetaDB* pDB;
|
SMetaDB* pDB;
|
||||||
SMetaIdx* pIdx;
|
SMetaIdx* pIdx;
|
||||||
SMetaCache* pCache;
|
SMetaCache* pCache;
|
||||||
|
|
|
@ -160,7 +160,6 @@ struct STQ {
|
||||||
// the handle of meta kvstore
|
// the handle of meta kvstore
|
||||||
bool writeTrigger;
|
bool writeTrigger;
|
||||||
char* path;
|
char* path;
|
||||||
STqCfg* tqConfig;
|
|
||||||
STqMemRef tqMemRef;
|
STqMemRef tqMemRef;
|
||||||
STqMetaStore* tqMeta;
|
STqMetaStore* tqMeta;
|
||||||
// STqPushMgr* tqPushMgr;
|
// STqPushMgr* tqPushMgr;
|
||||||
|
@ -251,8 +250,7 @@ int tqInit();
|
||||||
void tqCleanUp();
|
void tqCleanUp();
|
||||||
|
|
||||||
// open in each vnode
|
// open in each vnode
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig,
|
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac);
|
||||||
SMemAllocatorFactory* allocFac);
|
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
// required by vnode
|
// required by vnode
|
||||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
|
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
|
||||||
|
|
|
@ -17,16 +17,16 @@
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
|
static SMeta *metaNew(const char *path, SMemAllocatorFactory *pMAF);
|
||||||
static void metaFree(SMeta *pMeta);
|
static void metaFree(SMeta *pMeta);
|
||||||
static int metaOpenImpl(SMeta *pMeta);
|
static int metaOpenImpl(SMeta *pMeta);
|
||||||
static void metaCloseImpl(SMeta *pMeta);
|
static void metaCloseImpl(SMeta *pMeta);
|
||||||
|
|
||||||
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) {
|
SMeta *metaOpen(const char *path, SMemAllocatorFactory *pMAF) {
|
||||||
SMeta *pMeta = NULL;
|
SMeta *pMeta = NULL;
|
||||||
|
|
||||||
// Allocate handle
|
// Allocate handle
|
||||||
pMeta = metaNew(path, pMetaCfg, pMAF);
|
pMeta = metaNew(path, pMAF);
|
||||||
if (pMeta == NULL) {
|
if (pMeta == NULL) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -54,7 +54,7 @@ void metaClose(SMeta *pMeta) {
|
||||||
void metaRemove(const char *path) { taosRemoveDir(path); }
|
void metaRemove(const char *path) { taosRemoveDir(path); }
|
||||||
|
|
||||||
/* ------------------------ STATIC METHODS ------------------------ */
|
/* ------------------------ STATIC METHODS ------------------------ */
|
||||||
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) {
|
static SMeta *metaNew(const char *path, SMemAllocatorFactory *pMAF) {
|
||||||
SMeta *pMeta;
|
SMeta *pMeta;
|
||||||
size_t psize = strlen(path);
|
size_t psize = strlen(path);
|
||||||
|
|
||||||
|
|
|
@ -19,15 +19,13 @@ int32_t tqInit() { return tqPushMgrInit(); }
|
||||||
|
|
||||||
void tqCleanUp() { tqPushMgrCleanUp(); }
|
void tqCleanUp() { tqPushMgrCleanUp(); }
|
||||||
|
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig,
|
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) {
|
||||||
SMemAllocatorFactory* allocFac) {
|
|
||||||
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
|
||||||
if (pTq == NULL) {
|
if (pTq == NULL) {
|
||||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pTq->path = strdup(path);
|
pTq->path = strdup(path);
|
||||||
pTq->tqConfig = tqConfig;
|
|
||||||
pTq->pVnode = pVnode;
|
pTq->pVnode = pVnode;
|
||||||
pTq->pWal = pWal;
|
pTq->pWal = pWal;
|
||||||
pTq->pVnodeMeta = pVnodeMeta;
|
pTq->pVnodeMeta = pVnodeMeta;
|
||||||
|
|
|
@ -132,7 +132,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
|
||||||
|
|
||||||
// Open meta
|
// Open meta
|
||||||
sprintf(dir, "%s/meta", pVnode->path);
|
sprintf(dir, "%s/meta", pVnode->path);
|
||||||
pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg), vBufPoolGetMAF(pVnode));
|
pVnode->pMeta = metaOpen(dir, vBufPoolGetMAF(pVnode));
|
||||||
if (pVnode->pMeta == NULL) {
|
if (pVnode->pMeta == NULL) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -157,7 +157,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
|
||||||
|
|
||||||
// Open TQ
|
// Open TQ
|
||||||
sprintf(dir, "%s/tq", pVnode->path);
|
sprintf(dir, "%s/tq", pVnode->path);
|
||||||
pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode));
|
pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode));
|
||||||
if (pVnode->pTq == NULL) {
|
if (pVnode->pTq == NULL) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue