From c1fef8c4450d09cf99fe2219f719199a7b03bf90 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 16 Apr 2022 07:04:25 +0000 Subject: [PATCH 1/7] refact --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 4 ++-- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 5 ++++- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 4 ++-- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 925197d708..277546c66c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -175,7 +175,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { tFreeSCreateVnodeReq(&createReq); dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr()); vnodeClose(pImpl); - vnodeDestroy(wrapperCfg.path); + vnodeDestroy(path, pMgmt->pTfs); terrno = code; return code; } @@ -184,7 +184,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { if (code != 0) { tFreeSCreateVnodeReq(&createReq); vnodeClose(pImpl); - vnodeDestroy(wrapperCfg.path); + vnodeDestroy(path, pMgmt->pTfs); terrno = code; return code; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 1b8e0eb961..980caf3827 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -84,6 +84,8 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { } void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { + char path[TSDB_FILENAME_LEN]; + taosWLockLatch(&pMgmt->latch); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosWUnLockLatch(&pMgmt->latch); @@ -104,7 +106,8 @@ void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { if (pVnode->dropped) { dDebug("vgId:%d, vnode is destroyed for dropped:%d", pVnode->vgId, pVnode->dropped); - vnodeDestroy(pVnode->path); + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId); + vnodeDestroy(path, pMgmt->pTfs); } taosMemoryFree(pVnode->path); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 303376dba4..e302b5f431 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -45,7 +45,7 @@ typedef struct SVnodeCfg SVnodeCfg; int vnodeInit(int nthreads); void vnodeCleanup(); int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); -void vnodeDestroy(const char *path); +void vnodeDestroy(const char *path, STfs *pTfs); SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); void vnodeClose(SVnode *pVnode); void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 241c26ab1c..d92a6de1dc 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -51,6 +51,8 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { return 0; } +void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); } + SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; @@ -97,8 +99,6 @@ void vnodeClose(SVnode *pVnode) { } } -void vnodeDestroy(const char *path) { taosRemoveDir(path); } - /* ------------------------ STATIC METHODS ------------------------ */ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; From cb585812f1d1cafcc5e6f0ab33dd01a63588f48d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 16 Apr 2022 07:16:10 +0000 Subject: [PATCH 2/7] refact 1 --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 7 ++--- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 5 +-- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/vnd/vnodeCommit.c | 4 ++- source/dnode/vnode/src/vnd/vnodeOpen.c | 35 +++++++++------------ 5 files changed, 24 insertions(+), 29 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 277546c66c..48a0783ddf 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -160,13 +160,10 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - vnodeCfg.msgCb = msgCb; - vnodeCfg.pTfs = pMgmt->pTfs; - vnodeCfg.dbId = wrapperCfg.dbUid; - SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); + SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); if (pImpl == NULL) { - tFreeSCreateVnodeReq(&createReq); dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); + tFreeSCreateVnodeReq(&createReq); return -1; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 980caf3827..bf33e85b95 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -119,6 +119,7 @@ static void *vmOpenVnodeFunc(void *param) { SVnodeThread *pThread = param; SVnodesMgmt *pMgmt = pThread->pMgmt; SDnode *pDnode = pMgmt->pDnode; + char path[TSDB_FILENAME_LEN]; dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); setThreadName("open-vnodes"); @@ -137,8 +138,8 @@ static void *vmOpenVnodeFunc(void *param) { msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; - SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); + SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, msgCb); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e302b5f431..b45077bbc3 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -46,7 +46,7 @@ int vnodeInit(int nthreads); void vnodeCleanup(); int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); +SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); void vnodeClose(SVnode *pVnode); void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs); int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index fa249d3ba1..e57b598ac3 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -108,7 +108,7 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { goto _err; } - pData = taosMemoryMalloc(size); + pData = taosMemoryMalloc(size + 1); if (pData == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -119,6 +119,8 @@ int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { goto _err; } + pData[size] = '\0'; + taosCloseFile(&pFile); // decode info diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index d92a6de1dc..ee51b16e53 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -53,36 +53,31 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); } -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { - SVnode *pVnode = NULL; +SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { + SVnode *pVnode = NULL; + SVnodeInfo info = {0}; + char dir[TSDB_FILENAME_LEN]; + int ret; - // Set default options - SVnodeCfg cfg = vnodeCfgDefault; - if (pVnodeCfg != NULL) { - cfg.vgId = pVnodeCfg->vgId; - cfg.msgCb = pVnodeCfg->msgCb; - cfg.pTfs = pVnodeCfg->pTfs; - cfg.dbId = pVnodeCfg->dbId; - cfg.hashBegin = pVnodeCfg->hashBegin; - cfg.hashEnd = pVnodeCfg->hashEnd; - cfg.hashMethod = pVnodeCfg->hashMethod; - } + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path); - // Validate options - if (vnodeCheckCfg(&cfg) < 0) { - // TODO + // load vnode info + ret = vnodeLoadInfo(dir, &info); + if (ret < 0) { + vError("failed to open vnode from %s since %s", path, tstrerror(terrno)); return NULL; } - // Create the handle - pVnode = vnodeNew(path, &cfg); + info.config.pTfs = pTfs; + info.config.msgCb = msgCb; + + // crate handle + pVnode = vnodeNew(dir, &info.config); if (pVnode == NULL) { // TODO: handle error return NULL; } - taosMkDir(path); - // Open the vnode if (vnodeOpenImpl(pVnode) < 0) { // TODO: handle error From 78a8f6dfe47e56f45e247f2d8ed3c0f579c6d533 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 16 Apr 2022 07:26:11 +0000 Subject: [PATCH 3/7] fix bug --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 6 +++--- source/dnode/vnode/src/vnd/vnodeCommit.c | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 48a0783ddf..706dc91c3f 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -83,9 +83,9 @@ int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->vgId = pCreate->vgId; - pCfg->wsize = pCreate->cacheBlockSize; - pCfg->ssize = pCreate->cacheBlockSize; - pCfg->lsize = pCreate->cacheBlockSize; + pCfg->wsize = pCreate->cacheBlockSize * 1024 * 1024; + pCfg->ssize = 1024; + pCfg->lsize = 1024 * 1024; pCfg->isHeapAllocator = true; pCfg->ttl = 4; pCfg->keep = pCreate->daysToKeep0; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index e57b598ac3..807266e91d 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -288,7 +288,7 @@ _err: static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { SJson *pJson = NULL; - pJson = tjsonCreateObject(); + pJson = tjsonParse(pData); if (pJson == NULL) { return -1; } From 913749ea91450d5e2740b9913ab50696aadc80aa Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 16 Apr 2022 07:41:57 +0000 Subject: [PATCH 4/7] fix TMQ case --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 12 ++++++------ source/dnode/vnode/src/vnd/vnodeCommit.c | 20 ++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeOpen.c | 2 ++ 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 706dc91c3f..12e956fa4c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -97,12 +97,12 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize; pCfg->tsdbCfg.retentions = pCreate->pRetensions; pCfg->metaCfg.lruSize = pCreate->cacheBlockSize; - pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod; - pCfg->walCfg.level = pCreate->walLevel; - pCfg->walCfg.retentionPeriod = 10; - pCfg->walCfg.retentionSize = 128; - pCfg->walCfg.rollPeriod = 128; - pCfg->walCfg.segSize = 128; + pCfg->walCfg.level = TAOS_WAL_WRITE; + pCfg->walCfg.fsyncPeriod = 0; + pCfg->walCfg.retentionPeriod = 0; + pCfg->walCfg.retentionSize = 0; + pCfg->walCfg.rollPeriod = 0; + pCfg->walCfg.segSize = 0; pCfg->walCfg.vgId = pCreate->vgId; pCfg->hashBegin = pCreate->hashBegin; pCfg->hashEnd = pCreate->hashEnd; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 807266e91d..3ece3c054e 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -204,6 +204,16 @@ static int vnodeEncodeConfig(const void *pObj, SJson *pJson) { if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; return 0; } @@ -231,6 +241,16 @@ static int vnodeDecodeConfig(const SJson *pJson, void *pObj) { if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.rollPeriod", pCfg->walCfg.rollPeriod) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1; + if (tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1; + if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1; + if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1; return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index ee51b16e53..7e17af0180 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -70,6 +70,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { info.config.pTfs = pTfs; info.config.msgCb = msgCb; + // memset(&info.config.walCfg, 0, sizeof(SWalCfg)); + // info.config.walCfg.level = TAOS_WAL_WRITE; // crate handle pVnode = vnodeNew(dir, &info.config); From 2ad1f30fae93f5902799c175ee2789e831d2c015 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 16 Apr 2022 07:50:05 +0000 Subject: [PATCH 5/7] refact --- source/dnode/vnode/src/inc/vnodeInt.h | 3 +- source/dnode/vnode/src/tq/tq.c | 42 ++++++++++++------------- source/dnode/vnode/src/vnd/vnodeOpen.c | 5 +-- source/dnode/vnode/src/vnd/vnodeQuery.c | 6 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 12 +++---- 5 files changed, 33 insertions(+), 35 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e95878b04e..04103ec26d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -80,7 +80,6 @@ struct SVnodeInfo { }; struct SVnode { - int32_t vgId; char* path; SVnodeCfg config; SVState state; @@ -96,6 +95,8 @@ struct SVnode { STfs* pTfs; }; +#define TD_VID(PVNODE) (PVNODE)->config.vgId + // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bfcc5dd2bb..3a4168e759 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -267,7 +267,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, - pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); SMqPollRsp rsp = { /*.consumerId = consumerId,*/ @@ -277,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); + vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -303,7 +303,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (pTopic == NULL) { vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -312,7 +312,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; @@ -323,7 +323,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_load_32(&pConsumer->epoch); if (consumerEpoch > reqEpoch) { vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", - consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); + consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); break; } SWalReadHead* pHead; @@ -332,11 +332,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // if data inserted during waiting, launch query and // response to user vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset); + TD_VID(pTq->pVnode), fetchOffset); break; } vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset, pHead->msgType); + TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -361,7 +361,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (taosArrayGetSize(pRes) == 0) { vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, pTq->pVnode->vgId, fetchOffset); + pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); fetchOffset++; rsp.skipLogNum++; taosArrayDestroy(pRes); @@ -390,7 +390,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); @@ -422,7 +422,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch); /*}*/ @@ -446,14 +446,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, - pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); SMqPollRspV2 rspV2 = {0}; rspV2.dataLen = 0; STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); + vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -479,7 +479,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (pTopic == NULL) { vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -488,7 +488,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); rspV2.reqOffset = pReq->currentOffset; rspV2.skipLogNum = 0; @@ -499,7 +499,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_load_32(&pConsumer->epoch); if (consumerEpoch > reqEpoch) { vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", - consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); + consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); break; } SWalReadHead* pHead; @@ -508,11 +508,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // if data inserted during waiting, launch query and // response to user vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset); + TD_VID(pTq->pVnode), fetchOffset); break; } vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset, pHead->msgType); + TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -537,7 +537,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (taosArrayGetSize(pRes) == 0) { vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, pTq->pVnode->vgId, fetchOffset); + pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); fetchOffset++; rspV2.skipLogNum++; taosArrayDestroy(pRes); @@ -597,7 +597,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = msgLen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); @@ -631,7 +631,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch); /*}*/ @@ -742,7 +742,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); ASSERT(pTopic->buffer.output[i].task); } - vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, pTq->pVnode->vgId); + vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, TD_VID(pTq->pVnode)); taosArrayPush(pConsumer->topics, pTopic); if (create) { tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 7e17af0180..c8943a023a 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -70,8 +70,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { info.config.pTfs = pTfs; info.config.msgCb = msgCb; - // memset(&info.config.walCfg, 0, sizeof(SWalCfg)); - // info.config.walCfg.level = TAOS_WAL_WRITE; // crate handle pVnode = vnodeNew(dir, &info.config); @@ -106,7 +104,6 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { return NULL; } - pVnode->vgId = pVnodeCfg->vgId; pVnode->msgCb = pVnodeCfg->msgCb; pVnode->pTfs = pVnodeCfg->pTfs; pVnode->path = strdup(path); @@ -144,7 +141,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open tsdb sprintf(dir, "%s/tsdb", pVnode->path); pVnode->pTsdb = - tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); + tsdbOpen(dir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); if (pVnode->pTsdb == NULL) { // TODO: handle error return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 75079d50b2..4202c02a0c 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -16,7 +16,7 @@ #include "vnodeInt.h" int vnodeQueryOpen(SVnode *pVnode) { - return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); + return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } @@ -101,7 +101,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { metaRsp.numOfColumns = nCols; metaRsp.tableType = pTbCfg->type; metaRsp.tuid = uid; - metaRsp.vgId = pVnode->vgId; + metaRsp.vgId = TD_VID(pVnode); memcpy(metaRsp.pSchemas, pSW->pSchema, sizeof(SSchema) * pSW->nCols); if (nTagCols) { @@ -151,7 +151,7 @@ _exit: } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { - pLoad->vgId = pVnode->vgId; + pLoad->vgId = TD_VID(pVnode); pLoad->role = TAOS_SYNC_STATE_LEADER; pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = 400; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 87cef53042..288241eb66 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -36,7 +36,7 @@ void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { // TODO: handle error /*ASSERT(false);*/ - vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr()); + vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr()); } } @@ -73,12 +73,12 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_ALTER_STB: return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); case TDMT_VND_DROP_STB: - vTrace("vgId:%d, process drop stb req", pVnode->vgId); + vTrace("vgId:%d, process drop stb req", TD_VID(pVnode)); break; case TDMT_VND_DROP_TABLE: break; case TDMT_VND_SUBMIT: - /*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/ + /*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/ if (pVnode->config.streamMode == 0) { *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); (*pRsp)->handle = pMsg->handle; @@ -245,7 +245,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { // TODO: handle error - vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); + vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name); } // TODO: to encapsule a free API taosMemoryFree(pCreateTbReq->name); @@ -268,7 +268,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR } } - vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); + vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); if (vCreateTbBatchRsp.rspList) { int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp); @@ -289,7 +289,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) { SVCreateTbReq vAlterTbReq = {0}; - vTrace("vgId:%d, process alter stb req", pVnode->vgId); + vTrace("vgId:%d, process alter stb req", TD_VID(pVnode)); tDeserializeSVCreateTbReq(pReq, &vAlterTbReq); // TODO: to encapsule a free API taosMemoryFree(vAlterTbReq.stbCfg.pSchema); From fe368d0ffec89f2f9bfc269199ddfdef89299f2b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 16 Apr 2022 08:03:19 +0000 Subject: [PATCH 6/7] refact vnode --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 - source/dnode/vnode/inc/vnode.h | 8 -------- source/dnode/vnode/src/inc/meta.h | 3 +-- source/dnode/vnode/src/inc/tq.h | 4 +--- source/dnode/vnode/src/meta/metaMain.c | 8 ++++---- source/dnode/vnode/src/tq/tq.c | 4 +--- source/dnode/vnode/src/vnd/vnodeOpen.c | 4 ++-- 7 files changed, 9 insertions(+), 23 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 12e956fa4c..7eb08a8f4f 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -96,7 +96,6 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0; pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize; pCfg->tsdbCfg.retentions = pCreate->pRetensions; - pCfg->metaCfg.lruSize = pCreate->cacheBlockSize; pCfg->walCfg.level = TAOS_WAL_WRITE; pCfg->walCfg.fsyncPeriod = 0; pCfg->walCfg.retentionPeriod = 0; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index b45077bbc3..c1e96ffcb1 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -37,9 +37,7 @@ extern "C" { // vnode typedef struct SVnode SVnode; -typedef struct SMetaCfg SMetaCfg; // todo: remove typedef struct STsdbCfg STsdbCfg; // todo: remove -typedef struct STqCfg STqCfg; // todo: remove typedef struct SVnodeCfg SVnodeCfg; int vnodeInit(int nthreads); @@ -134,10 +132,6 @@ struct STsdbCfg { SArray *retentions; }; -struct STqCfg { - int32_t reserved; -}; - struct SVnodeCfg { int32_t vgId; uint64_t dbId; @@ -151,8 +145,6 @@ struct SVnodeCfg { int8_t streamMode; bool isWeak; STsdbCfg tsdbCfg; - SMetaCfg metaCfg; - STqCfg tqCfg; SWalCfg walCfg; SMsgCb msgCb; uint32_t hashBegin; diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 94a1266f46..f30696c9ac 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -40,7 +40,7 @@ typedef struct SMSmaCursor SMSmaCursor; #define META_CHILD_TABLE TD_CHILD_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE -SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF); +SMeta* metaOpen(const char* path, SMemAllocatorFactory* pMAF); void metaClose(SMeta* pMeta); void metaRemove(const char* path); int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); @@ -97,7 +97,6 @@ tb_uid_t metaGenerateUid(SMeta* pMeta); struct SMeta { char* path; SVnode* pVnode; - SMetaCfg options; SMetaDB* pDB; SMetaIdx* pIdx; SMetaCache* pCache; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ce81006661..ed33473b16 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -160,7 +160,6 @@ struct STQ { // the handle of meta kvstore bool writeTrigger; char* path; - STqCfg* tqConfig; STqMemRef tqMemRef; STqMetaStore* tqMeta; // STqPushMgr* tqPushMgr; @@ -251,8 +250,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, - SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac); void tqClose(STQ*); // required by vnode int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); diff --git a/source/dnode/vnode/src/meta/metaMain.c b/source/dnode/vnode/src/meta/metaMain.c index dd60e56371..879a7e8a6f 100644 --- a/source/dnode/vnode/src/meta/metaMain.c +++ b/source/dnode/vnode/src/meta/metaMain.c @@ -17,16 +17,16 @@ #include "vnodeInt.h" -static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); +static SMeta *metaNew(const char *path, SMemAllocatorFactory *pMAF); static void metaFree(SMeta *pMeta); static int metaOpenImpl(SMeta *pMeta); static void metaCloseImpl(SMeta *pMeta); -SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { +SMeta *metaOpen(const char *path, SMemAllocatorFactory *pMAF) { SMeta *pMeta = NULL; // Allocate handle - pMeta = metaNew(path, pMetaCfg, pMAF); + pMeta = metaNew(path, pMAF); if (pMeta == NULL) { // TODO: handle error return NULL; @@ -54,7 +54,7 @@ void metaClose(SMeta *pMeta) { void metaRemove(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { +static SMeta *metaNew(const char *path, SMemAllocatorFactory *pMAF) { SMeta *pMeta; size_t psize = strlen(path); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3a4168e759..0aa6023eaf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -19,15 +19,13 @@ int32_t tqInit() { return tqPushMgrInit(); } void tqCleanUp() { tqPushMgrCleanUp(); } -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, - SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return NULL; } pTq->path = strdup(path); - pTq->tqConfig = tqConfig; pTq->pVnode = pVnode; pTq->pWal = pWal; pTq->pVnodeMeta = pVnodeMeta; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index c8943a023a..ea27cda998 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -132,7 +132,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open meta sprintf(dir, "%s/meta", pVnode->path); - pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg), vBufPoolGetMAF(pVnode)); + pVnode->pMeta = metaOpen(dir, vBufPoolGetMAF(pVnode)); if (pVnode->pMeta == NULL) { // TODO: handle error return -1; @@ -157,7 +157,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open TQ sprintf(dir, "%s/tq", pVnode->path); - pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(dir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode)); if (pVnode->pTq == NULL) { // TODO: handle error return -1; From 6e7ab6ca4d62c961bf1ddec897d84da1d8d6ce26 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 16 Apr 2022 08:19:21 +0000 Subject: [PATCH 7/7] refact vnode --- source/dnode/vnode/inc/vnode.h | 2 - source/dnode/vnode/src/inc/vnodeInt.h | 4 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 54 ++++++++------------------ 3 files changed, 19 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index c1e96ffcb1..c969ad5abc 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -135,7 +135,6 @@ struct STsdbCfg { struct SVnodeCfg { int32_t vgId; uint64_t dbId; - STfs *pTfs; uint64_t wsize; uint64_t ssize; uint64_t lsize; @@ -146,7 +145,6 @@ struct SVnodeCfg { bool isWeak; STsdbCfg tsdbCfg; SWalCfg walCfg; - SMsgCb msgCb; uint32_t hashBegin; uint32_t hashEnd; int8_t hashMethod; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 04103ec26d..a765f5e88a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -83,6 +83,8 @@ struct SVnode { char* path; SVnodeCfg config; SVState state; + STfs* pTfs; + SMsgCb msgCb; SVBufPool* pBufPool; SMeta* pMeta; STsdb* pTsdb; @@ -91,8 +93,6 @@ struct SVnode { SSink* pSink; tsem_t canCommit; SQHandle* pQuery; - SMsgCb msgCb; - STfs* pTfs; }; #define TD_VID(PVNODE) (PVNODE)->config.vgId diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index ea27cda998..e7aeb75ea5 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -15,10 +15,8 @@ #include "vnodeInt.h" -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); -static void vnodeFree(SVnode *pVnode); -static int vnodeOpenImpl(SVnode *pVnode); -static void vnodeCloseImpl(SVnode *pVnode); +static int vnodeOpenImpl(SVnode *pVnode); +static void vnodeCloseImpl(SVnode *pVnode); int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { SVnodeInfo info = {0}; @@ -68,17 +66,24 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { return NULL; } - info.config.pTfs = pTfs; - info.config.msgCb = msgCb; - - // crate handle - pVnode = vnodeNew(dir, &info.config); + // create handle + pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode)); if (pVnode == NULL) { - // TODO: handle error + terrno = TSDB_CODE_OUT_OF_MEMORY; + vError("vgId: %d failed to open vnode since %s", info.config.vgId, tstrerror(terrno)); return NULL; } - // Open the vnode + pVnode->path = strdup(dir); + pVnode->config = info.config; + pVnode->state.committed = info.state.committed; + pVnode->state.processed = pVnode->state.applied = pVnode->state.committed; + pVnode->pTfs = pTfs; + pVnode->msgCb = msgCb; + + tsem_init(&(pVnode->canCommit), 0, 1); + + // open the vnode if (vnodeOpenImpl(pVnode) < 0) { // TODO: handle error return NULL; @@ -90,38 +95,13 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { void vnodeClose(SVnode *pVnode) { if (pVnode) { vnodeCloseImpl(pVnode); - vnodeFree(pVnode); - } -} - -/* ------------------------ STATIC METHODS ------------------------ */ -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { - SVnode *pVnode = NULL; - - pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode)); - if (pVnode == NULL) { - // TODO - return NULL; - } - - pVnode->msgCb = pVnodeCfg->msgCb; - pVnode->pTfs = pVnodeCfg->pTfs; - pVnode->path = strdup(path); - vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); - - tsem_init(&(pVnode->canCommit), 0, 1); - - return pVnode; -} - -static void vnodeFree(SVnode *pVnode) { - if (pVnode) { tsem_destroy(&(pVnode->canCommit)); taosMemoryFreeClear(pVnode->path); taosMemoryFree(pVnode); } } +/* ------------------------ STATIC METHODS ------------------------ */ static int vnodeOpenImpl(SVnode *pVnode) { char dir[TSDB_FILENAME_LEN];