From d8386754eeb6a2411e2f51ab7958d320b073cb97 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 11 Apr 2023 14:16:31 +0800 Subject: [PATCH] enh: try to propose vnode commit at vnode closing --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 6 +++++ source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/inc/vnd.h | 3 +-- source/dnode/vnode/src/vnd/vnodeCommit.c | 5 +++-- source/dnode/vnode/src/vnd/vnodeSync.c | 28 ++++++++++++++---------- 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 4c5b1246e7..0244a4fd6e 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +#include "vnd.h" SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *pVnode = NULL; @@ -78,6 +79,11 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) { char path[TSDB_FILENAME_LEN] = {0}; + bool atExit = true; + + if (vnodeIsLeader(pVnode->pImpl)) { + vnodeProposeCommitOnNeed(pVnode->pImpl, atExit); + } taosThreadRwlockWrlock(&pMgmt->lock); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a9e5fe628b..7ecfadf728 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -92,7 +92,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); -void vnodeProposeCommitOnNeed(SVnode *pVnode); +void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); // meta typedef struct SMeta SMeta; // todo: remove diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 134909090f..ae65e2ba3f 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -96,7 +96,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); -int32_t vnodeShouldCommit(SVnode* pVnode); +int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit); void vnodeUpdCommitSched(SVnode* pVnode); void vnodeRollback(SVnode* pVnode); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); @@ -115,7 +115,6 @@ void vnodeSyncClose(SVnode* pVnode); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code); bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode); -int vnodeShouldCommit(SVnode* pVnode); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 09f1ca7877..3eb813f394 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -149,7 +149,7 @@ void vnodeUpdCommitSched(SVnode *pVnode) { pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs); } -int vnodeShouldCommit(SVnode *pVnode) { +int vnodeShouldCommit(SVnode *pVnode, bool atExit) { SVCommitSched *pSched = &pVnode->commitSched; int64_t nowMs = taosGetMonoTimestampMs(); bool diskAvail = osDataSpaceAvailable(); @@ -158,7 +158,8 @@ int vnodeShouldCommit(SVnode *pVnode) { taosThreadMutexLock(&pVnode->mutex); if (pVnode->inUse && diskAvail) { needCommit = - ((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)); + ((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || + ((pVnode->inUse->size > 0) && atExit); } taosThreadMutexUnlock(&pVnode->mutex); return needCommit; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d681f5b65e..2b4eff08d3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -129,8 +129,8 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak return code; } -void vnodeProposeCommitOnNeed(SVnode *pVnode) { - if (!vnodeShouldCommit(pVnode)) { +void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) { + if (!vnodeShouldCommit(pVnode, atExit)) { return; } @@ -145,18 +145,20 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode) { rpcMsg.pCont = pHead; rpcMsg.info.noResp = 1; + vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId); bool isWeak = false; - if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) { - vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr()); - goto _out; + + if (!atExit) { + if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) { + vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr()); + } + rpcFreeCont(rpcMsg.pCont); + rpcMsg.pCont = NULL; + } else { + tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg); } - vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId); - -_out: vnodeUpdCommitSched(pVnode); - rpcFreeCont(rpcMsg.pCont); - rpcMsg.pCont = NULL; } #if BATCH_ENABLE @@ -236,7 +238,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) continue; } - vnodeProposeCommitOnNeed(pVnode); + bool atExit = false; + vnodeProposeCommitOnNeed(pVnode, atExit); code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) { @@ -288,7 +291,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) continue; } - vnodeProposeCommitOnNeed(pVnode); + bool atExit = false; + vnodeProposeCommitOnNeed(pVnode, atExit); code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) {