From a21e0fe75e9a15e7423e2e51e137123b61455bc6 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 20 Apr 2023 10:21:50 +0800 Subject: [PATCH] enh: propose vnode commit synchronously --- include/common/tmsg.h | 2 +- source/dnode/vnode/src/inc/vnd.h | 1 - source/dnode/vnode/src/inc/vnodeInt.h | 4 ---- source/dnode/vnode/src/vnd/vnodeCommit.c | 14 +------------- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 -- source/dnode/vnode/src/vnd/vnodeSync.c | 5 ----- 6 files changed, 2 insertions(+), 26 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb2450e8f7..6d1d3ebce6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -77,7 +77,7 @@ static inline bool tmsgIsValid(tmsg_t type) { } static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || - (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); + (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT); } static inline bool syncUtilUserCommit(tmsg_t msgType) { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index ae65e2ba3f..a67f246e73 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -97,7 +97,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit); -void vnodeUpdCommitSched(SVnode* pVnode); void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeCommitInfo(const char* dir); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 81f7c3d52a..2576fce998 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -378,7 +378,6 @@ struct SVnode { STQ* pTq; SSink* pSink; tsem_t canCommit; - SVCommitSched commitSched; int64_t sync; TdThreadMutex lock; bool blocked; @@ -387,9 +386,6 @@ struct SVnode { int32_t blockSec; int64_t blockSeq; SQHandle* pQuery; -#if 0 - SRpcHandleInfo blockInfo; -#endif }; #define TD_VID(PVNODE) ((PVNODE)->config.vgId) diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 3eb813f394..847125018c 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -143,23 +143,13 @@ _exit: return code; } -void vnodeUpdCommitSched(SVnode *pVnode) { - int64_t randNum = taosRand(); - pVnode->commitSched.commitMs = taosGetMonoTimestampMs(); - pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs); -} - int vnodeShouldCommit(SVnode *pVnode, bool atExit) { - SVCommitSched *pSched = &pVnode->commitSched; - int64_t nowMs = taosGetMonoTimestampMs(); bool diskAvail = osDataSpaceAvailable(); bool needCommit = false; taosThreadMutexLock(&pVnode->mutex); if (pVnode->inUse && diskAvail) { - needCommit = - ((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || - ((pVnode->inUse->size > 0) && atExit); + needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) || (pVnode->inUse->size > 0 && atExit); } taosThreadMutexUnlock(&pVnode->mutex); return needCommit; @@ -431,8 +421,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm); - vnodeUpdCommitSched(pVnode); - // persist wal before starting if (walPersist(pVnode->pWal) < 0) { vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr()); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index c7d155be0d..e8f34d27ac 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -286,8 +286,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL); - vnodeUpdCommitSched(pVnode); - int8_t rollback = vnodeShouldRollback(pVnode); // open buffer pool diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d4a394b584..fb7f2901a1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -112,9 +112,6 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak pVnode->blocked = true; pVnode->blockSec = taosGetTimestampSec(); pVnode->blockSeq = seq; -#if 0 - pVnode->blockInfo = pMsg->info; -#endif } taosThreadMutexUnlock(&pVnode->lock); @@ -157,8 +154,6 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) { } else { tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg); } - - vnodeUpdCommitSched(pVnode); } #if BATCH_ENABLE