diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 75976c58b5..055120568e 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -72,6 +72,7 @@ int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit ==================== int vnodeBegin(SVnode* pVnode); int vnodeShouldCommit(SVnode* pVnode); +int vnodeCommit(SVnode* pVnode); int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 06aa48f6f2..246294d72f 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -27,3 +27,5 @@ int metaBegin(SMeta *pMeta) { return 0; } + +int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 60fa2c8aae..b8d2f3001d 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -22,7 +22,7 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); static int vnodeStartCommit(SVnode *pVnode); static int vnodeEndCommit(SVnode *pVnode); -static int vnodeCommit(void *arg); +static int vnodeCommitImpl(void *arg); static void vnodeWaitCommit(SVnode *pVnode); int vnodeBegin(SVnode *pVnode) { @@ -55,13 +55,7 @@ int vnodeBegin(SVnode *pVnode) { return 0; } -int vnodeShouldCommit(SVnode *pVnode) { - if (pVnode->inUse->size > pVnode->config.szBuf / 3) { - return 1; - } - - return 0; -} +int vnodeShouldCommit(SVnode *pVnode) { return pVnode->inUse->size > pVnode->config.szBuf / 3; } int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { char fname[TSDB_FILENAME_LEN]; @@ -183,7 +177,7 @@ int vnodeAsyncCommit(SVnode *pVnode) { // vnodeBufPoolSwitch(pVnode); tsdbPrepareCommit(pVnode->pTsdb); - vnodeScheduleTask(vnodeCommit, pVnode); + vnodeScheduleTask(vnodeCommitImpl, pVnode); return 0; } @@ -195,7 +189,53 @@ int vnodeSyncCommit(SVnode *pVnode) { return 0; } -static int vnodeCommit(void *arg) { +int vnodeCommit(SVnode *pVnode) { + SVnodeInfo info; + char dir[TSDB_FILENAME_LEN]; + + pVnode->onCommit = pVnode->inUse; + pVnode->inUse = NULL; + + // save info + info.config = pVnode->config; + info.state = pVnode->state; + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); + if (vnodeSaveInfo(dir, &info) < 0) { + ASSERT(0); + return -1; + } + + // commit each sub-system + if (metaCommit(pVnode->pMeta) < 0) { + ASSERT(0); + return -1; + } + if (tsdbCommit(pVnode->pTsdb) < 0) { + ASSERT(0); + return -1; + } + if (tqCommit(pVnode->pTq) < 0) { + ASSERT(0); + return -1; + } + // walCommit (TODO) + + // commit info + if (vnodeCommitInfo(dir, &info) < 0) { + ASSERT(0); + return -1; + } + + // apply the commit (TODO) + vnodeBufPoolReset(pVnode->onCommit); + pVnode->onCommit->next = pVnode->pPool; + pVnode->pPool = pVnode->onCommit; + pVnode->onCommit = NULL; + + return 0; +} + +static int vnodeCommitImpl(void *arg) { SVnode *pVnode = (SVnode *)arg; // metaCommit(pVnode->pMeta); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7586d9be35..82775f9e70 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -136,12 +136,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg vDebug("vgId: %d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); - // Check if it needs to commit + // commit if need if (vnodeShouldCommit(pVnode)) { - // tsem_wait(&(pVnode->canCommit)); - if (vnodeAsyncCommit(pVnode) < 0) { - // TODO: handle error - } + // commit current change + vnodeCommit(pVnode); + + // start a new one + vnodeBegin(pVnode); } return 0;