From 92e05b8ece2075e907c935d81b9a8958d7401a1f Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 27 Dec 2022 16:28:44 +0800 Subject: [PATCH] enh: schedule vnodeCommit uniformly distributed --- source/dnode/vnode/src/inc/vnd.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 7 ++++++- source/dnode/vnode/src/vnd/vnodeCommit.c | 13 ++++++++++--- source/dnode/vnode/src/vnd/vnodeOpen.c | 3 ++- source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- 5 files changed, 20 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index d8c4b001b1..24821a3a61 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -86,6 +86,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode); +void vnodeUpdCommitSched(SVnode* pVnode); void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1b84fb0578..75367883f1 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -332,6 +332,11 @@ struct STsdbKeepCfg { int32_t keep2; }; +typedef struct SVCommitSched { + int64_t commitMs; + int64_t maxWaitMs; +} SVCommitSched; + struct SVnode { char* path; SVnodeCfg config; @@ -350,7 +355,7 @@ struct SVnode { STQ* pTq; SSink* pSink; tsem_t canCommit; - int64_t commitMs; + SVCommitSched commitSched; int64_t sync; TdThreadMutex lock; bool blocked; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 4f63d6e043..3738966122 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -58,15 +58,22 @@ int vnodeBegin(SVnode *pVnode) { return 0; } +void vnodeUpdCommitSched(SVnode *pVnode) { + int64_t randNum = taosRand(); + pVnode->commitSched.commitMs = taosGetMonoTimestampMs(); + pVnode->commitSched.maxWaitMs = SYNC_VND_COMMIT_MAX_MS + (randNum % SYNC_VND_COMMIT_MAX_MS); +} + int vnodeShouldCommit(SVnode *pVnode) { if (!pVnode->inUse || !osDataSpaceAvailable()) { return false; } + SVCommitSched *pSched = &pVnode->commitSched; int64_t nowMs = taosGetMonoTimestampMs(); - return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pVnode->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || - (pVnode->inUse->size > 0 && pVnode->commitMs + SYNC_VND_COMMIT_MAX_MS < nowMs)); + return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || + (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs)); } int vnodeShouldCommitOld(SVnode *pVnode) { @@ -306,7 +313,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm); - pVnode->commitMs = taosGetMonoTimestampMs(); + vnodeUpdCommitSched(pVnode); // persist wal before starting if (walPersist(pVnode->pWal) < 0) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index f2973c188e..edbec0d044 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -142,7 +142,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->path = (char *)&pVnode[1]; strcpy(pVnode->path, path); pVnode->config = info.config; - pVnode->commitMs = taosGetMonoTimestampMs(); pVnode->state.committed = info.state.committed; pVnode->state.commitTerm = info.state.commitTerm; pVnode->state.commitID = info.state.commitID; @@ -158,6 +157,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL); + vnodeUpdCommitSched(pVnode); + int8_t rollback = vnodeShouldRollback(pVnode); // open buffer pool diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index eaea4102a2..a1dfeb9728 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -154,7 +154,7 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode) { vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId); _out: - pVnode->commitMs = taosGetMonoTimestampMs(); + vnodeUpdCommitSched(pVnode); rpcFreeCont(rpcMsg.pCont); rpcMsg.pCont = NULL; }