From 3203e8fe42eec55890c44a96478bd6efde763c64 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Jan 2022 15:18:33 +0800 Subject: [PATCH] try integrating allocator --- source/dnode/vnode/inc/tq.h | 14 +------------- source/dnode/vnode/src/tq/tq.c | 15 +++++---------- source/dnode/vnode/src/vnd/vnodeMain.c | 4 ++-- 3 files changed, 8 insertions(+), 25 deletions(-) diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 588305c8ae..ec886286f3 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -198,17 +198,6 @@ typedef struct STqQueryMsg { struct STqQueryMsg* next; } STqQueryMsg; -typedef struct STqLogHandle { - void* logHandle; - void* (*openLogReader)(void* logHandle); - void (*closeLogReader)(void* logReader); - int32_t (*logRead)(void* logReader, void** data, int64_t ver); - - int64_t (*logGetFirstVer)(void* logHandle); - int64_t (*logGetSnapshotVer)(void* logHandle); - int64_t (*logGetLastVer)(void* logHandle); -} STqLogHandle; - typedef struct STqCfg { // TODO } STqCfg; @@ -306,7 +295,6 @@ typedef struct STQ { // the handle of meta kvstore char* path; STqCfg* tqConfig; - STqLogHandle* tqLogHandle; STqMemRef tqMemRef; STqMetaStore* tqMeta; } STQ; @@ -323,7 +311,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); void tqClose(STQ*); // void* will be replace by a msg type diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c4a4f087c6..2b2272caea 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -50,7 +50,7 @@ void tqCleanUp() { taosTmrCleanUp(tqMgmt.timer); } -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -58,20 +58,15 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemA } pTq->path = strdup(path); pTq->tqConfig = tqConfig; - pTq->tqLogHandle = tqLogHandle; -#if 0 pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); if (pTq->tqMemRef.pAllocator == NULL) { // TODO: error code of buffer pool } -#endif pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0); if (pTq->tqMeta == NULL) { - free(pTq); -#if 0 allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator); -#endif + free(pTq); return NULL; } @@ -421,10 +416,10 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { // read from wal void* raw = NULL; /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ - int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset); - if (code < 0) { + /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/ + /*if (code < 0) {*/ // TODO: error - } + /*}*/ // get msgType // if submitblk pItem->executor->assign(pItem->executor->runtimeEnv, raw); diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index 76b7ccf0d9..cb0d76ed29 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -119,7 +119,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // TODO: Open TQ sprintf(dir, "%s/tq", pVnode->path); - pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); if (pVnode->pTq == NULL) { // TODO: handle error return -1; @@ -151,4 +151,4 @@ static void vnodeCloseImpl(SVnode *pVnode) { tqClose(pVnode->pTq); walClose(pVnode->pWal); } -} \ No newline at end of file +}