diff --git a/include/dnode/vnode/meta/meta.h b/include/dnode/vnode/meta/meta.h index 44ae1bb79f..113a970548 100644 --- a/include/dnode/vnode/meta/meta.h +++ b/include/dnode/vnode/meta/meta.h @@ -16,6 +16,7 @@ #ifndef _TD_META_H_ #define _TD_META_H_ +#include "mallocator.h" #include "os.h" #include "trow.h" @@ -71,7 +72,7 @@ typedef struct STbCfg { } STbCfg; // SMeta operations -SMeta *metaOpen(const char *path, const SMetaCfg *pOptions); +SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); void metaClose(SMeta *pMeta); void metaRemove(const char *path); int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); @@ -79,8 +80,8 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaCommit(SMeta *pMeta); // Options -void metaOptionsInit(SMetaCfg *pOptions); -void metaOptionsClear(SMetaCfg *pOptions); +void metaOptionsInit(SMetaCfg *pMetaCfg); +void metaOptionsClear(SMetaCfg *pMetaCfg); // STbCfg #define META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \ diff --git a/include/dnode/vnode/tsdb/tsdb.h b/include/dnode/vnode/tsdb/tsdb.h index f8eac9768f..b85c6b64f6 100644 --- a/include/dnode/vnode/tsdb/tsdb.h +++ b/include/dnode/vnode/tsdb/tsdb.h @@ -16,6 +16,8 @@ #ifndef _TD_TSDB_H_ #define _TD_TSDB_H_ +#include "mallocator.h" + #ifdef __cplusplus extern "C" { #endif @@ -25,7 +27,7 @@ typedef struct STsdb STsdb; typedef struct STsdbCfg STsdbCfg; // STsdb -STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg); +STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); void tsdbClose(STsdb *); void tsdbRemove(const char *path); int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg); diff --git a/include/util/tmacro.h b/include/util/tmacro.h new file mode 100644 index 0000000000..5cca8a1062 --- /dev/null +++ b/include/util/tmacro.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_UTIL_MACRO_H_ +#define _TD_UTIL_MACRO_H_ + +#include "os.h" + +#ifdef __cplusplus +extern "C" { +#endif + +// Module init/clear MACRO definitions +#define TD_MOD_UNINITIALIZED 0 +#define TD_MOD_INITIALIZED 1 + +#define TD_MOD_UNCLEARD 0 +#define TD_MOD_CLEARD 1 + +typedef int8_t td_mode_flag_t; + +#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED) + +#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD) + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_UTIL_MACRO_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/impl/CMakeLists.txt b/source/dnode/vnode/impl/CMakeLists.txt index 3623516624..6972605afd 100644 --- a/source/dnode/vnode/impl/CMakeLists.txt +++ b/source/dnode/vnode/impl/CMakeLists.txt @@ -19,5 +19,5 @@ target_link_libraries( # test if(${BUILD_TEST}) - #add_subdirectory(test) + add_subdirectory(test) endif(${BUILD_TEST}) \ No newline at end of file diff --git a/source/dnode/vnode/impl/inc/vnodeBufferPool.h b/source/dnode/vnode/impl/inc/vnodeBufferPool.h index d96671d2bd..b4535597ee 100644 --- a/source/dnode/vnode/impl/inc/vnodeBufferPool.h +++ b/source/dnode/vnode/impl/inc/vnodeBufferPool.h @@ -32,6 +32,8 @@ int vnodeBufPoolRecycle(SVnode *pVnode); void *vnodeMalloc(SVnode *pVnode, uint64_t size); bool vnodeBufPoolIsFull(SVnode *pVnode); +SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/impl/inc/vnodeMAF.h b/source/dnode/vnode/impl/inc/vnodeMAF.h new file mode 100644 index 0000000000..7aa405103c --- /dev/null +++ b/source/dnode/vnode/impl/inc/vnodeMAF.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_VNODE_MAF_H_ +#define _TD_VNODE_MAF_H_ + +#include "vnode.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int vnodeOpenMAF(SVnode *pVnode); +void vnodeCloseMAF(SVnode *pVnode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_VNODE_MAF_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeBufferPool.c b/source/dnode/vnode/impl/src/vnodeBufferPool.c index 152a346f0a..6c1ededfc9 100644 --- a/source/dnode/vnode/impl/src/vnodeBufferPool.c +++ b/source/dnode/vnode/impl/src/vnodeBufferPool.c @@ -24,10 +24,13 @@ struct SVBufPool { TD_DLIST(SVMemAllocator) free; TD_DLIST(SVMemAllocator) incycle; SVMemAllocator *inuse; - // MAF for submodules - // SMemAllocatorFactory maf; + // MAF for submodules to use + SMemAllocatorFactory *pMAF; }; +static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF); +static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA); + int vnodeOpenBufPool(SVnode *pVnode) { uint64_t capacity; @@ -54,6 +57,15 @@ int vnodeOpenBufPool(SVnode *pVnode) { tDListAppend(&(pVnode->pBufPool->free), pVMA); } + pVnode->pBufPool->pMAF = (SMemAllocatorFactory *)malloc(sizeof(SMemAllocatorFactory)); + if (pVnode->pBufPool->pMAF == NULL) { + // TODO: handle error + return -1; + } + pVnode->pBufPool->pMAF->impl = pVnode; + pVnode->pBufPool->pMAF->create = vBufPoolCreateMA; + pVnode->pBufPool->pMAF->destroy = vBufPoolDestroyMA; + return 0; } @@ -125,195 +137,54 @@ bool vnodeBufPoolIsFull(SVnode *pVnode) { return vmaIsFull(pVnode->pBufPool->inuse); } -#if 0 - -typedef enum { - // Heap allocator - E_V_HEAP_ALLOCATOR = 0, - // Arena allocator - E_V_ARENA_ALLOCATOR -} EVMemAllocatorT; - -typedef struct { - /* TODO */ -} SVHeapAllocator; - -typedef struct SVArenaNode { - struct SVArenaNode *prev; - uint64_t size; - void * ptr; - char data[]; -} SVArenaNode; - -typedef struct { - uint64_t ssize; // step size - uint64_t lsize; // limit size - SVArenaNode *inuse; - SVArenaNode node; -} SVArenaAllocator; - -typedef struct { - SVnode * pVnode; - SListNode *pNode; -} SVMAWrapper; - - -static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type); -static void vBufPoolFreeNode(SListNode *pNode); -static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf); -static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma); -static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size); +SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode) { return pVnode->pBufPool->pMAF; } /* ------------------------ STATIC METHODS ------------------------ */ -static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type) { - SListNode * pNode; - SVMemAllocator *pvma; - uint64_t msize; - uint64_t ssize = 4096; // TODO - uint64_t lsize = 1024; // TODO - - msize = sizeof(SListNode) + sizeof(SVMemAllocator); - if (type == E_V_ARENA_ALLOCATOR) { - msize += capacity; - } - - pNode = (SListNode *)calloc(1, msize); - if (pNode == NULL) { - // TODO: handle error - return NULL; - } - - pvma = (SVMemAllocator *)(pNode->data); - pvma->capacity = capacity; - pvma->type = type; - - switch (type) { - case E_V_ARENA_ALLOCATOR: - vArenaAllocatorInit(&(pvma->vaa), capacity, ssize, lsize); - break; - case E_V_HEAP_ALLOCATOR: - // vHeapAllocatorInit(&(pvma->vha)); - break; - default: - ASSERT(0); - } - - return pNode; -} - -static void vBufPoolFreeNode(SListNode *pNode) { - SVMemAllocator *pvma = (SVMemAllocator *)(pNode->data); - - switch (pvma->type) { - case E_V_ARENA_ALLOCATOR: - vArenaAllocatorClear(&(pvma->vaa)); - break; - case E_V_HEAP_ALLOCATOR: - // vHeapAllocatorClear(&(pvma->vha)); - break; - default: - break; - } - - free(pNode); -} - -static void *vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size) { - void *ptr = NULL; - - if (pvma->type == E_V_ARENA_ALLOCATOR) { - SVArenaAllocator *pvaa = &(pvma->vaa); - - if (POINTER_DISTANCE(pvaa->inuse->ptr, pvaa->inuse->data) + size > pvaa->inuse->size) { - SVArenaNode *pNode = (SVArenaNode *)malloc(sizeof(*pNode) + MAX(size, pvaa->ssize)); - if (pNode == NULL) { - // TODO: handle error - return NULL; - } - - pNode->prev = pvaa->inuse; - pNode->size = MAX(size, pvaa->ssize); - pNode->ptr = pNode->data; - - pvaa->inuse = pNode; - } - - ptr = pvaa->inuse->ptr; - pvaa->inuse->ptr = POINTER_SHIFT(ptr, size); - } else if (pvma->type == E_V_HEAP_ALLOCATOR) { - /* TODO */ - } - - return ptr; -} - -static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) { +typedef struct { SVnode * pVnode; - SMemAllocator * pma; - SVMemAllocator *pvma; - SVMAWrapper * pvmaw; + SVMemAllocator *pVMA; +} SVMAWrapper; - pVnode = (SVnode *)(pmaf->impl); - pma = (SMemAllocator *)calloc(1, sizeof(*pma) + sizeof(SVMAWrapper)); - if (pma == NULL) { - // TODO: handle error +static FORCE_INLINE void *vmaMaloocCb(SMemAllocator *pMA, uint64_t size) { + SVMAWrapper *pWrapper = (SVMAWrapper *)(pMA->impl); + + return vmaMalloc(pWrapper->pVMA, size); +} + +// TODO: Add atomic operations here +static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF) { + SMemAllocator *pMA; + SVnode * pVnode = (SVnode *)(pMAF->impl); + SVMAWrapper * pWrapper; + + pMA = (SMemAllocator *)calloc(1, sizeof(*pMA) + sizeof(SVMAWrapper)); + if (pMA == NULL) { return NULL; } - pvmaw = (SVMAWrapper *)POINTER_SHIFT(pma, sizeof(*pma)); - // No allocator used currently - if (pVnode->pBufPool->inuse == NULL) { - while (listNEles(&(pVnode->pBufPool->free)) == 0) { - // TODO: wait until all released ro kill query - // tsem_wait(); - ASSERT(0); - } + pVnode->pBufPool->inuse->_ref.val++; + pWrapper = POINTER_SHIFT(pMA, sizeof(*pMA)); + pWrapper->pVnode = pVnode; + pWrapper->pVMA = pVnode->pBufPool->inuse; - pVnode->pBufPool->inuse = tdListPopHead(&(pVnode->pBufPool->free)); - pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); - T_REF_INIT_VAL(pvma, 1); - } else { - pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); - } + pMA->impl = pWrapper; + pMA->malloc = vmaMaloocCb; + pMA->calloc = NULL; + pMA->realloc = NULL; + pMA->free = NULL; + pMA->usage = NULL; - T_REF_INC(pvma); - - pvmaw->pVnode = pVnode; - pvmaw->pNode = pVnode->pBufPool->inuse; - - pma->impl = pvmaw; - pma->malloc = NULL; - pma->calloc = NULL; /* TODO */ - pma->realloc = NULL; /* TODO */ - pma->free = NULL; /* TODO */ - pma->usage = NULL; /* TODO */ - - return pma; + return pMA; } -static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma) { /* TODO */ - SVnode * pVnode = (SVnode *)(pmaf->impl); - SListNode * pNode = ((SVMAWrapper *)(pma->impl))->pNode; - SVMemAllocator *pvma = (SVMemAllocator *)(pNode->data); +static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) { + SVMAWrapper * pWrapper = (SVMAWrapper *)(pMA->impl); + SVnode * pVnode = pWrapper->pVnode; + SVMemAllocator *pVMA = pWrapper->pVMA; - if (T_REF_DEC(pvma) == 0) { - if (pvma->type == E_V_ARENA_ALLOCATOR) { - SVArenaAllocator *pvaa = &(pvma->vaa); - while (pvaa->inuse != &(pvaa->node)) { - SVArenaNode *pNode = pvaa->inuse; - pvaa->inuse = pNode->prev; - /* code */ - } - - pvaa->inuse->ptr = pvaa->inuse->data; - } else if (pvma->type == E_V_HEAP_ALLOCATOR) { - } else { - ASSERT(0); - } - - // Move node from incycle to free - tdListAppendNode(&(pVnode->pBufPool->free), tdListPopNode(&(pVnode->pBufPool->incycle), pNode)); - // tsem_post(); todo: sem_post + free(pMA); + if (--pVMA->_ref.val == 0) { + tDListPop(&(pVnode->pBufPool->incycle), pVMA); + tDListAppend(&(pVnode->pBufPool->free), pVMA); } -} -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index 59e3bae5d7..70d9c7d4b0 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -94,7 +94,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open meta sprintf(dir, "%s/meta", pVnode->path); - pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg)); + pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg), vBufPoolGetMAF(pVnode)); if (pVnode->pMeta == NULL) { // TODO: handle error return -1; @@ -102,7 +102,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open tsdb sprintf(dir, "%s/tsdb", pVnode->path); - pVnode->pTsdb = tsdbOpen(dir, &(pVnode->config.tsdbCfg)); + pVnode->pTsdb = tsdbOpen(dir, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode)); if (pVnode->pTsdb == NULL) { // TODO: handle error return -1; @@ -110,7 +110,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // TODO: Open TQ sprintf(dir, "%s/tq", pVnode->path); - pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, NULL); + pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, vBufPoolGetMAF(pVnode)); if (pVnode->pTq == NULL) { // TODO: handle error return -1; @@ -131,7 +131,9 @@ static int vnodeOpenImpl(SVnode *pVnode) { static void vnodeCloseImpl(SVnode *pVnode) { if (pVnode) { vnodeCloseBufPool(pVnode); - tsdbClose(pVnode->pTsdb); metaClose(pVnode->pMeta); + tsdbClose(pVnode->pTsdb); + tqClose(pVnode->pTq); + walClose(pVnode->pWal); } } \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeRequest.c b/source/dnode/vnode/impl/src/vnodeRequest.c index be5f5c890c..249bde4e56 100644 --- a/source/dnode/vnode/impl/src/vnodeRequest.c +++ b/source/dnode/vnode/impl/src/vnodeRequest.c @@ -25,9 +25,10 @@ int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type) { switch (type) { case TSDB_MSG_TYPE_CREATE_TABLE: tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq)); + break; + case TSDB_MSG_TYPE_SUBMIT: /* code */ break; - default: break; } diff --git a/source/dnode/vnode/impl/test/vBenchmarkTest.cpp b/source/dnode/vnode/impl/test/vBenchmarkTest.cpp new file mode 100644 index 0000000000..e218886231 --- /dev/null +++ b/source/dnode/vnode/impl/test/vBenchmarkTest.cpp @@ -0,0 +1,2 @@ +// https://stackoverflow.com/questions/8565666/benchmarking-with-googletest +// https://github.com/google/benchmark \ No newline at end of file diff --git a/source/dnode/vnode/impl/test/vnodeApiTests.cpp b/source/dnode/vnode/impl/test/vnodeApiTests.cpp index f0bca3aa2e..6f2e6f721a 100644 --- a/source/dnode/vnode/impl/test/vnodeApiTests.cpp +++ b/source/dnode/vnode/impl/test/vnodeApiTests.cpp @@ -14,7 +14,7 @@ #include "vnode.h" -static STSchema *createBasicSchema() { +static STSchema *vtCreateBasicSchema() { STSchemaBuilder sb; STSchema * pSchema = NULL; @@ -32,7 +32,7 @@ static STSchema *createBasicSchema() { return pSchema; } -static STSchema *createBasicTagSchema() { +static STSchema *vtCreateBasicTagSchema() { STSchemaBuilder sb; STSchema * pSchema = NULL; @@ -50,7 +50,7 @@ static STSchema *createBasicTagSchema() { return pSchema; } -static SKVRow createBasicTag() { +static SKVRow vtCreateBasicTag() { SKVRowBuilder rb; SKVRow pTag; @@ -71,118 +71,203 @@ static SKVRow createBasicTag() { return pTag; } -#if 0 -TEST(vnodeApiTest, test_create_table_encode_and_decode_function) { - tb_uid_t suid = 1638166374163; - STSchema *pSchema = createBasicSchema(); - STSchema *pTagSchema = createBasicTagSchema(); - char tbname[128] = "st"; - char * buffer = new char[1024]; - void * pBuf = (void *)buffer; +static void vtBuildCreateStbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) { + SRpcMsg * pMsg; + STSchema *pSchema; + STSchema *pTagSchema; + int zs; + void * pBuf; + + pSchema = vtCreateBasicSchema(); + pTagSchema = vtCreateBasicTagSchema(); + SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema); + zs = vnodeBuildReq(NULL, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE); + pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs); + pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE; + pMsg->contLen = zs; + pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg)); + + pBuf = pMsg->pCont; vnodeBuildReq(&pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE); + META_CLEAR_TB_CFG(&vCreateSTbReq); - SVnodeReq decoded_req; + tdFreeSchema(pSchema); + tdFreeSchema(pTagSchema); - vnodeParseReq(buffer, &decoded_req, TSDB_MSG_TYPE_CREATE_TABLE); - - int k = 10; + *ppMsg = pMsg; } -#endif -TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) { +static void vtBuildCreateCtbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) { + SRpcMsg *pMsg; + int tz; + SKVRow pTag = vtCreateBasicTag(); + + SVnodeReq vCreateCTbReq = VNODE_INIT_CREATE_CTB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pTag); + + tz = vnodeBuildReq(NULL, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE); + pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz); + pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE; + pMsg->contLen = tz; + pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg)); + void *pBuf = pMsg->pCont; + + vnodeBuildReq(&pBuf, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE); + META_CLEAR_TB_CFG(&vCreateCTbReq); + free(pTag); + + *ppMsg = pMsg; +} + +static void vtBuildCreateNtbReq(char *tbname, SRpcMsg **ppMsg) { + // TODO +} + +static void vtBuildSubmitReq(SRpcMsg **ppMsg) { + SRpcMsg * pMsg; + SSubmitMsg *pSubmitMsg; + SSubmitBlk *pSubmitBlk; + int tz = 1024; // TODO + + pMsg = (SRpcMsg *)malloc(sizeof(*pMsg) + tz); + pMsg->msgType = TSDB_MSG_TYPE_SUBMIT; + pMsg->contLen = tz; + pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg)); + + // For submit msg header + pSubmitMsg = (SSubmitMsg *)(pMsg->pCont); + // pSubmitMsg->header.contLen = 0; + // pSubmitMsg->header.vgId = 0; + // pSubmitMsg->length = 0; + pSubmitMsg->numOfBlocks = 1; + + // For submit blk + pSubmitBlk = (SSubmitBlk *)(pSubmitMsg->blocks); + pSubmitBlk->uid = 0; + pSubmitBlk->tid = 0; + pSubmitBlk->padding = 0; + pSubmitBlk->sversion = 0; + pSubmitBlk->dataLen = 0; + pSubmitBlk->numOfRows = 0; + + // For row batch + + *ppMsg = pMsg; +} + +static void vtClearMsgBatch(SArray *pMsgArr) { + SRpcMsg *pMsg; + for (size_t i = 0; i < taosArrayGetSize(pMsgArr); i++) { + pMsg = *(SRpcMsg **)taosArrayGet(pMsgArr, i); + free(pMsg); + } + + taosArrayClear(pMsgArr); +} + +TEST(vnodeApiTest, vnode_simple_create_table_test) { + tb_uid_t suid = 1638166374163; + SRpcMsg *pMsg; + SArray * pMsgArr = NULL; + SVnode * pVnode; + int rcode; + int ntables = 1000000; + int batch = 10; + char tbname[128]; + + pMsgArr = (SArray *)taosArrayInit(batch, sizeof(pMsg)); + vnodeDestroy("vnode1"); + GTEST_ASSERT_GE(vnodeInit(2), 0); + + // CREATE AND OPEN A VNODE + pVnode = vnodeOpen("vnode1", NULL); + ASSERT_NE(pVnode, nullptr); + + // CREATE A SUPER TABLE + sprintf(tbname, "st"); + vtBuildCreateStbReq(suid, tbname, &pMsg); + taosArrayPush(pMsgArr, &pMsg); + rcode = vnodeProcessWMsgs(pVnode, pMsgArr); + ASSERT_EQ(rcode, 0); + vtClearMsgBatch(pMsgArr); + + // CREATE A LOT OF CHILD TABLES + for (int i = 0; i < ntables / batch; i++) { + // Build request batch + for (int j = 0; j < batch; j++) { + sprintf(tbname, "ct%d", i * batch + j + 1); + vtBuildCreateCtbReq(suid, tbname, &pMsg); + taosArrayPush(pMsgArr, &pMsg); + } + + // Process request batch + rcode = vnodeProcessWMsgs(pVnode, pMsgArr); + ASSERT_EQ(rcode, 0); + + // Clear request batch + vtClearMsgBatch(pMsgArr); + } + + // CLOSE THE VNODE + vnodeClose(pVnode); + vnodeClear(); + + taosArrayDestroy(pMsgArr); +} + +TEST(vnodeApiTest, vnode_simple_insert_test) { + const char *vname = "vnode2"; + char tbname[128]; + tb_uid_t suid = 1638166374163; + SRpcMsg * pMsg; + SArray * pMsgArr; + int rcode; + SVnode * pVnode; + int batch = 1; + int loop = 1000000; + + pMsgArr = (SArray *)taosArrayInit(0, sizeof(pMsg)); + + vnodeDestroy(vname); GTEST_ASSERT_GE(vnodeInit(2), 0); - // Create and open a vnode - SVnode *pVnode = vnodeOpen("vnode1", NULL); - ASSERT_NE(pVnode, nullptr); + // Open a vnode + pVnode = vnodeOpen(vname, NULL); + GTEST_ASSERT_NE(pVnode, nullptr); - tb_uid_t suid = 1638166374163; - { - // Create a super table - STSchema *pSchema = createBasicSchema(); - STSchema *pTagSchema = createBasicTagSchema(); - char tbname[128] = "st"; + // 1. CREATE A SUPER TABLE + sprintf(tbname, "st"); + vtBuildCreateStbReq(suid, tbname, &pMsg); + taosArrayPush(pMsgArr, &pMsg); + rcode = vnodeProcessWMsgs(pVnode, pMsgArr); + GTEST_ASSERT_EQ(rcode, 0); + vtClearMsgBatch(pMsgArr); - SArray * pMsgs = (SArray *)taosArrayInit(1, sizeof(SRpcMsg *)); - SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema); + // 2. CREATE A CHILD TABLE + sprintf(tbname, "t0"); + vtBuildCreateCtbReq(suid, tbname, &pMsg); + taosArrayPush(pMsgArr, &pMsg); + rcode = vnodeProcessWMsgs(pVnode, pMsgArr); + GTEST_ASSERT_EQ(rcode, 0); + vtClearMsgBatch(pMsgArr); - int zs = vnodeBuildReq(NULL, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE); - SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs); - pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE; - pMsg->contLen = zs; - pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg)); - - void *pBuf = pMsg->pCont; - - vnodeBuildReq(&pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE); - META_CLEAR_TB_CFG(&vCreateSTbReq); - - taosArrayPush(pMsgs, &(pMsg)); - - vnodeProcessWMsgs(pVnode, pMsgs); - - free(pMsg); - taosArrayDestroy(pMsgs); - tdFreeSchema(pSchema); - tdFreeSchema(pTagSchema); - } - - { - // Create some child tables - int ntables = 1000000; - int batch = 10; - for (int i = 0; i < ntables / batch; i++) { - SArray *pMsgs = (SArray *)taosArrayInit(batch, sizeof(SRpcMsg *)); - for (int j = 0; j < batch; j++) { - SKVRow pTag = createBasicTag(); - char tbname[128]; - sprintf(tbname, "tb%d", i * batch + j); - SVnodeReq vCreateCTbReq = VNODE_INIT_CREATE_CTB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pTag); - - int tz = vnodeBuildReq(NULL, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE); - SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz); - pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE; - pMsg->contLen = tz; - pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg)); - void *pBuf = pMsg->pCont; - - vnodeBuildReq(&pBuf, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE); - META_CLEAR_TB_CFG(&vCreateCTbReq); - free(pTag); - - taosArrayPush(pMsgs, &(pMsg)); - } - - vnodeProcessWMsgs(pVnode, pMsgs); - - for (int j = 0; j < batch; j++) { - SRpcMsg *pMsg = *(SRpcMsg **)taosArrayPop(pMsgs); - free(pMsg); - } - - taosArrayDestroy(pMsgs); - - // std::cout << "the " << i << "th batch is created" << std::endl; + // 3. WRITE A LOT OF TIME-SERIES DATA + for (int j = 0; j < loop; j++) { + for (int i = 0; i < batch; i++) { + vtBuildSubmitReq(&pMsg); + taosArrayPush(pMsgArr, &pMsg); } + rcode = vnodeProcessWMsgs(pVnode, pMsgArr); + GTEST_ASSERT_EQ(rcode, 0); + vtClearMsgBatch(pMsgArr); } // Close the vnode vnodeClose(pVnode); - vnodeClear(); -} -TEST(vnodeApiTest, DISABLED_vnode_process_create_table) { - STSchema * pSchema = NULL; - STSchema * pTagSchema = NULL; - char stname[15]; - SVCreateTableReq pReq = META_INIT_STB_CFG(stname, UINT32_MAX, UINT32_MAX, 0, pSchema, pTagSchema); - - int k = 10; - - META_CLEAR_TB_CFG(pReq); -} + taosArrayDestroy(pMsgArr); +} \ No newline at end of file diff --git a/source/dnode/vnode/meta/inc/metaDef.h b/source/dnode/vnode/meta/inc/metaDef.h index 0204031e76..e1c15af5aa 100644 --- a/source/dnode/vnode/meta/inc/metaDef.h +++ b/source/dnode/vnode/meta/inc/metaDef.h @@ -34,7 +34,7 @@ extern "C" { struct SMeta { char* path; SMetaCfg options; - SMetaDB* pDB; + SMetaDB* pDB; SMetaIdx* pIdx; SMetaCache* pCache; STbUidGenerator uidGnrt; diff --git a/source/dnode/vnode/meta/src/metaMain.c b/source/dnode/vnode/meta/src/metaMain.c index 53055dcea3..b6aa029b19 100644 --- a/source/dnode/vnode/meta/src/metaMain.c +++ b/source/dnode/vnode/meta/src/metaMain.c @@ -17,27 +17,27 @@ #include "metaDef.h" -static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions); +static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); static void metaFree(SMeta *pMeta); static int metaOpenImpl(SMeta *pMeta); static void metaCloseImpl(SMeta *pMeta); -SMeta *metaOpen(const char *path, const SMetaCfg *pMetaOptions) { +SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { SMeta *pMeta = NULL; // Set default options - if (pMetaOptions == NULL) { - pMetaOptions = &defaultMetaOptions; + if (pMetaCfg == NULL) { + pMetaCfg = &defaultMetaOptions; } // Validate the options - if (metaValidateOptions(pMetaOptions) < 0) { + if (metaValidateOptions(pMetaCfg) < 0) { // TODO: deal with error return NULL; } // Allocate handle - pMeta = metaNew(path, pMetaOptions); + pMeta = metaNew(path, pMetaCfg, pMAF); if (pMeta == NULL) { // TODO: handle error return NULL; @@ -65,7 +65,7 @@ void metaClose(SMeta *pMeta) { void metaRemove(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) { +static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { SMeta *pMeta; size_t psize = strlen(path); @@ -80,7 +80,8 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) { return NULL; } - metaOptionsCopy(&(pMeta->options), pMetaOptions); + metaOptionsCopy(&(pMeta->options), pMetaCfg); + pMeta->pmaf = pMAF; return pMeta; }; diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index 2c07529219..b88ef353b0 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -63,7 +63,13 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA return pTq; } -static int tqProtoCheck(TmqMsgHead* pMsg) { return pMsg->protoVer == 0; } +void tqClose(STQ*pTq) { + // TODO +} + +static int tqProtoCheck(TmqMsgHead *pMsg) { + return pMsg->protoVer == 0; +} static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg** ppQuery) { // clean old item and move forward diff --git a/source/dnode/vnode/tsdb/src/tsdbMain.c b/source/dnode/vnode/tsdb/src/tsdbMain.c index 2fe7a61930..d67d45660d 100644 --- a/source/dnode/vnode/tsdb/src/tsdbMain.c +++ b/source/dnode/vnode/tsdb/src/tsdbMain.c @@ -15,27 +15,27 @@ #include "tsdbDef.h" -static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions); +static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); static void tsdbFree(STsdb *pTsdb); static int tsdbOpenImpl(STsdb *pTsdb); static void tsdbCloseImpl(STsdb *pTsdb); -STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbOptions) { +STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) { STsdb *pTsdb = NULL; // Set default TSDB Options - if (pTsdbOptions == NULL) { - pTsdbOptions = &defautlTsdbOptions; + if (pTsdbCfg == NULL) { + pTsdbCfg = &defautlTsdbOptions; } // Validate the options - if (tsdbValidateOptions(pTsdbOptions) < 0) { + if (tsdbValidateOptions(pTsdbCfg) < 0) { // TODO: handle error return NULL; } // Create the handle - pTsdb = tsdbNew(path, pTsdbOptions); + pTsdb = tsdbNew(path, pTsdbCfg, pMAF); if (pTsdb == NULL) { // TODO: handle error return NULL; @@ -62,7 +62,7 @@ void tsdbClose(STsdb *pTsdb) { void tsdbRemove(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) { +static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) { STsdb *pTsdb = NULL; pTsdb = (STsdb *)calloc(1, sizeof(STsdb)); @@ -72,7 +72,8 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) { } pTsdb->path = strdup(path); - tsdbOptionsCopy(&(pTsdb->options), pTsdbOptions); + tsdbOptionsCopy(&(pTsdb->options), pTsdbCfg); + pTsdb->pmaf = pMAF; return pTsdb; }