From e3478a52a58af599988f4cb527198c24369fb6c9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 29 Nov 2021 11:37:26 +0800 Subject: [PATCH] more --- include/dnode/vnode/meta/meta.h | 2 +- include/dnode/vnode/vnode.h | 5 + source/dnode/vnode/impl/inc/vnodeBufferPool.h | 5 +- source/dnode/vnode/impl/inc/vnodeRequest.h | 17 +--- source/dnode/vnode/impl/src/vnodeBufferPool.c | 29 ++++-- source/dnode/vnode/impl/src/vnodeWrite.c | 96 +++++++++++++------ source/dnode/vnode/meta/src/metaTable.c | 3 +- 7 files changed, 103 insertions(+), 54 deletions(-) diff --git a/include/dnode/vnode/meta/meta.h b/include/dnode/vnode/meta/meta.h index 3a8ff3a717..df67b37686 100644 --- a/include/dnode/vnode/meta/meta.h +++ b/include/dnode/vnode/meta/meta.h @@ -74,7 +74,7 @@ typedef struct STbCfg { SMeta *metaOpen(const char *path, const SMetaCfg *pOptions); void metaClose(SMeta *pMeta); void metaRemove(const char *path); -int metaCreateTable(SMeta *pMeta, const void *pReq, const int len); +int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); int metaDropTable(SMeta *pMeta, const void *pReq, const int len); int metaCommit(SMeta *pMeta); diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 6092877d0b..84e57c4180 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -131,6 +131,11 @@ void vnodeOptionsInit(SVnodeCfg *pOptions); void vnodeOptionsClear(SVnodeCfg *pOptions); /* ------------------------ REQUESTS ------------------------ */ +typedef struct { + uint64_t ver; + char req[]; +} SVnodeReq; + typedef struct { int err; char info[]; diff --git a/source/dnode/vnode/impl/inc/vnodeBufferPool.h b/source/dnode/vnode/impl/inc/vnodeBufferPool.h index 3033862779..d64dc93847 100644 --- a/source/dnode/vnode/impl/inc/vnodeBufferPool.h +++ b/source/dnode/vnode/impl/inc/vnodeBufferPool.h @@ -25,8 +25,9 @@ extern "C" { typedef struct SVBufPool SVBufPool; -int vnodeOpenBufPool(SVnode *pVnode); -void vnodeCloseBufPool(SVnode *pVnode); +int vnodeOpenBufPool(SVnode *pVnode); +void vnodeCloseBufPool(SVnode *pVnode); +void *vnodeMalloc(SVnode *pVnode, uint64_t size); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/inc/vnodeRequest.h b/source/dnode/vnode/impl/inc/vnodeRequest.h index f2b4ecee91..e7fdff092e 100644 --- a/source/dnode/vnode/impl/inc/vnodeRequest.h +++ b/source/dnode/vnode/impl/inc/vnodeRequest.h @@ -16,23 +16,14 @@ #ifndef _TD_VNODE_REQUEST_H_ #define _TD_VNODE_REQUEST_H_ +#include "vnode.h" + #ifdef __cplusplus extern "C" { #endif -typedef struct SVnodeReq SVnodeReq; -typedef struct SVnodeRsp SVnodeRsp; - -typedef enum {} EVReqT; -typedef enum {} EVRspT; - -struct SVnodeReq { - EVReqT type; -}; - -struct SVnodeRsp { - EVRspT type; -}; +int vnodeBuildCreateTableReq(const SVCreateTableReq *pReq, char *msg, int len); +int vnodeParseCreateTableReq(const char *msg, int len, SVCreateTableReq *pReq); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeBufferPool.c b/source/dnode/vnode/impl/src/vnodeBufferPool.c index 3d8ebfdc47..00203ed9b6 100644 --- a/source/dnode/vnode/impl/src/vnodeBufferPool.c +++ b/source/dnode/vnode/impl/src/vnodeBufferPool.c @@ -71,6 +71,7 @@ static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type); static void vBufPoolFreeNode(SListNode *pNode); static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf); static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma); +static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size); int vnodeOpenBufPool(SVnode *pVnode) { uint64_t capacity; @@ -129,6 +130,24 @@ void vnodeCloseBufPool(SVnode *pVnode) { } } +void *vnodeMalloc(SVnode *pVnode, uint64_t size) { + void *ptr; + + if (pVnode->pBufPool->inuse == NULL) { + SListNode *pNode; + while ((pNode = tdListPopHead(&(pVnode->pBufPool->free))) == NULL) { + // todo + // tsem_wait(); + ASSERT(0); + } + + pVnode->pBufPool->inuse = pNode; + } + + SVMemAllocator *pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data); + return vBufPoolMalloc(pvma, size); +} + /* ------------------------ STATIC METHODS ------------------------ */ static void vArenaAllocatorInit(SVArenaAllocator *pvaa, uint64_t capacity, uint64_t ssize, uint64_t lsize) { /* TODO */ pvaa->ssize = ssize; @@ -201,10 +220,8 @@ static void vBufPoolFreeNode(SListNode *pNode) { free(pNode); } -static void *vBufPoolMalloc(SMemAllocator *pma, uint64_t size) { - SVMAWrapper * pvmaw = (SVMAWrapper *)(pma->impl); - SVMemAllocator *pvma = (SVMemAllocator *)(pvmaw->pNode->data); - void * ptr = NULL; +static void *vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size) { + void *ptr = NULL; if (pvma->type == E_V_ARENA_ALLOCATOR) { SVArenaAllocator *pvaa = &(pvma->vaa); @@ -229,7 +246,7 @@ static void *vBufPoolMalloc(SMemAllocator *pma, uint64_t size) { /* TODO */ } - return NULL; + return ptr; } static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) { @@ -267,7 +284,7 @@ static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) { pvmaw->pNode = pVnode->pBufPool->inuse; pma->impl = pvmaw; - pma->malloc = vBufPoolMalloc; + pma->malloc = NULL; pma->calloc = NULL; /* TODO */ pma->realloc = NULL; /* TODO */ pma->free = NULL; /* TODO */ diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 96eb653d1c..2daee82b69 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -16,13 +16,74 @@ #include "vnodeDef.h" int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { - SRpcMsg *pReq; - SRpcMsg *pRsp; + SRpcMsg * pMsg; + SVnodeReq *pVnodeReq; - for (size_t i = 0; i < taosArrayGetSize(pMsgs); i++) { - pReq = taosArrayGet(pMsgs, i); + for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { + pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); - vnodeApplyWMsg(pVnode, pReq, &pRsp); + // ser request version + pVnodeReq = (SVnodeReq *)(pMsg->pCont); + pVnodeReq->ver = pVnode->state.processed++; + + if (walWrite(pVnode->pWal, pVnodeReq->ver, pVnodeReq->req, pMsg->contLen - sizeof(pVnodeReq->ver)) < 0) { + // TODO: handle error + } + } + + walFsync(pVnode->pWal, false); + + // Apply each request now + for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { + pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); + pVnodeReq = (SVnodeReq *)(pMsg->pCont); + SVCreateTableReq ctReq; + + // Apply the request + { + void *ptr = vnodeMalloc(pVnode, pMsg->contLen); + if (ptr == NULL) { + // TODO: handle error + } + + memcpy(ptr, pVnodeReq, pMsg->contLen); + + // todo: change the interface here + if (tqPushMsg(pVnode->pTq, pVnodeReq->req, pVnodeReq->ver) < 0) { + // TODO: handle error + } + + switch (pMsg->msgType) { + case TSDB_MSG_TYPE_CREATE_TABLE: + if (vnodeParseCreateTableReq(pVnodeReq->req, pMsg->contLen - sizeof(pVnodeReq->ver), &(ctReq)) < 0) { + // TODO: handle error + } + + if (metaCreateTable(pVnode->pMeta, &ctReq) < 0) { + // TODO: handle error + } + + // TODO: maybe need to clear the requst struct + break; + case TSDB_MSG_TYPE_DROP_TABLE: + /* code */ + break; + case TSDB_MSG_TYPE_SUBMIT: + /* code */ + break; + default: + break; + } + } + + pVnode->state.applied = pVnodeReq->ver; + + // Check if it needs to commit + if (vnodeShouldCommit(pVnode)) { + if (vnodeAsyncCommit(pVnode) < 0) { + // TODO: handle error + } + } } return 0; @@ -30,31 +91,6 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO - int code = 0; - - switch (pMsg->msgType) { - case TSDB_MSG_TYPE_CREATE_TABLE: - if (metaCreateTable(pVnode->pMeta, pMsg->pCont, pMsg->contLen) < 0) { - /* TODO */ - return -1; - } - break; - case TSDB_MSG_TYPE_DROP_TABLE: - if (metaDropTable(pVnode->pMeta, pMsg->pCont, pMsg->contLen) < 0) { - /* TODO */ - return -1; - } - break; - case TSDB_MSG_TYPE_SUBMIT: - if (tsdbInsertData(pVnode->pTsdb, pMsg->pCont, pMsg->contLen) < 0) { - /* TODO */ - return -1; - } - break; - default: - break; - } - return 0; } diff --git a/source/dnode/vnode/meta/src/metaTable.c b/source/dnode/vnode/meta/src/metaTable.c index f34732a9fe..bed2140f51 100644 --- a/source/dnode/vnode/meta/src/metaTable.c +++ b/source/dnode/vnode/meta/src/metaTable.c @@ -15,8 +15,7 @@ #include "metaDef.h" -int metaCreateTable(SMeta *pMeta, const void *pCont, const int len) { - STbCfg *pTbCfg = NULL; +int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) { // Validate the tbOptions if (metaValidateTbOptions(pMeta, pTbCfg) < 0) { // TODO: handle error