diff --git a/source/dnode/vnode/impl/CMakeLists.txt b/source/dnode/vnode/impl/CMakeLists.txt index 3623516624..6972605afd 100644 --- a/source/dnode/vnode/impl/CMakeLists.txt +++ b/source/dnode/vnode/impl/CMakeLists.txt @@ -19,5 +19,5 @@ target_link_libraries( # test if(${BUILD_TEST}) - #add_subdirectory(test) + add_subdirectory(test) endif(${BUILD_TEST}) \ No newline at end of file diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 56e07aca10..605557d4ea 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -71,6 +71,7 @@ struct SVnode { SWal* pWal; SVnodeSync* pSync; SVnodeFS* pFs; + tsem_t canCommit; }; int vnodeScheduleTask(SVnodeTask* task); diff --git a/source/dnode/vnode/impl/src/vnodeCommit.c b/source/dnode/vnode/impl/src/vnodeCommit.c index 7213e31cb4..f5bf60a7e3 100644 --- a/source/dnode/vnode/impl/src/vnodeCommit.c +++ b/source/dnode/vnode/impl/src/vnodeCommit.c @@ -38,9 +38,10 @@ int vnodeCommit(void *arg) { metaCommit(pVnode->pMeta); tqCommit(pVnode->pTq); - tsdbCommit(pVnode->pTq); + tsdbCommit(pVnode->pTsdb); vnodeBufPoolRecycle(pVnode); + tsem_post(&(pVnode->canCommit)); // TODO return 0; } diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index 70d9c7d4b0..c98f3e0800 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -74,11 +74,14 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { pVnode->path = strdup(path); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); + tsem_init(&(pVnode->canCommit), 0, 1); + return pVnode; } static void vnodeFree(SVnode *pVnode) { if (pVnode) { + tsem_destroy(&(pVnode->canCommit)); tfree(pVnode->path); free(pVnode); } diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 85e044266a..9a4efbda4c 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -31,14 +31,14 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { } int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { - SRpcMsg *pMsg; + SRpcMsg * pMsg; SVnodeReq *pVnodeReq; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); // ser request version - void *pBuf = pMsg->pCont; + void * pBuf = pMsg->pCont; int64_t ver = pVnode->state.processed++; taosEncodeFixedU64(&pBuf, ver); @@ -52,65 +52,64 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { // Apply each request now for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); - SVnodeReq vReq; - // Apply the request - { - void *ptr = vnodeMalloc(pVnode, pMsg->contLen); - if (ptr == NULL) { - // TODO: handle error - } - - // TODO: copy here need to be extended - memcpy(ptr, pMsg->pCont, pMsg->contLen); - - // todo: change the interface here - uint64_t ver; - taosDecodeFixedU64(pMsg->pCont, &ver); - if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) { - // TODO: handle error - } - - vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType); - - switch (pMsg->msgType) { - case TSDB_MSG_TYPE_CREATE_TABLE: - if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) { - // TODO: handle error - } - - // TODO: maybe need to clear the requst struct - break; - case TSDB_MSG_TYPE_DROP_TABLE: - if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { - // TODO: handle error - } - break; - case TSDB_MSG_TYPE_SUBMIT: - if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) { - // TODO: handle error - } - break; - default: - break; - } - - pVnode->state.applied = ver; - } - - // Check if it needs to commit - if (vnodeShouldCommit(pVnode)) { - if (vnodeAsyncCommit(pVnode) < 0) { - // TODO: handle error - } - } + // TODO: Now we just need a + vnodeApplyWMsg(pVnode, pMsg, NULL); } return 0; } int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - // TODO + SVnodeReq vReq; + void * ptr = vnodeMalloc(pVnode, pMsg->contLen); + if (ptr == NULL) { + // TODO: handle error + } + + // TODO: copy here need to be extended + memcpy(ptr, pMsg->pCont, pMsg->contLen); + + // todo: change the interface here + uint64_t ver; + taosDecodeFixedU64(pMsg->pCont, &ver); + if (tqPushMsg(pVnode->pTq, ptr, ver) < 0) { + // TODO: handle error + } + + vnodeParseReq(pMsg->pCont, &vReq, pMsg->msgType); + + switch (pMsg->msgType) { + case TSDB_MSG_TYPE_CREATE_TABLE: + if (metaCreateTable(pVnode->pMeta, &(vReq.ctReq)) < 0) { + // TODO: handle error + } + + // TODO: maybe need to clear the requst struct + break; + case TSDB_MSG_TYPE_DROP_TABLE: + if (metaDropTable(pVnode->pMeta, vReq.dtReq.uid) < 0) { + // TODO: handle error + } + break; + case TSDB_MSG_TYPE_SUBMIT: + if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) { + // TODO: handle error + } + break; + default: + break; + } + + pVnode->state.applied = ver; + + // Check if it needs to commit + if (vnodeShouldCommit(pVnode)) { + tsem_wait(&(pVnode->canCommit)); + if (vnodeAsyncCommit(pVnode) < 0) { + // TODO: handle error + } + } return 0; }