diff --git a/include/dnode/vnode/tsdb/tsdb.h b/include/dnode/vnode/tsdb/tsdb.h index e92205378a..e486da5419 100644 --- a/include/dnode/vnode/tsdb/tsdb.h +++ b/include/dnode/vnode/tsdb/tsdb.h @@ -29,6 +29,7 @@ typedef struct STsdbMemAllocator STsdbMemAllocator; STsdb *tsdbOpen(const char *path, const STsdbOptions *); void tsdbClose(STsdb *); void tsdbRemove(const char *path); +int tsdbInsertData(STsdb *pTsdb, void *); // STsdbOptions int tsdbOptionsInit(STsdbOptions *); diff --git a/include/util/mallocator.h b/include/util/mallocator.h index fd66811f38..ffe242017e 100644 --- a/include/util/mallocator.h +++ b/include/util/mallocator.h @@ -22,10 +22,10 @@ extern "C" { #endif -typedef struct SMemAllocator SMemAllocator; +typedef struct SMemAllocator SMemAllocator; +typedef struct SMemAllocatorFactory SMemAllocatorFactory; struct SMemAllocator { - char name[16]; void *impl; void *(*malloc)(SMemAllocator *, uint64_t size); void *(*calloc)(SMemAllocator *, uint64_t nmemb, uint64_t size); @@ -34,11 +34,11 @@ struct SMemAllocator { uint64_t (*usage)(SMemAllocator *); }; -typedef struct { +struct SMemAllocatorFactory { void *impl; - SMemAllocator *(*create)(); - void (*destroy)(SMemAllocator *); -} SMemAllocatorFactory; + SMemAllocator *(*create)(SMemAllocatorFactory *); + void (*destroy)(SMemAllocatorFactory *, SMemAllocator *); +}; #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/inc/vnodeStateMgr.h b/source/dnode/vnode/impl/inc/vnodeStateMgr.h index a32f682846..788426e25e 100644 --- a/source/dnode/vnode/impl/inc/vnodeStateMgr.h +++ b/source/dnode/vnode/impl/inc/vnodeStateMgr.h @@ -21,6 +21,9 @@ extern "C" { #endif typedef struct { + uint64_t processed; + uint64_t committed; + uint64_t applied; } SVState; #ifdef __cplusplus diff --git a/source/dnode/vnode/impl/src/vnodeBufferPool.c b/source/dnode/vnode/impl/src/vnodeBufferPool.c index eba71b5ba3..07dc56db95 100644 --- a/source/dnode/vnode/impl/src/vnodeBufferPool.c +++ b/source/dnode/vnode/impl/src/vnodeBufferPool.c @@ -19,9 +19,12 @@ #define VNODE_BUF_POOL_SHARDS 3 struct SVBufPool { + // buffer pool impl SList free; SList incycle; SListNode *inuse; + // MAF for submodules + SMemAllocatorFactory maf; }; typedef enum { @@ -49,6 +52,11 @@ typedef struct { SVArenaNode node; } SVArenaAllocator; +typedef struct { + SVnode * pVnode; + SListNode *pNode; +} SVMAWrapper; + typedef struct { T_REF_DECLARE() uint64_t capacity; @@ -59,8 +67,10 @@ typedef struct { }; } SVMemAllocator; -static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type); -static void vBufPoolFreeNode(SListNode *pNode); +static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type); +static void vBufPoolFreeNode(SListNode *pNode); +static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf); +static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma); int vnodeOpenBufPool(SVnode *pVnode) { uint64_t capacity; @@ -89,6 +99,10 @@ int vnodeOpenBufPool(SVnode *pVnode) { tdListAppendNode(&(pVnode->pBufPool->free), pNode); } + pVnode->pBufPool->maf.impl = pVnode; + pVnode->pBufPool->maf.create = vBufPoolCreateMA; + pVnode->pBufPool->maf.destroy = vBufPoolDestroyMA; + return 0; } @@ -185,4 +199,82 @@ static void vBufPoolFreeNode(SListNode *pNode) { } free(pNode); +} + +static void *vBufPoolMalloc(SMemAllocator *pma, uint64_t size) { + SVMAWrapper * pvmaw = (SVMAWrapper *)(pma->impl); + SVMemAllocator *pvma = (SVMemAllocator *)(pvmaw->pNode->data); + void * ptr = NULL; + + if (pvma->type == E_V_ARENA_ALLOCATOR) { + SVArenaAllocator *pvaa = &(pvma->vaa); + + if (POINTER_DISTANCE(pvaa->inuse->ptr, pvaa->inuse->data) + size > pvaa->inuse->size) { + SVArenaNode *pNode = (SVArenaNode *)malloc(sizeof(*pNode) + MAX(size, pvaa->ssize)); + if (pNode == NULL) { + // TODO: handle error + return NULL; + } + + pNode->prev = pvaa->inuse; + pNode->size = MAX(size, pvaa->ssize); + pNode->ptr = pNode->data; + + pvaa->inuse = pNode; + } + + ptr = pvaa->inuse->ptr; + pvaa->inuse->ptr = POINTER_SHIFT(ptr, size); + } else if (pvma->type == E_V_HEAP_ALLOCATOR) { + /* TODO */ + } + + return NULL; +} + +static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) { + SVnode * pVnode; + SMemAllocator * pma; + SVMemAllocator *pvma; + SVMAWrapper * pvmaw; + + pVnode = (SVnode *)(pmaf->impl); + pma = (SMemAllocator *)calloc(1, sizeof(*pma) + sizeof(SVMAWrapper)); + if (pma == NULL) { + // TODO: handle error + return NULL; + } + pvmaw = (SVMAWrapper *)POINTER_SHIFT(pma, sizeof(*pma)); + + // No allocator used currently + if (pVnode->pBufPool->inuse == NULL) { + while (listNEles(&(pVnode->pBufPool->free)) == 0) { + // TODO: wait until all released ro kill query + // tsem_wait(); + ASSERT(0); + } + + pVnode->pBufPool->inuse = tdListPopHead(&(pVnode->pBufPool->free)); + pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); + T_REF_INIT_VAL(pvma, 1); + } else { + pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); + } + + T_REF_INC(pvma); + + pvmaw->pVnode = pVnode; + pvmaw->pNode = pVnode->pBufPool->inuse; + + pma->impl = pvmaw; + pma->malloc = vBufPoolMalloc; + pma->calloc = NULL; /* TODO */ + pma->realloc = NULL; /* TODO */ + pma->free = NULL; /* TODO */ + pma->usage = NULL; /* TODO */ + + return pma; +} + +static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma) { /* TODO */ } \ No newline at end of file diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 810ac570bc..f858309c66 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -21,46 +21,32 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { } int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { -#if 0 - int reqType; /* TODO */ - size_t reqSize; /* TODO */ - uint64_t reqVersion = 0; /* TODO */ - int code = 0; + // TODO + int code = 0; - // Copy the request to vnode buffer - void *pReq = mMalloc(pVnode->inuse, reqSize); - if (pReq == NULL) { - // TODO: handle error - } - - memcpy(pReq, pMsg, reqSize); - - // Push the request to TQ so consumers can consume - tqPushMsg(pVnode->pTq, pReq, 0); - - // Process the request - switch (reqType) { + switch (pMsg->msgType) { case TSDB_MSG_TYPE_CREATE_TABLE: - code = metaCreateTable(pVnode->pMeta, NULL /* TODO */); + if (metaCreateTable(pVnode->pMeta, pMsg->pCont) < 0) { + /* TODO */ + return -1; + } break; case TSDB_MSG_TYPE_DROP_TABLE: - code = metaDropTable(pVnode->pMeta, 0 /* TODO */); + if (metaDropTable(pVnode->pMeta, pMsg->pCont) < 0) { + /* TODO */ + return -1; + } break; case TSDB_MSG_TYPE_SUBMIT: - /* TODO */ + if (tsdbInsertData(pVnode->pTsdb, pMsg->pCont) < 0) { + /* TODO */ + return -1; + } break; default: break; } - if (vnodeShouldCommit(pVnode)) { - if (vnodeAsyncCommit(pVnode) < 0) { - // TODO: handle error - } - } - - return code; -#endif return 0; }