Merge pull request #9161 from taosdata/feature/vnode

Feature/vnode
This commit is contained in:
Shengliang Guan 2021-12-17 13:53:53 +08:00 committed by GitHub
commit dbafa13b55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 354 additions and 306 deletions

View File

@ -16,6 +16,7 @@
#ifndef _TD_META_H_ #ifndef _TD_META_H_
#define _TD_META_H_ #define _TD_META_H_
#include "mallocator.h"
#include "os.h" #include "os.h"
#include "trow.h" #include "trow.h"
@ -71,7 +72,7 @@ typedef struct STbCfg {
} STbCfg; } STbCfg;
// SMeta operations // SMeta operations
SMeta *metaOpen(const char *path, const SMetaCfg *pOptions); SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
void metaClose(SMeta *pMeta); void metaClose(SMeta *pMeta);
void metaRemove(const char *path); void metaRemove(const char *path);
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
@ -79,8 +80,8 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta); int metaCommit(SMeta *pMeta);
// Options // Options
void metaOptionsInit(SMetaCfg *pOptions); void metaOptionsInit(SMetaCfg *pMetaCfg);
void metaOptionsClear(SMetaCfg *pOptions); void metaOptionsClear(SMetaCfg *pMetaCfg);
// STbCfg // STbCfg
#define META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \ #define META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \

View File

@ -16,6 +16,8 @@
#ifndef _TD_TSDB_H_ #ifndef _TD_TSDB_H_
#define _TD_TSDB_H_ #define _TD_TSDB_H_
#include "mallocator.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -25,7 +27,7 @@ typedef struct STsdb STsdb;
typedef struct STsdbCfg STsdbCfg; typedef struct STsdbCfg STsdbCfg;
// STsdb // STsdb
STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg); STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
void tsdbClose(STsdb *); void tsdbClose(STsdb *);
void tsdbRemove(const char *path); void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg); int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg);

42
include/util/tmacro.h Normal file
View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_MACRO_H_
#define _TD_UTIL_MACRO_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
// Module init/clear MACRO definitions
#define TD_MOD_UNINITIALIZED 0
#define TD_MOD_INITIALIZED 1
#define TD_MOD_UNCLEARD 0
#define TD_MOD_CLEARD 1
typedef int8_t td_mode_flag_t;
#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#ifdef __cplusplus
}
#endif
#endif /*_TD_UTIL_MACRO_H_*/

View File

@ -19,5 +19,5 @@ target_link_libraries(
# test # test
if(${BUILD_TEST}) if(${BUILD_TEST})
#add_subdirectory(test) add_subdirectory(test)
endif(${BUILD_TEST}) endif(${BUILD_TEST})

View File

@ -32,6 +32,8 @@ int vnodeBufPoolRecycle(SVnode *pVnode);
void *vnodeMalloc(SVnode *pVnode, uint64_t size); void *vnodeMalloc(SVnode *pVnode, uint64_t size);
bool vnodeBufPoolIsFull(SVnode *pVnode); bool vnodeBufPoolIsFull(SVnode *pVnode);
SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -0,0 +1,32 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_MAF_H_
#define _TD_VNODE_MAF_H_
#include "vnode.h"
#ifdef __cplusplus
extern "C" {
#endif
int vnodeOpenMAF(SVnode *pVnode);
void vnodeCloseMAF(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_MAF_H_*/

View File

@ -24,10 +24,13 @@ struct SVBufPool {
TD_DLIST(SVMemAllocator) free; TD_DLIST(SVMemAllocator) free;
TD_DLIST(SVMemAllocator) incycle; TD_DLIST(SVMemAllocator) incycle;
SVMemAllocator *inuse; SVMemAllocator *inuse;
// MAF for submodules // MAF for submodules to use
// SMemAllocatorFactory maf; SMemAllocatorFactory *pMAF;
}; };
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF);
static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA);
int vnodeOpenBufPool(SVnode *pVnode) { int vnodeOpenBufPool(SVnode *pVnode) {
uint64_t capacity; uint64_t capacity;
@ -54,6 +57,15 @@ int vnodeOpenBufPool(SVnode *pVnode) {
tDListAppend(&(pVnode->pBufPool->free), pVMA); tDListAppend(&(pVnode->pBufPool->free), pVMA);
} }
pVnode->pBufPool->pMAF = (SMemAllocatorFactory *)malloc(sizeof(SMemAllocatorFactory));
if (pVnode->pBufPool->pMAF == NULL) {
// TODO: handle error
return -1;
}
pVnode->pBufPool->pMAF->impl = pVnode;
pVnode->pBufPool->pMAF->create = vBufPoolCreateMA;
pVnode->pBufPool->pMAF->destroy = vBufPoolDestroyMA;
return 0; return 0;
} }
@ -125,195 +137,54 @@ bool vnodeBufPoolIsFull(SVnode *pVnode) {
return vmaIsFull(pVnode->pBufPool->inuse); return vmaIsFull(pVnode->pBufPool->inuse);
} }
#if 0 SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode) { return pVnode->pBufPool->pMAF; }
typedef enum {
// Heap allocator
E_V_HEAP_ALLOCATOR = 0,
// Arena allocator
E_V_ARENA_ALLOCATOR
} EVMemAllocatorT;
typedef struct {
/* TODO */
} SVHeapAllocator;
typedef struct SVArenaNode {
struct SVArenaNode *prev;
uint64_t size;
void * ptr;
char data[];
} SVArenaNode;
typedef struct {
uint64_t ssize; // step size
uint64_t lsize; // limit size
SVArenaNode *inuse;
SVArenaNode node;
} SVArenaAllocator;
typedef struct {
SVnode * pVnode;
SListNode *pNode;
} SVMAWrapper;
static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type);
static void vBufPoolFreeNode(SListNode *pNode);
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf);
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma);
static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size);
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type) { typedef struct {
SListNode * pNode;
SVMemAllocator *pvma;
uint64_t msize;
uint64_t ssize = 4096; // TODO
uint64_t lsize = 1024; // TODO
msize = sizeof(SListNode) + sizeof(SVMemAllocator);
if (type == E_V_ARENA_ALLOCATOR) {
msize += capacity;
}
pNode = (SListNode *)calloc(1, msize);
if (pNode == NULL) {
// TODO: handle error
return NULL;
}
pvma = (SVMemAllocator *)(pNode->data);
pvma->capacity = capacity;
pvma->type = type;
switch (type) {
case E_V_ARENA_ALLOCATOR:
vArenaAllocatorInit(&(pvma->vaa), capacity, ssize, lsize);
break;
case E_V_HEAP_ALLOCATOR:
// vHeapAllocatorInit(&(pvma->vha));
break;
default:
ASSERT(0);
}
return pNode;
}
static void vBufPoolFreeNode(SListNode *pNode) {
SVMemAllocator *pvma = (SVMemAllocator *)(pNode->data);
switch (pvma->type) {
case E_V_ARENA_ALLOCATOR:
vArenaAllocatorClear(&(pvma->vaa));
break;
case E_V_HEAP_ALLOCATOR:
// vHeapAllocatorClear(&(pvma->vha));
break;
default:
break;
}
free(pNode);
}
static void *vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size) {
void *ptr = NULL;
if (pvma->type == E_V_ARENA_ALLOCATOR) {
SVArenaAllocator *pvaa = &(pvma->vaa);
if (POINTER_DISTANCE(pvaa->inuse->ptr, pvaa->inuse->data) + size > pvaa->inuse->size) {
SVArenaNode *pNode = (SVArenaNode *)malloc(sizeof(*pNode) + MAX(size, pvaa->ssize));
if (pNode == NULL) {
// TODO: handle error
return NULL;
}
pNode->prev = pvaa->inuse;
pNode->size = MAX(size, pvaa->ssize);
pNode->ptr = pNode->data;
pvaa->inuse = pNode;
}
ptr = pvaa->inuse->ptr;
pvaa->inuse->ptr = POINTER_SHIFT(ptr, size);
} else if (pvma->type == E_V_HEAP_ALLOCATOR) {
/* TODO */
}
return ptr;
}
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) {
SVnode * pVnode; SVnode * pVnode;
SMemAllocator * pma; SVMemAllocator *pVMA;
SVMemAllocator *pvma; } SVMAWrapper;
SVMAWrapper * pvmaw;
pVnode = (SVnode *)(pmaf->impl); static FORCE_INLINE void *vmaMaloocCb(SMemAllocator *pMA, uint64_t size) {
pma = (SMemAllocator *)calloc(1, sizeof(*pma) + sizeof(SVMAWrapper)); SVMAWrapper *pWrapper = (SVMAWrapper *)(pMA->impl);
if (pma == NULL) {
// TODO: handle error return vmaMalloc(pWrapper->pVMA, size);
}
// TODO: Add atomic operations here
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF) {
SMemAllocator *pMA;
SVnode * pVnode = (SVnode *)(pMAF->impl);
SVMAWrapper * pWrapper;
pMA = (SMemAllocator *)calloc(1, sizeof(*pMA) + sizeof(SVMAWrapper));
if (pMA == NULL) {
return NULL; return NULL;
} }
pvmaw = (SVMAWrapper *)POINTER_SHIFT(pma, sizeof(*pma));
// No allocator used currently pVnode->pBufPool->inuse->_ref.val++;
if (pVnode->pBufPool->inuse == NULL) { pWrapper = POINTER_SHIFT(pMA, sizeof(*pMA));
while (listNEles(&(pVnode->pBufPool->free)) == 0) { pWrapper->pVnode = pVnode;
// TODO: wait until all released ro kill query pWrapper->pVMA = pVnode->pBufPool->inuse;
// tsem_wait();
ASSERT(0); pMA->impl = pWrapper;
pMA->malloc = vmaMaloocCb;
pMA->calloc = NULL;
pMA->realloc = NULL;
pMA->free = NULL;
pMA->usage = NULL;
return pMA;
} }
pVnode->pBufPool->inuse = tdListPopHead(&(pVnode->pBufPool->free)); static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) {
pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); SVMAWrapper * pWrapper = (SVMAWrapper *)(pMA->impl);
T_REF_INIT_VAL(pvma, 1); SVnode * pVnode = pWrapper->pVnode;
} else { SVMemAllocator *pVMA = pWrapper->pVMA;
pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
}
T_REF_INC(pvma); free(pMA);
if (--pVMA->_ref.val == 0) {
pvmaw->pVnode = pVnode; tDListPop(&(pVnode->pBufPool->incycle), pVMA);
pvmaw->pNode = pVnode->pBufPool->inuse; tDListAppend(&(pVnode->pBufPool->free), pVMA);
pma->impl = pvmaw;
pma->malloc = NULL;
pma->calloc = NULL; /* TODO */
pma->realloc = NULL; /* TODO */
pma->free = NULL; /* TODO */
pma->usage = NULL; /* TODO */
return pma;
}
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma) { /* TODO */
SVnode * pVnode = (SVnode *)(pmaf->impl);
SListNode * pNode = ((SVMAWrapper *)(pma->impl))->pNode;
SVMemAllocator *pvma = (SVMemAllocator *)(pNode->data);
if (T_REF_DEC(pvma) == 0) {
if (pvma->type == E_V_ARENA_ALLOCATOR) {
SVArenaAllocator *pvaa = &(pvma->vaa);
while (pvaa->inuse != &(pvaa->node)) {
SVArenaNode *pNode = pvaa->inuse;
pvaa->inuse = pNode->prev;
/* code */
}
pvaa->inuse->ptr = pvaa->inuse->data;
} else if (pvma->type == E_V_HEAP_ALLOCATOR) {
} else {
ASSERT(0);
}
// Move node from incycle to free
tdListAppendNode(&(pVnode->pBufPool->free), tdListPopNode(&(pVnode->pBufPool->incycle), pNode));
// tsem_post(); todo: sem_post
} }
} }
#endif

View File

@ -94,7 +94,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open meta // Open meta
sprintf(dir, "%s/meta", pVnode->path); sprintf(dir, "%s/meta", pVnode->path);
pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg)); pVnode->pMeta = metaOpen(dir, &(pVnode->config.metaCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pMeta == NULL) { if (pVnode->pMeta == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
@ -102,7 +102,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb // Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path); sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = tsdbOpen(dir, &(pVnode->config.tsdbCfg)); pVnode->pTsdb = tsdbOpen(dir, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTsdb == NULL) { if (pVnode->pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
@ -110,7 +110,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// TODO: Open TQ // TODO: Open TQ
sprintf(dir, "%s/tq", pVnode->path); sprintf(dir, "%s/tq", pVnode->path);
pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, NULL); pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, vBufPoolGetMAF(pVnode));
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
// TODO: handle error // TODO: handle error
return -1; return -1;
@ -131,7 +131,9 @@ static int vnodeOpenImpl(SVnode *pVnode) {
static void vnodeCloseImpl(SVnode *pVnode) { static void vnodeCloseImpl(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);
tsdbClose(pVnode->pTsdb);
metaClose(pVnode->pMeta); metaClose(pVnode->pMeta);
tsdbClose(pVnode->pTsdb);
tqClose(pVnode->pTq);
walClose(pVnode->pWal);
} }
} }

View File

@ -25,9 +25,10 @@ int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type) {
switch (type) { switch (type) {
case TSDB_MSG_TYPE_CREATE_TABLE: case TSDB_MSG_TYPE_CREATE_TABLE:
tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq)); tsize += vnodeBuildCreateTableReq(buf, &(pReq->ctReq));
break;
case TSDB_MSG_TYPE_SUBMIT:
/* code */ /* code */
break; break;
default: default:
break; break;
} }

View File

@ -0,0 +1,2 @@
// https://stackoverflow.com/questions/8565666/benchmarking-with-googletest
// https://github.com/google/benchmark

View File

@ -14,7 +14,7 @@
#include "vnode.h" #include "vnode.h"
static STSchema *createBasicSchema() { static STSchema *vtCreateBasicSchema() {
STSchemaBuilder sb; STSchemaBuilder sb;
STSchema * pSchema = NULL; STSchema * pSchema = NULL;
@ -32,7 +32,7 @@ static STSchema *createBasicSchema() {
return pSchema; return pSchema;
} }
static STSchema *createBasicTagSchema() { static STSchema *vtCreateBasicTagSchema() {
STSchemaBuilder sb; STSchemaBuilder sb;
STSchema * pSchema = NULL; STSchema * pSchema = NULL;
@ -50,7 +50,7 @@ static STSchema *createBasicTagSchema() {
return pSchema; return pSchema;
} }
static SKVRow createBasicTag() { static SKVRow vtCreateBasicTag() {
SKVRowBuilder rb; SKVRowBuilder rb;
SKVRow pTag; SKVRow pTag;
@ -71,80 +71,43 @@ static SKVRow createBasicTag() {
return pTag; return pTag;
} }
#if 0 static void vtBuildCreateStbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
TEST(vnodeApiTest, test_create_table_encode_and_decode_function) { SRpcMsg * pMsg;
tb_uid_t suid = 1638166374163; STSchema *pSchema;
STSchema *pSchema = createBasicSchema(); STSchema *pTagSchema;
STSchema *pTagSchema = createBasicTagSchema(); int zs;
char tbname[128] = "st"; void * pBuf;
char * buffer = new char[1024];
void * pBuf = (void *)buffer; pSchema = vtCreateBasicSchema();
pTagSchema = vtCreateBasicTagSchema();
SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema); SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
vnodeBuildReq(&pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE); zs = vnodeBuildReq(NULL, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs);
SVnodeReq decoded_req;
vnodeParseReq(buffer, &decoded_req, TSDB_MSG_TYPE_CREATE_TABLE);
int k = 10;
}
#endif
TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) {
vnodeDestroy("vnode1");
GTEST_ASSERT_GE(vnodeInit(2), 0);
// Create and open a vnode
SVnode *pVnode = vnodeOpen("vnode1", NULL);
ASSERT_NE(pVnode, nullptr);
tb_uid_t suid = 1638166374163;
{
// Create a super table
STSchema *pSchema = createBasicSchema();
STSchema *pTagSchema = createBasicTagSchema();
char tbname[128] = "st";
SArray * pMsgs = (SArray *)taosArrayInit(1, sizeof(SRpcMsg *));
SVnodeReq vCreateSTbReq = VNODE_INIT_CREATE_STB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pSchema, pTagSchema);
int zs = vnodeBuildReq(NULL, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + zs);
pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE; pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE;
pMsg->contLen = zs; pMsg->contLen = zs;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg)); pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(SRpcMsg));
void *pBuf = pMsg->pCont; pBuf = pMsg->pCont;
vnodeBuildReq(&pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE); vnodeBuildReq(&pBuf, &vCreateSTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
META_CLEAR_TB_CFG(&vCreateSTbReq); META_CLEAR_TB_CFG(&vCreateSTbReq);
taosArrayPush(pMsgs, &(pMsg));
vnodeProcessWMsgs(pVnode, pMsgs);
free(pMsg);
taosArrayDestroy(pMsgs);
tdFreeSchema(pSchema); tdFreeSchema(pSchema);
tdFreeSchema(pTagSchema); tdFreeSchema(pTagSchema);
*ppMsg = pMsg;
} }
{ static void vtBuildCreateCtbReq(tb_uid_t suid, char *tbname, SRpcMsg **ppMsg) {
// Create some child tables SRpcMsg *pMsg;
int ntables = 1000000; int tz;
int batch = 10; SKVRow pTag = vtCreateBasicTag();
for (int i = 0; i < ntables / batch; i++) {
SArray *pMsgs = (SArray *)taosArrayInit(batch, sizeof(SRpcMsg *));
for (int j = 0; j < batch; j++) {
SKVRow pTag = createBasicTag();
char tbname[128];
sprintf(tbname, "tb%d", i * batch + j);
SVnodeReq vCreateCTbReq = VNODE_INIT_CREATE_CTB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pTag); SVnodeReq vCreateCTbReq = VNODE_INIT_CREATE_CTB_REQ(tbname, UINT32_MAX, UINT32_MAX, suid, pTag);
int tz = vnodeBuildReq(NULL, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE); tz = vnodeBuildReq(NULL, &vCreateCTbReq, TSDB_MSG_TYPE_CREATE_TABLE);
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz); pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg) + tz);
pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE; pMsg->msgType = TSDB_MSG_TYPE_CREATE_TABLE;
pMsg->contLen = tz; pMsg->contLen = tz;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg)); pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
@ -154,35 +117,157 @@ TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) {
META_CLEAR_TB_CFG(&vCreateCTbReq); META_CLEAR_TB_CFG(&vCreateCTbReq);
free(pTag); free(pTag);
taosArrayPush(pMsgs, &(pMsg)); *ppMsg = pMsg;
} }
vnodeProcessWMsgs(pVnode, pMsgs); static void vtBuildCreateNtbReq(char *tbname, SRpcMsg **ppMsg) {
// TODO
}
for (int j = 0; j < batch; j++) { static void vtBuildSubmitReq(SRpcMsg **ppMsg) {
SRpcMsg *pMsg = *(SRpcMsg **)taosArrayPop(pMsgs); SRpcMsg * pMsg;
SSubmitMsg *pSubmitMsg;
SSubmitBlk *pSubmitBlk;
int tz = 1024; // TODO
pMsg = (SRpcMsg *)malloc(sizeof(*pMsg) + tz);
pMsg->msgType = TSDB_MSG_TYPE_SUBMIT;
pMsg->contLen = tz;
pMsg->pCont = POINTER_SHIFT(pMsg, sizeof(*pMsg));
// For submit msg header
pSubmitMsg = (SSubmitMsg *)(pMsg->pCont);
// pSubmitMsg->header.contLen = 0;
// pSubmitMsg->header.vgId = 0;
// pSubmitMsg->length = 0;
pSubmitMsg->numOfBlocks = 1;
// For submit blk
pSubmitBlk = (SSubmitBlk *)(pSubmitMsg->blocks);
pSubmitBlk->uid = 0;
pSubmitBlk->tid = 0;
pSubmitBlk->padding = 0;
pSubmitBlk->sversion = 0;
pSubmitBlk->dataLen = 0;
pSubmitBlk->numOfRows = 0;
// For row batch
*ppMsg = pMsg;
}
static void vtClearMsgBatch(SArray *pMsgArr) {
SRpcMsg *pMsg;
for (size_t i = 0; i < taosArrayGetSize(pMsgArr); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgArr, i);
free(pMsg); free(pMsg);
} }
taosArrayDestroy(pMsgs); taosArrayClear(pMsgArr);
// std::cout << "the " << i << "th batch is created" << std::endl;
} }
TEST(vnodeApiTest, vnode_simple_create_table_test) {
tb_uid_t suid = 1638166374163;
SRpcMsg *pMsg;
SArray * pMsgArr = NULL;
SVnode * pVnode;
int rcode;
int ntables = 1000000;
int batch = 10;
char tbname[128];
pMsgArr = (SArray *)taosArrayInit(batch, sizeof(pMsg));
vnodeDestroy("vnode1");
GTEST_ASSERT_GE(vnodeInit(2), 0);
// CREATE AND OPEN A VNODE
pVnode = vnodeOpen("vnode1", NULL);
ASSERT_NE(pVnode, nullptr);
// CREATE A SUPER TABLE
sprintf(tbname, "st");
vtBuildCreateStbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
ASSERT_EQ(rcode, 0);
vtClearMsgBatch(pMsgArr);
// CREATE A LOT OF CHILD TABLES
for (int i = 0; i < ntables / batch; i++) {
// Build request batch
for (int j = 0; j < batch; j++) {
sprintf(tbname, "ct%d", i * batch + j + 1);
vtBuildCreateCtbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
}
// Process request batch
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
ASSERT_EQ(rcode, 0);
// Clear request batch
vtClearMsgBatch(pMsgArr);
}
// CLOSE THE VNODE
vnodeClose(pVnode);
vnodeClear();
taosArrayDestroy(pMsgArr);
}
TEST(vnodeApiTest, vnode_simple_insert_test) {
const char *vname = "vnode2";
char tbname[128];
tb_uid_t suid = 1638166374163;
SRpcMsg * pMsg;
SArray * pMsgArr;
int rcode;
SVnode * pVnode;
int batch = 1;
int loop = 1000000;
pMsgArr = (SArray *)taosArrayInit(0, sizeof(pMsg));
vnodeDestroy(vname);
GTEST_ASSERT_GE(vnodeInit(2), 0);
// Open a vnode
pVnode = vnodeOpen(vname, NULL);
GTEST_ASSERT_NE(pVnode, nullptr);
// 1. CREATE A SUPER TABLE
sprintf(tbname, "st");
vtBuildCreateStbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
GTEST_ASSERT_EQ(rcode, 0);
vtClearMsgBatch(pMsgArr);
// 2. CREATE A CHILD TABLE
sprintf(tbname, "t0");
vtBuildCreateCtbReq(suid, tbname, &pMsg);
taosArrayPush(pMsgArr, &pMsg);
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
GTEST_ASSERT_EQ(rcode, 0);
vtClearMsgBatch(pMsgArr);
// 3. WRITE A LOT OF TIME-SERIES DATA
for (int j = 0; j < loop; j++) {
for (int i = 0; i < batch; i++) {
vtBuildSubmitReq(&pMsg);
taosArrayPush(pMsgArr, &pMsg);
}
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
GTEST_ASSERT_EQ(rcode, 0);
vtClearMsgBatch(pMsgArr);
} }
// Close the vnode // Close the vnode
vnodeClose(pVnode); vnodeClose(pVnode);
vnodeClear(); vnodeClear();
}
taosArrayDestroy(pMsgArr);
TEST(vnodeApiTest, DISABLED_vnode_process_create_table) {
STSchema * pSchema = NULL;
STSchema * pTagSchema = NULL;
char stname[15];
SVCreateTableReq pReq = META_INIT_STB_CFG(stname, UINT32_MAX, UINT32_MAX, 0, pSchema, pTagSchema);
int k = 10;
META_CLEAR_TB_CFG(pReq);
} }

View File

@ -17,27 +17,27 @@
#include "metaDef.h" #include "metaDef.h"
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions); static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
static void metaFree(SMeta *pMeta); static void metaFree(SMeta *pMeta);
static int metaOpenImpl(SMeta *pMeta); static int metaOpenImpl(SMeta *pMeta);
static void metaCloseImpl(SMeta *pMeta); static void metaCloseImpl(SMeta *pMeta);
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaOptions) { SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) {
SMeta *pMeta = NULL; SMeta *pMeta = NULL;
// Set default options // Set default options
if (pMetaOptions == NULL) { if (pMetaCfg == NULL) {
pMetaOptions = &defaultMetaOptions; pMetaCfg = &defaultMetaOptions;
} }
// Validate the options // Validate the options
if (metaValidateOptions(pMetaOptions) < 0) { if (metaValidateOptions(pMetaCfg) < 0) {
// TODO: deal with error // TODO: deal with error
return NULL; return NULL;
} }
// Allocate handle // Allocate handle
pMeta = metaNew(path, pMetaOptions); pMeta = metaNew(path, pMetaCfg, pMAF);
if (pMeta == NULL) { if (pMeta == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
@ -65,7 +65,7 @@ void metaClose(SMeta *pMeta) {
void metaRemove(const char *path) { taosRemoveDir(path); } void metaRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) { static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) {
SMeta *pMeta; SMeta *pMeta;
size_t psize = strlen(path); size_t psize = strlen(path);
@ -80,7 +80,8 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) {
return NULL; return NULL;
} }
metaOptionsCopy(&(pMeta->options), pMetaOptions); metaOptionsCopy(&(pMeta->options), pMetaCfg);
pMeta->pmaf = pMAF;
return pMeta; return pMeta;
}; };

View File

@ -63,7 +63,13 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
return pTq; return pTq;
} }
static int tqProtoCheck(TmqMsgHead* pMsg) { return pMsg->protoVer == 0; } void tqClose(STQ*pTq) {
// TODO
}
static int tqProtoCheck(TmqMsgHead *pMsg) {
return pMsg->protoVer == 0;
}
static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg** ppQuery) { static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg** ppQuery) {
// clean old item and move forward // clean old item and move forward

View File

@ -15,27 +15,27 @@
#include "tsdbDef.h" #include "tsdbDef.h"
static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions); static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
static void tsdbFree(STsdb *pTsdb); static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb); static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb); static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbOptions) { STsdb *tsdbOpen(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
// Set default TSDB Options // Set default TSDB Options
if (pTsdbOptions == NULL) { if (pTsdbCfg == NULL) {
pTsdbOptions = &defautlTsdbOptions; pTsdbCfg = &defautlTsdbOptions;
} }
// Validate the options // Validate the options
if (tsdbValidateOptions(pTsdbOptions) < 0) { if (tsdbValidateOptions(pTsdbCfg) < 0) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
} }
// Create the handle // Create the handle
pTsdb = tsdbNew(path, pTsdbOptions); pTsdb = tsdbNew(path, pTsdbCfg, pMAF);
if (pTsdb == NULL) { if (pTsdb == NULL) {
// TODO: handle error // TODO: handle error
return NULL; return NULL;
@ -62,7 +62,7 @@ void tsdbClose(STsdb *pTsdb) {
void tsdbRemove(const char *path) { taosRemoveDir(path); } void tsdbRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) { static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) {
STsdb *pTsdb = NULL; STsdb *pTsdb = NULL;
pTsdb = (STsdb *)calloc(1, sizeof(STsdb)); pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
@ -72,7 +72,8 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) {
} }
pTsdb->path = strdup(path); pTsdb->path = strdup(path);
tsdbOptionsCopy(&(pTsdb->options), pTsdbOptions); tsdbOptionsCopy(&(pTsdb->options), pTsdbCfg);
pTsdb->pmaf = pMAF;
return pTsdb; return pTsdb;
} }